diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 3e0b41da7b5..6b44aacc229 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -1773,6 +1773,8 @@ Release 0.22.0 - Unreleased MAPREDUCE-2571. CombineFileInputFormat.getSplits throws a java.lang.ArrayStoreException. (Bochun Bai via todd) + MAPREDUCE-2767. Remove Linux task-controller. (Milind Bhandarkar via shv) + Release 0.21.1 - Unreleased NEW FEATURES diff --git a/hadoop-mapreduce-project/build.xml b/hadoop-mapreduce-project/build.xml index a5cd206a6f0..fb19d062f1b 100644 --- a/hadoop-mapreduce-project/build.xml +++ b/hadoop-mapreduce-project/build.xml @@ -166,20 +166,6 @@ - - - - - - - - - @@ -710,8 +696,6 @@ - - - @@ -1277,7 +1260,6 @@ - @@ -1286,7 +1268,6 @@ - @@ -1769,7 +1750,6 @@ - @@ -1861,9 +1841,6 @@ - - - - - - - - - - @@ -2368,68 +2335,6 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - diff --git a/hadoop-mapreduce-project/src/c++/task-controller/.autom4te.cfg b/hadoop-mapreduce-project/src/c++/task-controller/.autom4te.cfg deleted file mode 100644 index d21d1c9877a..00000000000 --- a/hadoop-mapreduce-project/src/c++/task-controller/.autom4te.cfg +++ /dev/null @@ -1,42 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -# -# autom4te configuration for hadoop utils library -# - -begin-language: "Autoheader-preselections" -args: --no-cache -end-language: "Autoheader-preselections" - -begin-language: "Automake-preselections" -args: --no-cache -end-language: "Automake-preselections" - -begin-language: "Autoreconf-preselections" -args: --no-cache -end-language: "Autoreconf-preselections" - -begin-language: "Autoconf-without-aclocal-m4" -args: --no-cache -end-language: "Autoconf-without-aclocal-m4" - -begin-language: "Autoconf" -args: --no-cache -end-language: "Autoconf" - diff --git a/hadoop-mapreduce-project/src/c++/task-controller/Makefile.am b/hadoop-mapreduce-project/src/c++/task-controller/Makefile.am deleted file mode 100644 index 43ee05b5a85..00000000000 --- a/hadoop-mapreduce-project/src/c++/task-controller/Makefile.am +++ /dev/null @@ -1,33 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -ACLOCAL_AMFLAGS = -I ../utils/m4 -AM_CFLAGS = -Wall - -bindir = $(exec_prefix) - -bin_PROGRAMS = task-controller -check_PROGRAMS = tests/test-task-controller -TESTS = $(check_PROGRAMS) - -task_controller_SOURCES = main.c task-controller.c configuration.c \ - task-controller.h - -tests_test_task_controller_SOURCES = tests/test-task-controller.c \ - task-controller.c configuration.c task-controller.h - -test: $(check_PROGRAMS) - @echo Done with $< diff --git a/hadoop-mapreduce-project/src/c++/task-controller/configuration.c b/hadoop-mapreduce-project/src/c++/task-controller/configuration.c deleted file mode 100644 index bbacbf7efcf..00000000000 --- a/hadoop-mapreduce-project/src/c++/task-controller/configuration.c +++ /dev/null @@ -1,245 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include "configuration.h" - - -char * hadoop_conf_dir; - -struct configuration config={.size=0, .confdetails=NULL}; - -//clean up method for freeing configuration -void free_configurations() { - int i = 0; - for (i = 0; i < config.size; i++) { - if (config.confdetails[i]->key != NULL) { - free((void *)config.confdetails[i]->key); - } - if (config.confdetails[i]->value != NULL) { - free((void *)config.confdetails[i]->value); - } - free(config.confdetails[i]); - } - if (config.size > 0) { - free(config.confdetails); - } - config.size = 0; -} - -//function used to load the configurations present in the secure config -void get_configs() { - FILE *conf_file; - char *line; - char *equaltok; - char *temp_equaltok; - size_t linesize = 1000; - int size_read = 0; - int str_len = 0; - char *file_name = NULL; - -#ifndef HADOOP_CONF_DIR - str_len = strlen(CONF_FILE_PATTERN) + strlen(hadoop_conf_dir); - file_name = (char *) malloc(sizeof(char) * (str_len + 1)); -#else - str_len = strlen(CONF_FILE_PATTERN) + strlen(HADOOP_CONF_DIR); - file_name = (char *) malloc(sizeof(char) * (str_len + 1)); -#endif - - if (file_name == NULL) { - fprintf(LOGFILE, "Malloc failed :Out of memory \n"); - return; - } - memset(file_name,'\0',str_len +1); -#ifndef HADOOP_CONF_DIR - snprintf(file_name,str_len, CONF_FILE_PATTERN, hadoop_conf_dir); -#else - snprintf(file_name, str_len, CONF_FILE_PATTERN, HADOOP_CONF_DIR); -#endif - -#ifdef DEBUG - fprintf(LOGFILE, "get_configs :Conf file name is : %s \n", file_name); -#endif - - //allocate space for ten configuration items. - config.confdetails = (struct confentry **) malloc(sizeof(struct confentry *) - * MAX_SIZE); - config.size = 0; - conf_file = fopen(file_name, "r"); - if (conf_file == NULL) { - fprintf(LOGFILE, "Invalid conf file provided : %s \n", file_name); - free(file_name); - return; - } - while(!feof(conf_file)) { - line = (char *) malloc(linesize); - if(line == NULL) { - fprintf(LOGFILE, "malloc failed while reading configuration file.\n"); - goto cleanup; - } - size_read = getline(&line,&linesize,conf_file); - //feof returns true only after we read past EOF. - //so a file with no new line, at last can reach this place - //if size_read returns negative check for eof condition - if (size_read == -1) { - if(!feof(conf_file)){ - fprintf(LOGFILE, "getline returned error.\n"); - goto cleanup; - }else { - break; - } - } - //trim the ending new line - line[strlen(line)-1] = '\0'; - //comment line - if(line[0] == '#') { - free(line); - continue; - } - //tokenize first to get key and list of values. - //if no equals is found ignore this line, can be an empty line also - equaltok = strtok_r(line, "=", &temp_equaltok); - if(equaltok == NULL) { - free(line); - continue; - } - config.confdetails[config.size] = (struct confentry *) malloc( - sizeof(struct confentry)); - if(config.confdetails[config.size] == NULL) { - fprintf(LOGFILE, - "Failed allocating memory for single configuration item\n"); - goto cleanup; - } - -#ifdef DEBUG - fprintf(LOGFILE, "get_configs : Adding conf key : %s \n", equaltok); -#endif - - memset(config.confdetails[config.size], 0, sizeof(struct confentry)); - config.confdetails[config.size]->key = (char *) malloc( - sizeof(char) * (strlen(equaltok)+1)); - strcpy((char *)config.confdetails[config.size]->key, equaltok); - equaltok = strtok_r(NULL, "=", &temp_equaltok); - if (equaltok == NULL) { - fprintf(LOGFILE, "configuration tokenization failed \n"); - goto cleanup; - } - //means value is commented so don't store the key - if(equaltok[0] == '#') { - free(line); - free((void *)config.confdetails[config.size]->key); - free(config.confdetails[config.size]); - continue; - } - -#ifdef DEBUG - fprintf(LOGFILE, "get_configs : Adding conf value : %s \n", equaltok); -#endif - - config.confdetails[config.size]->value = (char *) malloc( - sizeof(char) * (strlen(equaltok)+1)); - strcpy((char *)config.confdetails[config.size]->value, equaltok); - if((config.size + 1) % MAX_SIZE == 0) { - config.confdetails = (struct confentry **) realloc(config.confdetails, - sizeof(struct confentry **) * (MAX_SIZE + config.size)); - if (config.confdetails == NULL) { - fprintf(LOGFILE, - "Failed re-allocating memory for configuration items\n"); - goto cleanup; - } - } - if(config.confdetails[config.size] ) - config.size++; - free(line); - } - - //close the file - fclose(conf_file); - //clean up allocated file name - free(file_name); - return; - //free spaces alloced. - cleanup: - if (line != NULL) { - free(line); - } - fclose(conf_file); - free(file_name); - free_configurations(); - return; -} - -/* - * function used to get a configuration value. - * The function for the first time populates the configuration details into - * array, next time onwards used the populated array. - * - */ -const char * get_value(const char* key) { - int count; - if (config.size == 0) { - get_configs(); - } - if (config.size == 0) { - fprintf(LOGFILE, "Invalid configuration provided\n"); - return NULL; - } - for (count = 0; count < config.size; count++) { - if (strcmp(config.confdetails[count]->key, key) == 0) { - return strdup(config.confdetails[count]->value); - } - } - return NULL; -} - -/** - * Function to return an array of values for a key. - * Value delimiter is assumed to be a comma. - */ -const char ** get_values(const char * key) { - const char ** toPass = NULL; - const char *value = get_value(key); - char *tempTok = NULL; - char *tempstr = NULL; - int size = 0; - int len; - //first allocate any array of 10 - if(value != NULL) { - toPass = (const char **) malloc(sizeof(char *) * MAX_SIZE); - tempTok = strtok_r((char *)value, ",", &tempstr); - if (tempTok != NULL) { - while (1) { - toPass[size++] = tempTok; - tempTok = strtok_r(NULL, ",", &tempstr); - if(tempTok == NULL){ - break; - } - if((size % MAX_SIZE) == 0) { - toPass = (const char **) realloc(toPass,(sizeof(char *) * - (MAX_SIZE * ((size/MAX_SIZE) +1)))); - } - } - } else { - toPass[size] = (char *)value; - } - } - if(size > 0) { - toPass[size] = NULL; - } - return toPass; -} - diff --git a/hadoop-mapreduce-project/src/c++/task-controller/configuration.h b/hadoop-mapreduce-project/src/c++/task-controller/configuration.h deleted file mode 100644 index d1ee6d63374..00000000000 --- a/hadoop-mapreduce-project/src/c++/task-controller/configuration.h +++ /dev/null @@ -1,59 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include -#include -#include -#include -#include - -#define INCREMENT_SIZE 1000 -#define MAX_SIZE 10 - -struct confentry { - const char *key; - const char *value; -}; - - -struct configuration { - int size; - struct confentry **confdetails; -}; - -FILE *LOGFILE; - -#ifdef HADOOP_CONF_DIR - #define CONF_FILE_PATTERN "%s/taskcontroller.cfg" -#else - #define CONF_FILE_PATTERN "%s/conf/taskcontroller.cfg" -#endif - -extern struct configuration config; -//configuration file contents -#ifndef HADOOP_CONF_DIR - extern char *hadoop_conf_dir; -#endif -//method exposed to get the configurations -const char * get_value(const char* key); -//method to free allocated configuration -void free_configurations(); - -//function to return array of values pointing to the key. Values are -//comma seperated strings. -const char ** get_values(const char* key); diff --git a/hadoop-mapreduce-project/src/c++/task-controller/configure.ac b/hadoop-mapreduce-project/src/c++/task-controller/configure.ac deleted file mode 100644 index 86fff4f4108..00000000000 --- a/hadoop-mapreduce-project/src/c++/task-controller/configure.ac +++ /dev/null @@ -1,68 +0,0 @@ -# -*- Autoconf -*- -# Process this file with autoconf to produce a configure script. - -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -AC_PREREQ(2.59) -AC_INIT([task-controller],[0.1]) - -#changing default prefix value to empty string, so that binary does not -#gets installed within system -AC_PREFIX_DEFAULT(.) - -#add new argument called -with-confdir -AC_ARG_WITH(confdir,[--with-confdir path to hadoop conf dir]) -AC_CONFIG_SRCDIR([task-controller.h]) -AC_CONFIG_AUX_DIR([config]) -AC_CONFIG_MACRO_DIR([../utils/m4]) -AM_INIT_AUTOMAKE([subdir-objects foreign no-dist]) - -# Checks for programs. -AC_PROG_CC - -# Checks for libraries. - -# Checks for header files. -AC_HEADER_STDC -AC_CHECK_HEADERS([stdlib.h string.h unistd.h fcntl.h]) - -#check for HADOOP_CONF_DIR - - -if test "$with_confdir" != "" -then -AC_DEFINE_UNQUOTED(HADOOP_CONF_DIR, ["$with_confdir"], [Location of Hadoop configuration]) -fi -# Checks for typedefs, structures, and compiler characteristics. -AC_C_CONST -AC_TYPE_PID_T -AC_TYPE_MODE_T -AC_TYPE_SIZE_T - -# Checks for library functions. -AC_FUNC_MALLOC -AC_FUNC_REALLOC -AC_FUNC_CHOWN -AC_CHECK_FUNCS([strerror memset mkdir rmdir strdup]) - -AC_CONFIG_FILES([Makefile]) -AC_OUTPUT - -AC_HEADER_STDBOOL -AC_PROG_MAKE_SET diff --git a/hadoop-mapreduce-project/src/c++/task-controller/main.c b/hadoop-mapreduce-project/src/c++/task-controller/main.c deleted file mode 100644 index 216417e3d9a..00000000000 --- a/hadoop-mapreduce-project/src/c++/task-controller/main.c +++ /dev/null @@ -1,260 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#include "task-controller.h" - -void open_log_file(const char *log_file) { - if (log_file == NULL) { - LOGFILE = stdout; - } else { - LOGFILE = fopen(log_file, "a"); - if (LOGFILE == NULL) { - fprintf(stdout, "Unable to open LOGFILE : %s \n", log_file); - LOGFILE = stdout; - } - if (LOGFILE != stdout) { - if (chmod(log_file, S_IREAD | S_IEXEC | S_IWRITE | S_IROTH | S_IWOTH - | S_IRGRP | S_IWGRP) < 0) { - fprintf(stdout, "Unable to change permission of the log file %s \n", - log_file); - fclose(LOGFILE); - fprintf(stdout, "changing log file to stdout"); - LOGFILE = stdout; - } - } - } -} - -void display_usage(FILE *stream) { - fprintf(stream, - "Usage: task-controller [-l logfile] user command command-args\n"); -} - -/** - * Check the permissions on taskcontroller to make sure that security is - * promisable. For this, we need task-controller binary to - * * be user-owned by root - * * be group-owned by a configured special group. - * * others do not have write or execute permissions - * * be setuid - */ -int check_taskcontroller_permissions(char *executable_file) { - - errno = 0; - char * resolved_path = (char *) canonicalize_file_name(executable_file); - if (resolved_path == NULL) { - fprintf(LOGFILE, - "Error resolving the canonical name for the executable : %s!", - strerror(errno)); - return -1; - } - - struct stat filestat; - errno = 0; - if (stat(resolved_path, &filestat) != 0) { - fprintf(LOGFILE, "Could not stat the executable : %s!.\n", strerror(errno)); - return -1; - } - - uid_t binary_euid = filestat.st_uid; // Binary's user owner - gid_t binary_egid = filestat.st_gid; // Binary's group owner - - // Effective uid should be root - if (binary_euid != 0) { - fprintf(LOGFILE, - "The task-controller binary should be user-owned by root.\n"); - return -1; - } - - // Get the group entry for the special_group - errno = 0; - struct group *special_group_entry = getgrgid(binary_egid); - if (special_group_entry == NULL) { - fprintf(LOGFILE, - "Unable to get information for effective group of the binary : %s\n", - strerror(errno)); - return -1; - } - - char * binary_group = special_group_entry->gr_name; - // verify that the group name of the special group - // is same as the one in configuration - if (check_variable_against_config(TT_GROUP_KEY, binary_group) != 0) { - fprintf(LOGFILE, - "Group of the binary does not match with that in configuration\n"); - return -1; - } - - // check others do not have write/execute permissions - if ((filestat.st_mode & S_IWOTH) == S_IWOTH || - (filestat.st_mode & S_IXOTH) == S_IXOTH) { - fprintf(LOGFILE, - "The task-controller binary should not have write or execute for others.\n"); - return -1; - } - - // Binary should be setuid executable - if ((filestat.st_mode & S_ISUID) != S_ISUID) { - fprintf(LOGFILE, - "The task-controller binary should be set setuid.\n"); - return -1; - } - - return 0; -} - -int main(int argc, char **argv) { - int command; - int next_option = 0; - const char * job_id = NULL; - const char * task_id = NULL; - const char * tt_root = NULL; - const char *log_dir = NULL; - const char * unique_string = NULL; - int exit_code = 0; - const char * task_pid = NULL; - const char* const short_options = "l:"; - const struct option long_options[] = { { "log", 1, NULL, 'l' }, { NULL, 0, - NULL, 0 } }; - - const char* log_file = NULL; - char * dir_to_be_deleted = NULL; - int conf_dir_len = 0; - - char *executable_file = argv[0]; -#ifndef HADOOP_CONF_DIR - conf_dir_len = (strlen(executable_file) - strlen(EXEC_PATTERN)) + 1; - if (conf_dir_len < 1) { - // We didn't get an absolute path to our executable_file; bail. - printf("Cannot find configuration directory.\n"); - printf("This program must be run with its full absolute path.\n"); - return INVALID_CONF_DIR; - } else { - hadoop_conf_dir = (char *) malloc (sizeof(char) * conf_dir_len); - strncpy(hadoop_conf_dir, executable_file, - (strlen(executable_file) - strlen(EXEC_PATTERN))); - hadoop_conf_dir[(strlen(executable_file) - strlen(EXEC_PATTERN))] = '\0'; - } -#endif - do { - next_option = getopt_long(argc, argv, short_options, long_options, NULL); - switch (next_option) { - case 'l': - log_file = optarg; - default: - break; - } - } while (next_option != -1); - - open_log_file(log_file); - - if (check_taskcontroller_permissions(executable_file) != 0) { - fprintf(LOGFILE, "Invalid permissions on task-controller binary.\n"); - return INVALID_TASKCONTROLLER_PERMISSIONS; - } - - //Minimum number of arguments required to run the task-controller - //command-name user command tt-root - if (argc < 3) { - display_usage(stdout); - return INVALID_ARGUMENT_NUMBER; - } - - //checks done for user name - //checks done if the user is root or not. - if (argv[optind] == NULL) { - fprintf(LOGFILE, "Invalid user name \n"); - return INVALID_USER_NAME; - } - if (get_user_details(argv[optind]) != 0) { - return INVALID_USER_NAME; - } - //implicit conversion to int instead of __gid_t and __uid_t - if (user_detail->pw_gid == 0 || user_detail->pw_uid == 0) { - fprintf(LOGFILE, "Cannot run tasks as super user\n"); - return SUPER_USER_NOT_ALLOWED_TO_RUN_TASKS; - } - optind = optind + 1; - command = atoi(argv[optind++]); - - fprintf(LOGFILE, "main : command provided %d\n",command); - fprintf(LOGFILE, "main : user is %s\n", user_detail->pw_name); - - switch (command) { - case INITIALIZE_USER: - exit_code = initialize_user(user_detail->pw_name); - break; - case INITIALIZE_JOB: - job_id = argv[optind++]; - exit_code = initialize_job(job_id, user_detail->pw_name); - break; - case INITIALIZE_DISTRIBUTEDCACHE_FILE: - tt_root = argv[optind++]; - unique_string = argv[optind++]; - exit_code = initialize_distributed_cache_file(tt_root, unique_string, - user_detail->pw_name); - break; - case LAUNCH_TASK_JVM: - tt_root = argv[optind++]; - job_id = argv[optind++]; - task_id = argv[optind++]; - exit_code - = run_task_as_user(user_detail->pw_name, job_id, task_id, tt_root); - break; - case INITIALIZE_TASK: - job_id = argv[optind++]; - task_id = argv[optind++]; - exit_code = initialize_task(job_id, task_id, user_detail->pw_name); - break; - case TERMINATE_TASK_JVM: - task_pid = argv[optind++]; - exit_code = kill_user_task(user_detail->pw_name, task_pid, SIGTERM); - break; - case KILL_TASK_JVM: - task_pid = argv[optind++]; - exit_code = kill_user_task(user_detail->pw_name, task_pid, SIGKILL); - break; - case RUN_DEBUG_SCRIPT: - tt_root = argv[optind++]; - job_id = argv[optind++]; - task_id = argv[optind++]; - exit_code - = run_debug_script_as_user(user_detail->pw_name, job_id, task_id, tt_root); - break; - case SIGQUIT_TASK_JVM: - task_pid = argv[optind++]; - exit_code = kill_user_task(user_detail->pw_name, task_pid, SIGQUIT); - break; - case ENABLE_TASK_FOR_CLEANUP: - tt_root = argv[optind++]; - job_id = argv[optind++]; - dir_to_be_deleted = argv[optind++]; - exit_code = enable_task_for_cleanup(tt_root, user_detail->pw_name, job_id, - dir_to_be_deleted); - break; - case ENABLE_JOB_FOR_CLEANUP: - tt_root = argv[optind++]; - job_id = argv[optind++]; - exit_code = enable_job_for_cleanup(tt_root, user_detail->pw_name, job_id); - break; - default: - exit_code = INVALID_COMMAND_PROVIDED; - } - fflush(LOGFILE); - fclose(LOGFILE); - return exit_code; -} diff --git a/hadoop-mapreduce-project/src/c++/task-controller/task-controller.c b/hadoop-mapreduce-project/src/c++/task-controller/task-controller.c deleted file mode 100644 index eb0cbaa6454..00000000000 --- a/hadoop-mapreduce-project/src/c++/task-controller/task-controller.c +++ /dev/null @@ -1,1300 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#include "task-controller.h" - -//struct to store the user details -struct passwd *user_detail = NULL; - -//LOGFILE -FILE *LOGFILE; - -//placeholder for global cleanup operations -void cleanup() { - free_configurations(); -} - -//change the user to passed user for executing/killing tasks -int change_user(const char * user) { - if (get_user_details(user) < 0) { - return -1; - } - - if(initgroups(user_detail->pw_name, user_detail->pw_gid) != 0) { - fprintf(LOGFILE, "unable to initgroups : %s\n", strerror(errno)); - cleanup(); - return SETUID_OPER_FAILED; - } - - errno = 0; - - setgid(user_detail->pw_gid); - if (errno != 0) { - fprintf(LOGFILE, "unable to setgid : %s\n", strerror(errno)); - cleanup(); - return SETUID_OPER_FAILED; - } - - setegid(user_detail->pw_gid); - if (errno != 0) { - fprintf(LOGFILE, "unable to setegid : %s\n", strerror(errno)); - cleanup(); - return SETUID_OPER_FAILED; - } - - setuid(user_detail->pw_uid); - if (errno != 0) { - fprintf(LOGFILE, "unable to setuid : %s\n", strerror(errno)); - cleanup(); - return SETUID_OPER_FAILED; - } - - seteuid(user_detail->pw_uid); - if (errno != 0) { - fprintf(LOGFILE, "unable to seteuid : %s\n", strerror(errno)); - cleanup(); - return SETUID_OPER_FAILED; - } - return 0; -} - -/** - * Checks the passed value for the variable config_key against the values in - * the configuration. - * Returns 0 if the passed value is found in the configuration, - * -1 otherwise - */ -int check_variable_against_config(const char *config_key, - const char *passed_value) { - - if (config_key == NULL || passed_value == NULL) { - return -1; - } - - int found = -1; - - const char **config_value = get_values(config_key); - - if (config_value == NULL) { - fprintf(LOGFILE, "%s is not configured.\n", config_key); - return -1; - } - - char *full_config_value = (char *)get_value(config_key); - - char **config_val_ptr = (char **) config_value; - while (*config_val_ptr != NULL) { - if (strcmp(*config_val_ptr, passed_value) == 0) { - found = 0; - break; - } - config_val_ptr++; - } - - if (found != 0) { - fprintf( - LOGFILE, - "Invalid value passed: \ - Configured value of %s is %s. \ - Passed value is %s.\n", - config_key, full_config_value, passed_value); - } - free(full_config_value); - free(config_value); - return found; -} - -/** - * Utility function to concatenate argB to argA using the concat_pattern - */ -char *concatenate(char *concat_pattern, char *return_path_name, int numArgs, - ...) { - va_list ap; - va_start(ap, numArgs); - int strlen_args = 0; - char *arg = NULL; - int j; - for (j = 0; j < numArgs; j++) { - arg = va_arg(ap, char*); - if (arg == NULL) { - fprintf(LOGFILE, "One of the arguments passed for %s in null.\n", - return_path_name); - return NULL; - } - strlen_args += strlen(arg); - } - va_end(ap); - - char *return_path = NULL; - int str_len = strlen(concat_pattern) + strlen_args; - - return_path = (char *) malloc(sizeof(char) * (str_len + 1)); - if (return_path == NULL) { - fprintf(LOGFILE, "Unable to allocate memory for %s.\n", return_path_name); - return NULL; - } - memset(return_path, '\0', str_len + 1); - va_start(ap, numArgs); - vsnprintf(return_path, str_len, concat_pattern, ap); - va_end(ap); - return return_path; -} - -/** - * Get the job-directory path from tt_root, user name and job-id - */ -char *get_job_directory(const char * tt_root, const char *user, - const char *jobid) { - return concatenate(TT_JOB_DIR_PATTERN, "job_dir_path", 3, tt_root, user, - jobid); -} - -/** - * Get the user directory of a particular user - */ -char *get_user_directory(const char *tt_root, const char *user) { - return concatenate(USER_DIR_PATTERN, "user_dir_path", 2, tt_root, user); -} - -/** - * Get the distributed cache directory for a particular user - */ -char *get_distributed_cache_directory(const char *tt_root, const char *user, - const char* unique_string) { - return concatenate(USER_DISTRIBUTED_CACHE_DIR_PATTERN, - "dist_cache_unique_path", 3, tt_root, user, unique_string); -} - -char *get_job_work_directory(const char *job_dir) { - return concatenate(JOB_DIR_TO_JOB_WORK_PATTERN, "job_work_dir_path", 2, - job_dir, ""); -} -/** - * Get the attempt directory for the given attempt_id - */ -char *get_attempt_directory(const char *job_dir, const char *attempt_id) { - return concatenate(JOB_DIR_TO_ATTEMPT_DIR_PATTERN, "attempt_dir_path", 2, - job_dir, attempt_id); -} - -/* - * Get the path to the task launcher file which is created by the TT - */ -char *get_task_launcher_file(const char *job_dir, const char *attempt_dir) { - return concatenate(TASK_SCRIPT_PATTERN, "task_script_path", 2, job_dir, - attempt_dir); -} - -/* - * Builds the full path of the dir(localTaskDir or localWorkDir) - * tt_root : is the base path(i.e. mapred-local-dir) sent to task-controller - * dir_to_be_deleted : is either taskDir($taskId) OR taskWorkDir($taskId/work) - */ -char *get_task_dir_path(const char *tt_root, const char *user, - const char *jobid, const char *dir_to_be_deleted) { - return concatenate(TT_LOCAL_TASK_DIR_PATTERN, "task_dir_full_path", 4, - tt_root, user, jobid, dir_to_be_deleted); -} - -/** - * Get the log directory for the given attempt. - */ -char *get_task_log_dir(const char *log_dir, const char *job_id, - const char *attempt_id) { - return concatenate(ATTEMPT_LOG_DIR_PATTERN, "task_log_dir", 3, log_dir, - job_id, attempt_id); -} - -/** - * Get the log directory for the given job. - */ -char *get_job_log_dir(const char *log_dir, const char *job_id) { - return concatenate(JOB_LOG_DIR_PATTERN, "job_log_dir", 2, log_dir, job_id); -} - -/** - * Get the job ACLs file for the given job log dir. - */ -char *get_job_acls_file(const char *log_dir) { - return concatenate(JOB_LOG_DIR_TO_JOB_ACLS_FILE_PATTERN, "job_acls_file", - 1, log_dir); -} - -/** - * Function to check if the passed tt_root is present in mapreduce.cluster.local.dir - * the task-controller is configured with. - */ -int check_tt_root(const char *tt_root) { - return check_variable_against_config(TT_SYS_DIR_KEY, tt_root); -} - -/** - * Function to check if the constructed path and absolute path of the task - * launcher file resolve to one and same. This is done so as to avoid - * security pitfalls because of relative path components in the file name. - */ -int check_path_for_relative_components(char *path) { - char * resolved_path = (char *) canonicalize_file_name(path); - if (resolved_path == NULL) { - fprintf(LOGFILE, - "Error resolving the path: %s. Passed path: %s\n", - strerror(errno), path); - return ERROR_RESOLVING_FILE_PATH; - } - if (strcmp(resolved_path, path) != 0) { - fprintf(LOGFILE, - "Relative path components in the path: %s. Resolved path: %s\n", - path, resolved_path); - free(resolved_path); - return RELATIVE_PATH_COMPONENTS_IN_FILE_PATH; - } - free(resolved_path); - return 0; -} - -/** - * Function to change the owner/group of a given path. - */ -static int change_owner(const char *path, uid_t uid, gid_t gid) { - int exit_code = chown(path, uid, gid); - if (exit_code != 0) { - fprintf(LOGFILE, "chown %d:%d for path %s failed: %s.\n", uid, gid, path, - strerror(errno)); - } - return exit_code; -} - -/** - * Function to change the mode of a given path. - */ -static int change_mode(const char *path, mode_t mode) { - int exit_code = chmod(path, mode); - if (exit_code != 0) { - fprintf(LOGFILE, "chmod %d of path %s failed: %s.\n", mode, path, - strerror(errno)); - } - return exit_code; -} - -/** - * Function to change permissions of the given path. It does the following - * recursively: - * 1) changes the owner/group of the paths to the passed owner/group - * 2) changes the file permission to the passed file_mode and directory - * permission to the passed dir_mode - * - * should_check_ownership : boolean to enable checking of ownership of each path - */ -static int secure_path(const char *path, uid_t uid, gid_t gid, - mode_t file_mode, mode_t dir_mode, int should_check_ownership) { - FTS *tree = NULL; // the file hierarchy - FTSENT *entry = NULL; // a file in the hierarchy - char *paths[] = { (char *) path, NULL };//array needs to be NULL-terminated - int process_path = 0; - int dir = 0; - int error_code = 0; - int done = 0; - - // Get physical locations and don't resolve the symlinks. - // Don't change directory while walking the directory. - int ftsoptions = FTS_PHYSICAL | FTS_NOCHDIR; - - tree = fts_open(paths, ftsoptions, NULL); - if (tree == NULL) { - fprintf(LOGFILE, - "Cannot open file traversal structure for the path %s:%s.\n", path, - strerror(errno)); - return -1; - } - - while (((entry = fts_read(tree)) != NULL) && !done) { - dir = 0; - switch (entry->fts_info) { - case FTS_D: - // A directory being visited in pre-order. - // We change ownership of directories in post-order. - // so ignore the pre-order visit. - process_path = 0; - break; - case FTS_DC: - // A directory that causes a cycle in the tree - // We don't expect cycles, ignore. - process_path = 0; - break; - case FTS_DNR: - // A directory which cannot be read - // Ignore and set error code. - process_path = 0; - error_code = -1; - break; - case FTS_DOT: - // "." or ".." - process_path = 0; - break; - case FTS_F: - // A regular file - process_path = 1; - break; - case FTS_DP: - // A directory being visited in post-order - if (entry->fts_level == 0) { - // root directory. Done with traversing. - done = 1; - } - process_path = 1; - dir = 1; - break; - case FTS_SL: - // A symbolic link - // We don't want to change-ownership(and set-permissions) for the file/dir - // pointed to by any symlink. - process_path = 0; - break; - case FTS_SLNONE: - // A symbolic link with a nonexistent target - process_path = 0; - break; - case FTS_NS: - // A file for which no stat(2) information was available - // Ignore and set error code - process_path = 0; - error_code = -1; - break; - case FTS_ERR: - // An error return. Ignore and set error code. - process_path = 0; - error_code = -1; - break; - case FTS_DEFAULT: - // File that doesn't belong to any of the above type. Ignore. - process_path = 0; - break; - default: - // None of the above. Ignore and set error code - process_path = 0; - error_code = -1; - } - - if (error_code != 0) { - break; - } - if (!process_path) { - continue; - } - error_code = secure_single_path(entry->fts_path, uid, gid, - (dir ? dir_mode : file_mode), should_check_ownership); - - } - if (fts_close(tree) != 0) { - fprintf(LOGFILE, "couldn't close file traversal structure:%s.\n", - strerror(errno)); - } - return error_code; -} - -/** - * Function to change ownership and permissions of the given path. - * This call sets ownership and permissions just for the path, not recursive. - */ -int secure_single_path(char *path, uid_t uid, gid_t gid, - mode_t perm, int should_check_ownership) { - int error_code = 0; - if (should_check_ownership && - (check_ownership(path, uid, gid) != 0)) { - fprintf(LOGFILE, - "Invalid file path. %s not user/group owned by the tasktracker.\n", path); - error_code = -1; - } else if (change_owner(path, uid, gid) != 0) { - fprintf(LOGFILE, "couldn't change the ownership of %s\n", path); - error_code = -3; - } else if (change_mode(path, perm) != 0) { - fprintf(LOGFILE, "couldn't change the permissions of %s\n", path); - error_code = -3; - } - return error_code; -} - -/** - * Function to prepare the attempt directories for the task JVM. - * This is done by changing the ownership of the attempt directory recursively - * to the job owner. We do the following: - * * sudo chown user:mapred -R taskTracker/$user/jobcache/$jobid/$attemptid/ - * * sudo chmod 2770 -R taskTracker/$user/jobcache/$jobid/$attemptid/ - */ -int prepare_attempt_directories(const char *job_id, const char *attempt_id, - const char *user) { - if (job_id == NULL || attempt_id == NULL || user == NULL) { - fprintf(LOGFILE, "Either attempt_id is null or the user passed is null.\n"); - return INVALID_ARGUMENT_NUMBER; - } - - gid_t tasktracker_gid = getegid(); // the group permissions of the binary. - - if (get_user_details(user) < 0) { - fprintf(LOGFILE, "Couldn't get the user details of %s.\n", user); - return INVALID_USER_NAME; - } - - char **local_dir = (char **) get_values(TT_SYS_DIR_KEY); - - if (local_dir == NULL) { - fprintf(LOGFILE, "%s is not configured.\n", TT_SYS_DIR_KEY); - cleanup(); - return PREPARE_ATTEMPT_DIRECTORIES_FAILED; - } - - char *full_local_dir_str = (char *) get_value(TT_SYS_DIR_KEY); -#ifdef DEBUG - fprintf(LOGFILE, "Value from config for %s is %s.\n", TT_SYS_DIR_KEY, - full_local_dir_str); -#endif - - char *job_dir; - char *attempt_dir; - char **local_dir_ptr = local_dir; - int failed = 0; - while (*local_dir_ptr != NULL) { - job_dir = get_job_directory(*local_dir_ptr, user, job_id); - if (job_dir == NULL) { - fprintf(LOGFILE, "Couldn't get job directory for %s.\n", job_id); - failed = 1; - break; - } - - // prepare attempt-dir in each of the mapreduce.cluster.local.dir - attempt_dir = get_attempt_directory(job_dir, attempt_id); - if (attempt_dir == NULL) { - fprintf(LOGFILE, "Couldn't get attempt directory for %s.\n", attempt_id); - failed = 1; - free(job_dir); - break; - } - - struct stat filestat; - if (stat(attempt_dir, &filestat) != 0) { - if (errno == ENOENT) { -#ifdef DEBUG - fprintf(LOGFILE, - "attempt_dir %s doesn't exist. Not doing anything.\n", attempt_dir); -#endif - } else { - // stat failed because of something else! - fprintf(LOGFILE, "Failed to stat the attempt_dir %s\n", attempt_dir); - failed = 1; - free(attempt_dir); - free(job_dir); - break; - } - } else if (secure_path(attempt_dir, user_detail->pw_uid, - tasktracker_gid, S_IRWXU | S_IRWXG, S_ISGID | S_IRWXU | S_IRWXG, - 1) != 0) { - // No setgid on files and setgid on dirs, 770 - fprintf(LOGFILE, "Failed to secure the attempt_dir %s\n", attempt_dir); - failed = 1; - free(attempt_dir); - free(job_dir); - break; - } - - local_dir_ptr++; - free(attempt_dir); - free(job_dir); - } - free(local_dir); - free(full_local_dir_str); - - cleanup(); - if (failed) { - return PREPARE_ATTEMPT_DIRECTORIES_FAILED; - } - return 0; -} - -/** - * Function to prepare the job log dir(and job acls file in it) for the child. - * It gives the user ownership of the job's log-dir to the user and - * group ownership to the user running tasktracker(i.e. tt_user). - * - * * sudo chown user:mapred log-dir/userlogs/$jobid - * * if user is not $tt_user, - * * sudo chmod 2570 log-dir/userlogs/$jobid - * * else - * * sudo chmod 2770 log-dir/userlogs/$jobid - * * sudo chown user:mapred log-dir/userlogs/$jobid/job-acls.xml - * * if user is not $tt_user, - * * sudo chmod 2570 log-dir/userlogs/$jobid/job-acls.xml - * * else - * * sudo chmod 2770 log-dir/userlogs/$jobid/job-acls.xml - */ -int prepare_job_logs(const char *log_dir, const char *job_id, - mode_t permissions) { - - char *job_log_dir = get_job_log_dir(log_dir, job_id); - if (job_log_dir == NULL) { - fprintf(LOGFILE, "Couldn't get job log directory %s.\n", job_log_dir); - return -1; - } - - struct stat filestat; - if (stat(job_log_dir, &filestat) != 0) { - if (errno == ENOENT) { -#ifdef DEBUG - fprintf(LOGFILE, "job_log_dir %s doesn't exist. Not doing anything.\n", - job_log_dir); -#endif - free(job_log_dir); - return 0; - } else { - // stat failed because of something else! - fprintf(LOGFILE, "Failed to stat the job log dir %s\n", job_log_dir); - free(job_log_dir); - return -1; - } - } - - gid_t tasktracker_gid = getegid(); // the group permissions of the binary. - // job log directory should not be set permissions recursively - // because, on tt restart/reinit, it would contain directories of earlier run - if (secure_single_path(job_log_dir, user_detail->pw_uid, tasktracker_gid, - S_ISGID | permissions, 1) != 0) { - fprintf(LOGFILE, "Failed to secure the log_dir %s\n", job_log_dir); - free(job_log_dir); - return -1; - } - - //set ownership and permissions for job_log_dir/job-acls.xml, if exists. - char *job_acls_file = get_job_acls_file(job_log_dir); - if (job_acls_file == NULL) { - fprintf(LOGFILE, "Couldn't get job acls file %s.\n", job_acls_file); - free(job_log_dir); - return -1; - } - - struct stat filestat1; - if (stat(job_acls_file, &filestat1) != 0) { - if (errno == ENOENT) { -#ifdef DEBUG - fprintf(LOGFILE, "job_acls_file %s doesn't exist. Not doing anything.\n", - job_acls_file); -#endif - free(job_acls_file); - free(job_log_dir); - return 0; - } else { - // stat failed because of something else! - fprintf(LOGFILE, "Failed to stat the job_acls_file %s\n", job_acls_file); - free(job_acls_file); - free(job_log_dir); - return -1; - } - } - - if (secure_single_path(job_acls_file, user_detail->pw_uid, tasktracker_gid, - permissions, 1) != 0) { - fprintf(LOGFILE, "Failed to secure the job acls file %s\n", job_acls_file); - free(job_acls_file); - free(job_log_dir); - return -1; - } - free(job_acls_file); - free(job_log_dir); - return 0; -} - -/** - * Function to prepare the task logs for the child. It gives the user - * ownership of the attempt's log-dir to the user and group ownership to the - * user running tasktracker. - * * sudo chown user:mapred log-dir/userlogs/$jobid/$attemptid - * * sudo chmod -R 2770 log-dir/userlogs/$jobid/$attemptid - */ -int prepare_task_logs(const char *log_dir, const char *job_id, - const char *task_id) { - - char *task_log_dir = get_task_log_dir(log_dir, job_id, task_id); - if (task_log_dir == NULL) { - fprintf(LOGFILE, "Couldn't get task_log directory %s.\n", task_log_dir); - return -1; - } - - struct stat filestat; - if (stat(task_log_dir, &filestat) != 0) { - if (errno == ENOENT) { - // See TaskRunner.java to see that an absent log-dir doesn't fail the task. -#ifdef DEBUG - fprintf(LOGFILE, "task_log_dir %s doesn't exist. Not doing anything.\n", - task_log_dir); -#endif - free(task_log_dir); - return 0; - } else { - // stat failed because of something else! - fprintf(LOGFILE, "Failed to stat the task_log_dir %s\n", task_log_dir); - free(task_log_dir); - return -1; - } - } - - gid_t tasktracker_gid = getegid(); // the group permissions of the binary. - if (secure_path(task_log_dir, user_detail->pw_uid, tasktracker_gid, - S_IRWXU | S_IRWXG, S_ISGID | S_IRWXU | S_IRWXG, 1) != 0) { - // setgid on dirs but not files, 770. As of now, there are no files though - fprintf(LOGFILE, "Failed to secure the log_dir %s\n", task_log_dir); - free(task_log_dir); - return -1; - } - free(task_log_dir); - return 0; -} - -//function used to populate and user_details structure. -int get_user_details(const char *user) { - if (user_detail == NULL) { - user_detail = getpwnam(user); - if (user_detail == NULL) { - fprintf(LOGFILE, "Invalid user\n"); - return -1; - } - } - return 0; -} - -/* - * Function to check if the TaskTracker actually owns the file. - * Or it has right ownership already. - */ -int check_ownership(char *path, uid_t uid, gid_t gid) { - struct stat filestat; - if (stat(path, &filestat) != 0) { - return UNABLE_TO_STAT_FILE; - } - // check user/group. User should be TaskTracker user, group can either be - // TaskTracker's primary group or the special group to which binary's - // permissions are set. - // Or it can be the user/group owned by uid and gid passed. - if ((getuid() != filestat.st_uid || (getgid() != filestat.st_gid && getegid() - != filestat.st_gid)) && - ((uid != filestat.st_uid) || (gid != filestat.st_gid))) { - return FILE_NOT_OWNED_BY_TASKTRACKER; - } - return 0; -} - -/** - * Function to initialize the user directories of a user. - * It does the following: - * * sudo chown user:mapred -R taskTracker/$user - * * if user is not $tt_user, - * * sudo chmod 2570 -R taskTracker/$user - * * else // user is tt_user - * * sudo chmod 2770 -R taskTracker/$user - * This is done once per every user on the TaskTracker. - */ -int initialize_user(const char *user) { - - if (user == NULL) { - fprintf(LOGFILE, "user passed is null.\n"); - return INVALID_ARGUMENT_NUMBER; - } - - if (get_user_details(user) < 0) { - fprintf(LOGFILE, "Couldn't get the user details of %s", user); - return INVALID_USER_NAME; - } - - gid_t tasktracker_gid = getegid(); // the group permissions of the binary. - - char **local_dir = (char **) get_values(TT_SYS_DIR_KEY); - if (local_dir == NULL) { - fprintf(LOGFILE, "%s is not configured.\n", TT_SYS_DIR_KEY); - cleanup(); - return INVALID_TT_ROOT; - } - - char *full_local_dir_str = (char *) get_value(TT_SYS_DIR_KEY); -#ifdef DEBUG - fprintf(LOGFILE, "Value from config for %s is %s.\n", TT_SYS_DIR_KEY, - full_local_dir_str); -#endif - - int is_tt_user = (user_detail->pw_uid == getuid()); - - // for tt_user, set 770 permissions; otherwise set 570 - mode_t permissions = is_tt_user ? (S_IRWXU | S_IRWXG) - : (S_IRUSR | S_IXUSR | S_IRWXG); - char *user_dir; - char **local_dir_ptr = local_dir; - int failed = 0; - while (*local_dir_ptr != NULL) { - user_dir = get_user_directory(*local_dir_ptr, user); - if (user_dir == NULL) { - fprintf(LOGFILE, "Couldn't get userdir directory for %s.\n", user); - failed = 1; - break; - } - - struct stat filestat; - if (stat(user_dir, &filestat) != 0) { - if (errno == ENOENT) { -#ifdef DEBUG - fprintf(LOGFILE, "user_dir %s doesn't exist. Not doing anything.\n", - user_dir); -#endif - } else { - // stat failed because of something else! - fprintf(LOGFILE, "Failed to stat the user_dir %s\n", - user_dir); - failed = 1; - free(user_dir); - break; - } - } else if (secure_path(user_dir, user_detail->pw_uid, - tasktracker_gid, permissions, S_ISGID | permissions, 1) != 0) { - // No setgid on files and setgid on dirs, - // 770 for tt_user and 570 for any other user - fprintf(LOGFILE, "Failed to secure the user_dir %s\n", - user_dir); - failed = 1; - free(user_dir); - break; - } - - local_dir_ptr++; - free(user_dir); - } - free(local_dir); - free(full_local_dir_str); - cleanup(); - if (failed) { - return INITIALIZE_USER_FAILED; - } - return 0; -} - -/** - * Function to prepare the job directories for the task JVM. - * We do the following: - * * sudo chown user:mapred -R taskTracker/$user/jobcache/$jobid - * * sudo chown user:mapred -R logs/userlogs/$jobid - * * if user is not $tt_user, - * * sudo chmod 2570 -R taskTracker/$user/jobcache/$jobid - * * sudo chmod 2570 -R logs/userlogs/$jobid - * * else // user is tt_user - * * sudo chmod 2770 -R taskTracker/$user/jobcache/$jobid - * * sudo chmod 2770 -R logs/userlogs/$jobid - * * - * * For any user, sudo chmod 2770 taskTracker/$user/jobcache/$jobid/work - */ -int initialize_job(const char *jobid, const char *user) { - if (jobid == NULL || user == NULL) { - fprintf(LOGFILE, "Either jobid is null or the user passed is null.\n"); - return INVALID_ARGUMENT_NUMBER; - } - - if (get_user_details(user) < 0) { - fprintf(LOGFILE, "Couldn't get the user details of %s", user); - return INVALID_USER_NAME; - } - - gid_t tasktracker_gid = getegid(); // the group permissions of the binary. - - char **local_dir = (char **) get_values(TT_SYS_DIR_KEY); - if (local_dir == NULL) { - fprintf(LOGFILE, "%s is not configured.\n", TT_SYS_DIR_KEY); - cleanup(); - return INVALID_TT_ROOT; - } - - char *full_local_dir_str = (char *) get_value(TT_SYS_DIR_KEY); -#ifdef DEBUG - fprintf(LOGFILE, "Value from config for %s is %s.\n", TT_SYS_DIR_KEY, - full_local_dir_str); -#endif - - int is_tt_user = (user_detail->pw_uid == getuid()); - - // for tt_user, set 770 permissions; for any other user, set 570 for job-dir - mode_t permissions = is_tt_user ? (S_IRWXU | S_IRWXG) - : (S_IRUSR | S_IXUSR | S_IRWXG); - char *job_dir, *job_work_dir; - char **local_dir_ptr = local_dir; - int failed = 0; - while (*local_dir_ptr != NULL) { - job_dir = get_job_directory(*local_dir_ptr, user, jobid); - if (job_dir == NULL) { - fprintf(LOGFILE, "Couldn't get job directory for %s.\n", jobid); - failed = 1; - break; - } - - struct stat filestat; - if (stat(job_dir, &filestat) != 0) { - if (errno == ENOENT) { -#ifdef DEBUG - fprintf(LOGFILE, "job_dir %s doesn't exist. Not doing anything.\n", - job_dir); -#endif - } else { - // stat failed because of something else! - fprintf(LOGFILE, "Failed to stat the job_dir %s\n", job_dir); - failed = 1; - free(job_dir); - break; - } - } else if (secure_path(job_dir, user_detail->pw_uid, tasktracker_gid, - permissions, S_ISGID | permissions, 1) != 0) { - // No setgid on files and setgid on dirs, - // 770 for tt_user and 570 for any other user - fprintf(LOGFILE, "Failed to secure the job_dir %s\n", job_dir); - failed = 1; - free(job_dir); - break; - } else if (!is_tt_user) { - // For tt_user, we don't need this as we already set 2770 for - // job-work-dir because of "chmod -R" done above - job_work_dir = get_job_work_directory(job_dir); - if (job_work_dir == NULL) { - fprintf(LOGFILE, "Couldn't get job-work directory for %s.\n", jobid); - failed = 1; - break; - } - - // Set 2770 on the job-work directory - if (stat(job_work_dir, &filestat) != 0) { - if (errno == ENOENT) { -#ifdef DEBUG - fprintf(LOGFILE, - "job_work_dir %s doesn't exist. Not doing anything.\n", - job_work_dir); -#endif - free(job_work_dir); - } else { - // stat failed because of something else! - fprintf(LOGFILE, "Failed to stat the job_work_dir %s\n", - job_work_dir); - failed = 1; - free(job_work_dir); - free(job_dir); - break; - } - } else if (change_mode(job_work_dir, S_ISGID | S_IRWXU | S_IRWXG) != 0) { - fprintf(LOGFILE, - "couldn't change the permissions of job_work_dir %s\n", - job_work_dir); - failed = 1; - free(job_work_dir); - free(job_dir); - break; - } - } - - local_dir_ptr++; - free(job_dir); - } - free(local_dir); - free(full_local_dir_str); - int exit_code = 0; - if (failed) { - exit_code = INITIALIZE_JOB_FAILED; - goto cleanup; - } - - char *log_dir = (char *) get_value(TT_LOG_DIR_KEY); - if (log_dir == NULL) { - fprintf(LOGFILE, "Log directory is not configured.\n"); - exit_code = INVALID_TT_LOG_DIR; - goto cleanup; - } - - if (prepare_job_logs(log_dir, jobid, permissions) != 0) { - fprintf(LOGFILE, "Couldn't prepare job logs directory %s for %s.\n", - log_dir, jobid); - exit_code = PREPARE_JOB_LOGS_FAILED; - } - - cleanup: - // free configurations - cleanup(); - if (log_dir != NULL) { - free(log_dir); - } - return exit_code; -} - -/** - * Function to initialize the distributed cache file for a user. - * It does the following: - * * sudo chown user:mapred -R taskTracker/$user/distcache/ - * * if user is not $tt_user, - * * sudo chmod 2570 -R taskTracker/$user/distcache/ - * * else // user is tt_user - * * sudo chmod 2770 -R taskTracker/$user/distcache/ - * This is done once per localization. Tasks reusing JVMs just create - * symbolic links themselves and so there isn't anything specific to do in - * that case. - */ -int initialize_distributed_cache_file(const char *tt_root, - const char *unique_string, const char *user) { - if (tt_root == NULL) { - fprintf(LOGFILE, "tt_root passed is null.\n"); - return INVALID_ARGUMENT_NUMBER; - } - if (unique_string == NULL) { - fprintf(LOGFILE, "unique_string passed is null.\n"); - return INVALID_ARGUMENT_NUMBER; - } - - if (user == NULL) { - fprintf(LOGFILE, "user passed is null.\n"); - return INVALID_ARGUMENT_NUMBER; - } - - if (get_user_details(user) < 0) { - fprintf(LOGFILE, "Couldn't get the user details of %s", user); - return INVALID_USER_NAME; - } - //Check tt_root - if (check_tt_root(tt_root) < 0) { - fprintf(LOGFILE, "invalid tt root passed %s\n", tt_root); - cleanup(); - return INVALID_TT_ROOT; - } - - // set permission on the unique directory - char *localized_unique_dir = get_distributed_cache_directory(tt_root, user, - unique_string); - if (localized_unique_dir == NULL) { - fprintf(LOGFILE, "Couldn't get unique distcache directory for %s.\n", user); - cleanup(); - return INITIALIZE_DISTCACHEFILE_FAILED; - } - - gid_t binary_gid = getegid(); // the group permissions of the binary. - - int is_tt_user = (user_detail->pw_uid == getuid()); - - // for tt_user, set 770 permissions; for any other user, set 570 - mode_t permissions = is_tt_user ? (S_IRWXU | S_IRWXG) - : (S_IRUSR | S_IXUSR | S_IRWXG); - int failed = 0; - struct stat filestat; - if (stat(localized_unique_dir, &filestat) != 0) { - // stat on distcache failed because of something - fprintf(LOGFILE, "Failed to stat the localized_unique_dir %s\n", - localized_unique_dir); - failed = INITIALIZE_DISTCACHEFILE_FAILED; - } else if (secure_path(localized_unique_dir, user_detail->pw_uid, - binary_gid, permissions, S_ISGID | permissions, 1) != 0) { - // No setgid on files and setgid on dirs, - // 770 for tt_user and 570 for any other user - fprintf(LOGFILE, "Failed to secure the localized_unique_dir %s\n", - localized_unique_dir); - failed = INITIALIZE_DISTCACHEFILE_FAILED; - } - free(localized_unique_dir); - cleanup(); - return failed; -} - -/** - * Function used to initialize task. Prepares attempt_dir, jars_dir and - * log_dir to be accessible by the child - */ -int initialize_task(const char *jobid, const char *taskid, const char *user) { - int exit_code = 0; -#ifdef DEBUG - fprintf(LOGFILE, "job-id passed to initialize_task : %s.\n", jobid); - fprintf(LOGFILE, "task-d passed to initialize_task : %s.\n", taskid); -#endif - - if (prepare_attempt_directories(jobid, taskid, user) != 0) { - fprintf(LOGFILE, - "Couldn't prepare the attempt directories for %s of user %s.\n", - taskid, user); - exit_code = PREPARE_ATTEMPT_DIRECTORIES_FAILED; - goto cleanup; - } - - char *log_dir = (char *) get_value(TT_LOG_DIR_KEY); - if (log_dir == NULL) { - fprintf(LOGFILE, "Log directory is not configured.\n"); - exit_code = INVALID_TT_LOG_DIR; - goto cleanup; - } - - if (prepare_task_logs(log_dir, jobid, taskid) != 0) { - fprintf(LOGFILE, "Couldn't prepare task logs directory %s for %s.\n", - log_dir, taskid); - exit_code = PREPARE_TASK_LOGS_FAILED; - } - - cleanup: - // free configurations - cleanup(); - if (log_dir != NULL) { - free(log_dir); - } - return exit_code; -} - -/* - * Function used to launch a task as the provided user. - */ -int run_task_as_user(const char * user, const char *jobid, const char *taskid, - const char *tt_root) { - return run_process_as_user(user, jobid, taskid, tt_root, LAUNCH_TASK_JVM); -} - -/* - * Function that is used as a helper to launch task JVMs and debug scripts. - * Not meant for launching any other process. It does the following : - * 1) Checks if the tt_root passed is found in mapreduce.cluster.local.dir - * 2) Prepares attempt_dir and log_dir to be accessible by the task JVMs - * 3) Uses get_task_launcher_file to fetch the task script file path - * 4) Does an execlp on the same in order to replace the current image with - * task image. - */ -int run_process_as_user(const char * user, const char * jobid, -const char *taskid, const char *tt_root, int command) { - if (command != LAUNCH_TASK_JVM && command != RUN_DEBUG_SCRIPT) { - return INVALID_COMMAND_PROVIDED; - } - if (jobid == NULL || taskid == NULL || tt_root == NULL) { - return INVALID_ARGUMENT_NUMBER; - } - - if (command == LAUNCH_TASK_JVM) { - fprintf(LOGFILE, "run_process_as_user launching a JVM for task :%s.\n", taskid); - } else if (command == RUN_DEBUG_SCRIPT) { - fprintf(LOGFILE, "run_process_as_user launching a debug script for task :%s.\n", taskid); - } - -#ifdef DEBUG - fprintf(LOGFILE, "Job-id passed to run_process_as_user : %s.\n", jobid); - fprintf(LOGFILE, "task-d passed to run_process_as_user : %s.\n", taskid); - fprintf(LOGFILE, "tt_root passed to run_process_as_user : %s.\n", tt_root); -#endif - - //Check tt_root before switching the user, as reading configuration - //file requires privileged access. - if (check_tt_root(tt_root) < 0) { - fprintf(LOGFILE, "invalid tt root passed %s\n", tt_root); - cleanup(); - return INVALID_TT_ROOT; - } - - int exit_code = 0; - char *job_dir = NULL, *task_script_path = NULL; - - if (command == LAUNCH_TASK_JVM && - (exit_code = initialize_task(jobid, taskid, user)) != 0) { - fprintf(LOGFILE, "Couldn't initialise the task %s of user %s.\n", taskid, - user); - goto cleanup; - } - - job_dir = get_job_directory(tt_root, user, jobid); - if (job_dir == NULL) { - fprintf(LOGFILE, "Couldn't obtain job_dir for %s in %s.\n", jobid, tt_root); - exit_code = OUT_OF_MEMORY; - goto cleanup; - } - - task_script_path = get_task_launcher_file(job_dir, taskid); - if (task_script_path == NULL) { - fprintf(LOGFILE, "Couldn't obtain task_script_path in %s.\n", job_dir); - exit_code = OUT_OF_MEMORY; - goto cleanup; - } - - errno = 0; - exit_code = check_path_for_relative_components(task_script_path); - if(exit_code != 0) { - goto cleanup; - } - - //change the user - fcloseall(); - free(job_dir); - umask(0007); - if (change_user(user) != 0) { - exit_code = SETUID_OPER_FAILED; - goto cleanup; - } - - errno = 0; - cleanup(); - execlp(task_script_path, task_script_path, NULL); - if (errno != 0) { - free(task_script_path); - if (command == LAUNCH_TASK_JVM) { - fprintf(LOGFILE, "Couldn't execute the task jvm file: %s", strerror(errno)); - exit_code = UNABLE_TO_EXECUTE_TASK_SCRIPT; - } else if (command == RUN_DEBUG_SCRIPT) { - fprintf(LOGFILE, "Couldn't execute the task debug script file: %s", strerror(errno)); - exit_code = UNABLE_TO_EXECUTE_DEBUG_SCRIPT; - } - } - - return exit_code; - -cleanup: - if (job_dir != NULL) { - free(job_dir); - } - if (task_script_path != NULL) { - free(task_script_path); - } - // free configurations - cleanup(); - return exit_code; -} -/* - * Function used to launch a debug script as the provided user. - */ -int run_debug_script_as_user(const char * user, const char *jobid, const char *taskid, - const char *tt_root) { - return run_process_as_user(user, jobid, taskid, tt_root, RUN_DEBUG_SCRIPT); -} -/** - * Function used to terminate/kill a task launched by the user, - * or dump the process' stack (by sending SIGQUIT). - * The function sends appropriate signal to the process group - * specified by the task_pid. - */ -int kill_user_task(const char *user, const char *task_pid, int sig) { - int pid = 0; - - if(task_pid == NULL) { - return INVALID_ARGUMENT_NUMBER; - } - -#ifdef DEBUG - fprintf(LOGFILE, "user passed to kill_user_task : %s.\n", user); - fprintf(LOGFILE, "task-pid passed to kill_user_task : %s.\n", task_pid); - fprintf(LOGFILE, "signal passed to kill_user_task : %d.\n", sig); -#endif - - pid = atoi(task_pid); - - if(pid <= 0) { - return INVALID_TASK_PID; - } - - fcloseall(); - if (change_user(user) != 0) { - cleanup(); - return SETUID_OPER_FAILED; - } - - //Don't continue if the process-group is not alive anymore. - if(kill(-pid,0) < 0) { - errno = 0; - cleanup(); - return 0; - } - - if (kill(-pid, sig) < 0) { - if(errno != ESRCH) { - fprintf(LOGFILE, "Error is %s\n", strerror(errno)); - cleanup(); - return UNABLE_TO_KILL_TASK; - } - errno = 0; - } - cleanup(); - return 0; -} - -/** - * Enables the path for deletion by changing the owner, group and permissions - * of the specified path and all the files/directories in the path recursively. - * * sudo chown user:mapred -R full_path - * * sudo chmod 2770 -R full_path - * Before changing permissions, makes sure that the given path doesn't contain - * any relative components. - * tt_root : is the base path(i.e. mapred-local-dir) sent to task-controller - * full_path : is either jobLocalDir, taskDir OR taskWorkDir that is to be - * deleted - */ -static int enable_path_for_cleanup(const char *tt_root, const char *user, - char *full_path) { - int exit_code = 0; - gid_t tasktracker_gid = getegid(); // the group permissions of the binary. - - if (check_tt_root(tt_root) < 0) { - fprintf(LOGFILE, "invalid tt root passed %s\n", tt_root); - cleanup(); - return INVALID_TT_ROOT; - } - - if (full_path == NULL) { - fprintf(LOGFILE, - "Could not build the full path. Not deleting the dir %s\n", - full_path); - exit_code = UNABLE_TO_BUILD_PATH; // may be malloc failed - } - // Make sure that the path given is not having any relative components - else if ((exit_code = check_path_for_relative_components(full_path)) != 0) { - fprintf(LOGFILE, - "Not changing permissions. Path may contain relative components.\n", - full_path); - } - else if (get_user_details(user) < 0) { - fprintf(LOGFILE, "Couldn't get the user details of %s.\n", user); - exit_code = INVALID_USER_NAME; - } - else if (exit_code = secure_path(full_path, user_detail->pw_uid, - tasktracker_gid, - S_IRWXU | S_IRWXG, S_ISGID | S_IRWXU | S_IRWXG, 0) != 0) { - // No setgid on files and setgid on dirs, 770. - // set 770 permissions for user, TTgroup for all files/directories in - // 'full_path' recursively sothat deletion of path by TaskTracker succeeds. - - fprintf(LOGFILE, "Failed to set permissions for %s\n", full_path); - } - - if (full_path != NULL) { - free(full_path); - } - // free configurations - cleanup(); - return exit_code; -} - -/** - * Enables the task work-dir/local-dir path for deletion. - * tt_root : is the base path(i.e. mapred-local-dir) sent to task-controller - * dir_to_be_deleted : is either taskDir OR taskWorkDir that is to be deleted - */ -int enable_task_for_cleanup(const char *tt_root, const char *user, - const char *jobid, const char *dir_to_be_deleted) { - char *full_path = get_task_dir_path(tt_root, user, jobid, dir_to_be_deleted); - return enable_path_for_cleanup(tt_root, user, full_path); -} - -/** - * Enables the jobLocalDir for deletion. - * tt_root : is the base path(i.e. mapred-local-dir) sent to task-controller - * user : owner of the job - * jobid : id of the job for which the cleanup is needed. - */ -int enable_job_for_cleanup(const char *tt_root, const char *user, - const char *jobid) { - char *full_path = get_job_directory(tt_root, user, jobid); - return enable_path_for_cleanup(tt_root, user, full_path); -} diff --git a/hadoop-mapreduce-project/src/c++/task-controller/task-controller.h b/hadoop-mapreduce-project/src/c++/task-controller/task-controller.h deleted file mode 100644 index 55d1221875e..00000000000 --- a/hadoop-mapreduce-project/src/c++/task-controller/task-controller.h +++ /dev/null @@ -1,148 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include "configuration.h" - -//command definitions -enum command { - INITIALIZE_USER, - INITIALIZE_JOB, - INITIALIZE_DISTRIBUTEDCACHE_FILE, - LAUNCH_TASK_JVM, - INITIALIZE_TASK, - TERMINATE_TASK_JVM, - KILL_TASK_JVM, - RUN_DEBUG_SCRIPT, - SIGQUIT_TASK_JVM, - ENABLE_TASK_FOR_CLEANUP, - ENABLE_JOB_FOR_CLEANUP -}; - -enum errorcodes { - INVALID_ARGUMENT_NUMBER = 1, - INVALID_USER_NAME, //2 - INVALID_COMMAND_PROVIDED, //3 - SUPER_USER_NOT_ALLOWED_TO_RUN_TASKS, //4 - INVALID_TT_ROOT, //5 - SETUID_OPER_FAILED, //6 - UNABLE_TO_EXECUTE_TASK_SCRIPT, //7 - UNABLE_TO_KILL_TASK, //8 - INVALID_TASK_PID, //9 - ERROR_RESOLVING_FILE_PATH, //10 - RELATIVE_PATH_COMPONENTS_IN_FILE_PATH, //11 - UNABLE_TO_STAT_FILE, //12 - FILE_NOT_OWNED_BY_TASKTRACKER, //13 - PREPARE_ATTEMPT_DIRECTORIES_FAILED, //14 - INITIALIZE_JOB_FAILED, //15 - PREPARE_TASK_LOGS_FAILED, //16 - INVALID_TT_LOG_DIR, //17 - OUT_OF_MEMORY, //18 - INITIALIZE_DISTCACHEFILE_FAILED, //19 - INITIALIZE_USER_FAILED, //20 - UNABLE_TO_EXECUTE_DEBUG_SCRIPT, //21 - INVALID_CONF_DIR, //22 - UNABLE_TO_BUILD_PATH, //23 - INVALID_TASKCONTROLLER_PERMISSIONS, //24 - PREPARE_JOB_LOGS_FAILED, //25 -}; - -#define USER_DIR_PATTERN "%s/taskTracker/%s" - -#define TT_JOB_DIR_PATTERN USER_DIR_PATTERN"/jobcache/%s" - -#define USER_DISTRIBUTED_CACHE_DIR_PATTERN USER_DIR_PATTERN"/distcache/%s" - -#define JOB_DIR_TO_JOB_WORK_PATTERN "%s/work" - -#define JOB_DIR_TO_ATTEMPT_DIR_PATTERN "%s/%s" - -#define JOB_LOG_DIR_PATTERN "%s/userlogs/%s" - -#define JOB_LOG_DIR_TO_JOB_ACLS_FILE_PATTERN "%s/job-acls.xml" - -#define ATTEMPT_LOG_DIR_PATTERN JOB_LOG_DIR_PATTERN"/%s" - -#define TASK_SCRIPT_PATTERN "%s/%s/taskjvm.sh" - -#define TT_LOCAL_TASK_DIR_PATTERN "%s/taskTracker/%s/jobcache/%s/%s" - -#define TT_SYS_DIR_KEY "mapreduce.cluster.local.dir" - -#define TT_LOG_DIR_KEY "hadoop.log.dir" - -#define TT_GROUP_KEY "mapreduce.tasktracker.group" - -#ifndef HADOOP_CONF_DIR - #define EXEC_PATTERN "/bin/task-controller" - extern char * hadoop_conf_dir; -#endif - -extern struct passwd *user_detail; - -extern FILE *LOGFILE; - -int run_task_as_user(const char * user, const char *jobid, const char *taskid, - const char *tt_root); - -int run_debug_script_as_user(const char * user, const char *jobid, const char *taskid, - const char *tt_root); - -int initialize_user(const char *user); - -int initialize_task(const char *jobid, const char *taskid, const char *user); - -int initialize_job(const char *jobid, const char *user); - -int initialize_distributed_cache_file(const char *tt_root, - const char* unique_string, const char *user); - -int kill_user_task(const char *user, const char *task_pid, int sig); - -int enable_task_for_cleanup(const char *tt_root, const char *user, - const char *jobid, const char *dir_to_be_deleted); - -int enable_job_for_cleanup(const char *tt_root, const char *user, - const char *jobid); - -int prepare_attempt_directory(const char *attempt_dir, const char *user); - -// The following functions are exposed for testing - -int check_variable_against_config(const char *config_key, - const char *passed_value); - -int get_user_details(const char *user); - -char *get_task_launcher_file(const char *job_dir, const char *attempt_dir); diff --git a/hadoop-mapreduce-project/src/c++/task-controller/tests/test-task-controller.c b/hadoop-mapreduce-project/src/c++/task-controller/tests/test-task-controller.c deleted file mode 100644 index d6c2531db52..00000000000 --- a/hadoop-mapreduce-project/src/c++/task-controller/tests/test-task-controller.c +++ /dev/null @@ -1,243 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#include "../task-controller.h" - -#define HADOOP_CONF_DIR "/tmp" - -int write_config_file(char *file_name) { - FILE *file; - char const *str = - "mapreduce.cluster.local.dir=/tmp/testing1,/tmp/testing2,/tmp/testing3,/tmp/testing4\n"; - - file = fopen(file_name, "w"); - if (file == NULL) { - printf("Failed to open %s.\n", file_name); - return EXIT_FAILURE; - } - fwrite(str, 1, strlen(str), file); - fclose(file); - return 0; -} - -void test_check_variable_against_config() { - - // A temporary configuration directory - char *conf_dir_templ = "/tmp/test-task-controller-conf-dir-XXXXXX"; - - // To accomodate "/conf/taskcontroller.cfg" - char template[strlen(conf_dir_templ) + strlen("/conf/taskcontroller.cfg")]; - - strcpy(template, conf_dir_templ); - char *temp_dir = mkdtemp(template); - if (temp_dir == NULL) { - printf("Couldn't create a temporary dir for conf.\n"); - goto cleanup; - } - - // Set the configuration directory - hadoop_conf_dir = strdup(temp_dir); - - // create the configuration directory - strcat(template, "/conf"); - char *conf_dir = strdup(template); - mkdir(conf_dir, S_IRWXU); - - // create the configuration file - strcat(template, "/taskcontroller.cfg"); - if (write_config_file(template) != 0) { - printf("Couldn't write the configuration file.\n"); - goto cleanup; - } - - // Test obtaining a value for a key from the config - char *config_values[4] = { "/tmp/testing1", "/tmp/testing2", - "/tmp/testing3", "/tmp/testing4" }; - char *value = (char *) get_value("mapreduce.cluster.local.dir"); - if (strcmp(value, "/tmp/testing1,/tmp/testing2,/tmp/testing3,/tmp/testing4") - != 0) { - printf("Obtaining a value for a key from the config failed.\n"); - goto cleanup; - } - - // Test the parsing of a multiple valued key from the config - char **values = (char **)get_values("mapreduce.cluster.local.dir"); - char **values_ptr = values; - int i = 0; - while (*values_ptr != NULL) { - printf(" value : %s\n", *values_ptr); - if (strcmp(*values_ptr, config_values[i++]) != 0) { - printf("Configured values are not read out properly. Test failed!"); - goto cleanup;; - } - values_ptr++; - } - - if (check_variable_against_config("mapreduce.cluster.local.dir", "/tmp/testing5") == 0) { - printf("Configuration should not contain /tmp/testing5! \n"); - goto cleanup; - } - - if (check_variable_against_config("mapreduce.cluster.local.dir", "/tmp/testing4") != 0) { - printf("Configuration should contain /tmp/testing4! \n"); - goto cleanup; - } - - cleanup: if (value != NULL) { - free(value); - } - if (values != NULL) { - free(values); - } - if (hadoop_conf_dir != NULL) { - free(hadoop_conf_dir); - } - unlink(template); - rmdir(conf_dir); - rmdir(hadoop_conf_dir); -} - -void test_get_user_directory() { - char *user_dir = (char *) get_user_directory("/tmp", "user"); - printf("user_dir obtained is %s\n", user_dir); - int ret = 0; - if (strcmp(user_dir, "/tmp/taskTracker/user") != 0) { - ret = -1; - } - free(user_dir); - assert(ret == 0); -} - -void test_get_job_directory() { - char *job_dir = (char *) get_job_directory("/tmp", "user", - "job_200906101234_0001"); - printf("job_dir obtained is %s\n", job_dir); - int ret = 0; - if (strcmp(job_dir, "/tmp/taskTracker/user/jobcache/job_200906101234_0001") - != 0) { - ret = -1; - } - free(job_dir); - assert(ret == 0); -} - -void test_get_attempt_directory() { - char *job_dir = (char *) get_job_directory("/tmp", "user", - "job_200906101234_0001"); - printf("job_dir obtained is %s\n", job_dir); - char *attempt_dir = (char *) get_attempt_directory(job_dir, - "attempt_200906101234_0001_m_000000_0"); - printf("attempt_dir obtained is %s\n", attempt_dir); - int ret = 0; - if (strcmp( - attempt_dir, - "/tmp/taskTracker/user/jobcache/job_200906101234_0001/attempt_200906101234_0001_m_000000_0") - != 0) { - ret = -1; - } - free(job_dir); - free(attempt_dir); - assert(ret == 0); -} - -void test_get_task_launcher_file() { - char *job_dir = (char *) get_job_directory("/tmp", "user", - "job_200906101234_0001"); - char *task_file = (char *) get_task_launcher_file(job_dir, - "attempt_200906112028_0001_m_000000_0"); - printf("task_file obtained is %s\n", task_file); - int ret = 0; - if (strcmp( - task_file, - "/tmp/taskTracker/user/jobcache/job_200906101234_0001/attempt_200906112028_0001_m_000000_0/taskjvm.sh") - != 0) { - ret = -1; - } - free(task_file); - assert(ret == 0); -} - -void test_get_job_log_dir() { - char *logdir = (char *) get_job_log_dir("/tmp/testing", - "job_200906101234_0001"); - printf("logdir obtained is %s\n", logdir); - int ret = 0; - if (strcmp(logdir, "/tmp/testing/userlogs/job_200906101234_0001") != 0) { - ret = -1; - } - free(logdir); - assert(ret == 0); -} - -void test_get_job_acls_file() { - char *job_acls_file = (char *) get_job_acls_file( - "/tmp/testing/userlogs/job_200906101234_0001"); - printf("job acls file obtained is %s\n", job_acls_file); - int ret = 0; - if (strcmp(job_acls_file, - "/tmp/testing/userlogs/job_200906101234_0001/job-acls.xml") != 0) { - ret = -1; - } - free(job_acls_file); - assert(ret == 0); -} - -void test_get_task_log_dir() { - char *logdir = (char *) get_task_log_dir("/tmp/testing", - "job_200906101234_0001", "attempt_200906112028_0001_m_000000_0"); - printf("logdir obtained is %s\n", logdir); - int ret = 0; - if (strcmp(logdir, - "/tmp/testing/userlogs/job_200906101234_0001/attempt_200906112028_0001_m_000000_0") - != 0) { - ret = -1; - } - free(logdir); - assert(ret == 0); -} - -int main(int argc, char **argv) { - printf("\nStarting tests\n"); - LOGFILE = stdout; - - printf("\nTesting check_variable_against_config()\n"); - test_check_variable_against_config(); - - printf("\nTesting get_user_directory()\n"); - test_get_user_directory(); - - printf("\nTesting get_job_directory()\n"); - test_get_job_directory(); - - printf("\nTesting get_attempt_directory()\n"); - test_get_attempt_directory(); - - printf("\nTesting get_task_launcher_file()\n"); - test_get_task_launcher_file(); - - printf("\nTesting get_job_log_dir()\n"); - test_get_job_log_dir(); - - printf("\nTesting get_job_acls_file()\n"); - test_get_job_acls_file(); - - printf("\nTesting get_task_log_dir()\n"); - test_get_task_log_dir(); - - printf("\nFinished tests\n"); - return 0; -} diff --git a/hadoop-mapreduce-project/src/java/org/apache/hadoop/mapred/LinuxTaskController.java b/hadoop-mapreduce-project/src/java/org/apache/hadoop/mapred/LinuxTaskController.java deleted file mode 100644 index b3e63ae6e62..00000000000 --- a/hadoop-mapreduce-project/src/java/org/apache/hadoop/mapred/LinuxTaskController.java +++ /dev/null @@ -1,657 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.mapred; - -import java.io.BufferedWriter; -import java.io.File; -import java.io.FileWriter; -import java.io.IOException; -import java.io.PrintWriter; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.LocalFileSystem; -import org.apache.hadoop.fs.FileUtil; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext; -import org.apache.hadoop.mapred.JvmManager.JvmEnv; -import org.apache.hadoop.util.StringUtils; -import org.apache.hadoop.util.Shell.ExitCodeException; -import org.apache.hadoop.util.Shell.ShellCommandExecutor; - -/** - * A {@link TaskController} that runs the task JVMs as the user - * who submits the job. - * - * This class executes a setuid executable to implement methods - * of the {@link TaskController}, including launching the task - * JVM and killing it when needed, and also initializing and - * finalizing the task environment. - *

The setuid executable is launched using the command line:

- *

task-controller mapreduce.job.user.name command command-args, where

- *

mapreduce.job.user.name is the name of the owner who submits the job

- *

command is one of the cardinal value of the - * {@link LinuxTaskController.TaskControllerCommands} enumeration

- *

command-args depends on the command being launched.

- * - * In addition to running and killing tasks, the class also - * sets up appropriate access for the directories and files - * that will be used by the tasks. - */ -class LinuxTaskController extends TaskController { - - private static final Log LOG = - LogFactory.getLog(LinuxTaskController.class); - - // Name of the executable script that will contain the child - // JVM command line. See writeCommand for details. - private static final String COMMAND_FILE = "taskjvm.sh"; - - // Path to the setuid executable. - private static String taskControllerExe; - - static { - // the task-controller is expected to be under the $HADOOP_PREFIX/bin - // directory. - File hadoopBin = new File(System.getenv("HADOOP_PREFIX"), "bin"); - taskControllerExe = - new File(hadoopBin, "task-controller").getAbsolutePath(); - } - - public LinuxTaskController() { - super(); - } - - /** - * List of commands that the setuid script will execute. - */ - enum TaskControllerCommands { - INITIALIZE_USER, - INITIALIZE_JOB, - INITIALIZE_DISTRIBUTEDCACHE_FILE, - LAUNCH_TASK_JVM, - INITIALIZE_TASK, - TERMINATE_TASK_JVM, - KILL_TASK_JVM, - RUN_DEBUG_SCRIPT, - SIGQUIT_TASK_JVM, - ENABLE_TASK_FOR_CLEANUP, - ENABLE_JOB_FOR_CLEANUP - } - - @Override - public void setup() throws IOException { - super.setup(); - - // Check the permissions of the task-controller binary by running it plainly. - // If permissions are correct, it returns an error code 1, else it returns - // 24 or something else if some other bugs are also present. - String[] taskControllerCmd = - new String[] { getTaskControllerExecutablePath() }; - ShellCommandExecutor shExec = new ShellCommandExecutor(taskControllerCmd); - try { - shExec.execute(); - } catch (ExitCodeException e) { - int exitCode = shExec.getExitCode(); - if (exitCode != 1) { - LOG.warn("Exit code from checking binary permissions is : " + exitCode); - logOutput(shExec.getOutput()); - throw new IOException("Task controller setup failed because of invalid" - + "permissions/ownership with exit code " + exitCode, e); - } - } - } - - /** - * Launch a task JVM that will run as the owner of the job. - * - * This method launches a task JVM by executing a setuid executable that will - * switch to the user and run the task. Also does initialization of the first - * task in the same setuid process launch. - */ - @Override - void launchTaskJVM(TaskController.TaskControllerContext context) - throws IOException { - JvmEnv env = context.env; - // get the JVM command line. - String cmdLine = - TaskLog.buildCommandLine(env.setup, env.vargs, env.stdout, env.stderr, - env.logSize, true); - - StringBuffer sb = new StringBuffer(); - //export out all the environment variable before child command as - //the setuid/setgid binaries would not be getting, any environmental - //variables which begin with LD_*. - for(Entry entry : env.env.entrySet()) { - sb.append("export "); - sb.append(entry.getKey()); - sb.append("="); - sb.append(entry.getValue()); - sb.append("\n"); - } - sb.append(cmdLine); - // write the command to a file in the - // task specific cache directory - writeCommand(sb.toString(), getTaskCacheDirectory(context, - context.env.workDir)); - - // Call the taskcontroller with the right parameters. - List launchTaskJVMArgs = buildLaunchTaskArgs(context, - context.env.workDir); - ShellCommandExecutor shExec = buildTaskControllerExecutor( - TaskControllerCommands.LAUNCH_TASK_JVM, - env.conf.getUser(), - launchTaskJVMArgs, env.workDir, env.env); - context.shExec = shExec; - try { - shExec.execute(); - } catch (Exception e) { - int exitCode = shExec.getExitCode(); - LOG.warn("Exit code from task is : " + exitCode); - // 143 (SIGTERM) and 137 (SIGKILL) exit codes means the task was - // terminated/killed forcefully. In all other cases, log the - // task-controller output - if (exitCode != 143 && exitCode != 137) { - LOG.warn("Exception thrown while launching task JVM : " - + StringUtils.stringifyException(e)); - LOG.info("Output from LinuxTaskController's launchTaskJVM follows:"); - logOutput(shExec.getOutput()); - } - throw new IOException(e); - } - if (LOG.isDebugEnabled()) { - LOG.info("Output from LinuxTaskController's launchTaskJVM follows:"); - logOutput(shExec.getOutput()); - } - } - - /** - * Launch the debug script process that will run as the owner of the job. - * - * This method launches the task debug script process by executing a setuid - * executable that will switch to the user and run the task. - */ - @Override - void runDebugScript(DebugScriptContext context) throws IOException { - String debugOut = FileUtil.makeShellPath(context.stdout); - String cmdLine = TaskLog.buildDebugScriptCommandLine(context.args, debugOut); - writeCommand(cmdLine, getTaskCacheDirectory(context, context.workDir)); - // Call the taskcontroller with the right parameters. - List launchTaskJVMArgs = buildLaunchTaskArgs(context, context.workDir); - runCommand(TaskControllerCommands.RUN_DEBUG_SCRIPT, context.task.getUser(), - launchTaskJVMArgs, context.workDir, null); - } - /** - * Helper method that runs a LinuxTaskController command - * - * @param taskControllerCommand - * @param user - * @param cmdArgs - * @param env - * @throws IOException - */ - private void runCommand(TaskControllerCommands taskControllerCommand, - String user, List cmdArgs, File workDir, Map env) - throws IOException { - - ShellCommandExecutor shExec = - buildTaskControllerExecutor(taskControllerCommand, user, cmdArgs, - workDir, env); - try { - shExec.execute(); - } catch (Exception e) { - LOG.warn("Exit code from " + taskControllerCommand.toString() + " is : " - + shExec.getExitCode()); - LOG.warn("Exception thrown by " + taskControllerCommand.toString() + " : " - + StringUtils.stringifyException(e)); - LOG.info("Output from LinuxTaskController's " - + taskControllerCommand.toString() + " follows:"); - logOutput(shExec.getOutput()); - throw new IOException(e); - } - if (LOG.isDebugEnabled()) { - LOG.info("Output from LinuxTaskController's " - + taskControllerCommand.toString() + " follows:"); - logOutput(shExec.getOutput()); - } - } - - /** - * Returns list of arguments to be passed while initializing a new task. See - * {@code buildTaskControllerExecutor(TaskControllerCommands, String, - * List, JvmEnv)} documentation. - * - * @param context - * @return Argument to be used while launching Task VM - */ - private List buildInitializeTaskArgs(TaskExecContext context) { - List commandArgs = new ArrayList(3); - String taskId = context.task.getTaskID().toString(); - String jobId = getJobId(context); - commandArgs.add(jobId); - if (!context.task.isTaskCleanupTask()) { - commandArgs.add(taskId); - } else { - commandArgs.add(taskId + TaskTracker.TASK_CLEANUP_SUFFIX); - } - return commandArgs; - } - - @Override - void initializeTask(TaskControllerContext context) - throws IOException { - if (LOG.isDebugEnabled()) { - LOG.debug("Going to do " - + TaskControllerCommands.INITIALIZE_TASK.toString() - + " for " + context.task.getTaskID().toString()); - } - runCommand(TaskControllerCommands.INITIALIZE_TASK, - context.env.conf.getUser(), - buildInitializeTaskArgs(context), context.env.workDir, context.env.env); - } - - /** - * Builds the args to be passed to task-controller for enabling of task for - * cleanup. Last arg in this List is either $attemptId or $attemptId/work - */ - private List buildTaskCleanupArgs( - TaskControllerTaskPathDeletionContext context) { - List commandArgs = new ArrayList(3); - commandArgs.add(context.mapredLocalDir.toUri().getPath()); - commandArgs.add(context.task.getJobID().toString()); - - String workDir = ""; - if (context.isWorkDir) { - workDir = "/work"; - } - if (context.task.isTaskCleanupTask()) { - commandArgs.add(context.task.getTaskID() + TaskTracker.TASK_CLEANUP_SUFFIX - + workDir); - } else { - commandArgs.add(context.task.getTaskID() + workDir); - } - - return commandArgs; - } - - /** - * Builds the args to be passed to task-controller for enabling of job for - * cleanup. Last arg in this List is $jobid. - */ - private List buildJobCleanupArgs( - TaskControllerJobPathDeletionContext context) { - List commandArgs = new ArrayList(2); - commandArgs.add(context.mapredLocalDir.toUri().getPath()); - commandArgs.add(context.jobId.toString()); - - return commandArgs; - } - - /** - * Enables the task for cleanup by changing permissions of the specified path - * in the local filesystem - */ - @Override - void enableTaskForCleanup(PathDeletionContext context) - throws IOException { - if (context instanceof TaskControllerTaskPathDeletionContext) { - TaskControllerTaskPathDeletionContext tContext = - (TaskControllerTaskPathDeletionContext) context; - enablePathForCleanup(tContext, - TaskControllerCommands.ENABLE_TASK_FOR_CLEANUP, - buildTaskCleanupArgs(tContext)); - } - else { - throw new IllegalArgumentException("PathDeletionContext provided is not " - + "TaskControllerTaskPathDeletionContext."); - } - } - - /** - * Enables the job for cleanup by changing permissions of the specified path - * in the local filesystem - */ - @Override - void enableJobForCleanup(PathDeletionContext context) - throws IOException { - if (context instanceof TaskControllerJobPathDeletionContext) { - TaskControllerJobPathDeletionContext tContext = - (TaskControllerJobPathDeletionContext) context; - enablePathForCleanup(tContext, - TaskControllerCommands.ENABLE_JOB_FOR_CLEANUP, - buildJobCleanupArgs(tContext)); - } else { - throw new IllegalArgumentException("PathDeletionContext provided is not " - + "TaskControllerJobPathDeletionContext."); - } - } - - /** - * Enable a path for cleanup - * @param c {@link TaskControllerPathDeletionContext} for the path to be - * cleaned up - * @param command {@link TaskControllerCommands} for task/job cleanup - * @param cleanupArgs arguments for the {@link LinuxTaskController} to enable - * path cleanup - */ - private void enablePathForCleanup(TaskControllerPathDeletionContext c, - TaskControllerCommands command, - List cleanupArgs) { - if (LOG.isDebugEnabled()) { - LOG.debug("Going to do " + command.toString() + " for " + c.fullPath); - } - - if ( c.user != null && c.fs instanceof LocalFileSystem) { - try { - runCommand(command, c.user, cleanupArgs, null, null); - } catch(IOException e) { - LOG.warn("Unable to change permissions for " + c.fullPath); - } - } - else { - throw new IllegalArgumentException("Either user is null or the " - + "file system is not local file system."); - } - } - - private void logOutput(String output) { - String shExecOutput = output; - if (shExecOutput != null) { - for (String str : shExecOutput.split("\n")) { - LOG.info(str); - } - } - } - - private String getJobId(TaskExecContext context) { - String taskId = context.task.getTaskID().toString(); - TaskAttemptID tId = TaskAttemptID.forName(taskId); - String jobId = tId.getJobID().toString(); - return jobId; - } - - /** - * Returns list of arguments to be passed while launching task VM. - * See {@code buildTaskControllerExecutor(TaskControllerCommands, - * String, List, JvmEnv)} documentation. - * @param context - * @return Argument to be used while launching Task VM - */ - private List buildLaunchTaskArgs(TaskExecContext context, - File workDir) { - List commandArgs = new ArrayList(3); - LOG.debug("getting the task directory as: " - + getTaskCacheDirectory(context, workDir)); - LOG.debug("getting the tt_root as " +getDirectoryChosenForTask( - new File(getTaskCacheDirectory(context, workDir)), - context) ); - commandArgs.add(getDirectoryChosenForTask( - new File(getTaskCacheDirectory(context, workDir)), - context)); - commandArgs.addAll(buildInitializeTaskArgs(context)); - return commandArgs; - } - - // Get the directory from the list of directories configured - // in Configs.LOCAL_DIR chosen for storing data pertaining to - // this task. - private String getDirectoryChosenForTask(File directory, - TaskExecContext context) { - String jobId = getJobId(context); - String taskId = context.task.getTaskID().toString(); - for (String dir : mapredLocalDirs) { - File mapredDir = new File(dir); - File taskDir = - new File(mapredDir, TaskTracker.getTaskWorkDir(context.task - .getUser(), jobId, taskId, context.task.isTaskCleanupTask())) - .getParentFile(); - if (directory.equals(taskDir)) { - return dir; - } - } - - LOG.error("Couldn't parse task cache directory correctly"); - throw new IllegalArgumentException("invalid task cache directory " - + directory.getAbsolutePath()); - } - - /** - * Builds the command line for launching/terminating/killing task JVM. - * Following is the format for launching/terminating/killing task JVM - *
- * For launching following is command line argument: - *
- * {@code mapreduce.job.user.name command tt-root job_id task_id} - *
- * For terminating/killing task jvm. - * {@code mapreduce.job.user.name command tt-root task-pid} - * - * @param command command to be executed. - * @param userName mapreduce.job.user.name - * @param cmdArgs list of extra arguments - * @param workDir working directory for the task-controller - * @param env JVM environment variables. - * @return {@link ShellCommandExecutor} - * @throws IOException - */ - private ShellCommandExecutor buildTaskControllerExecutor( - TaskControllerCommands command, String userName, List cmdArgs, - File workDir, Map env) - throws IOException { - String[] taskControllerCmd = new String[3 + cmdArgs.size()]; - taskControllerCmd[0] = getTaskControllerExecutablePath(); - taskControllerCmd[1] = userName; - taskControllerCmd[2] = String.valueOf(command.ordinal()); - int i = 3; - for (String cmdArg : cmdArgs) { - taskControllerCmd[i++] = cmdArg; - } - if (LOG.isDebugEnabled()) { - for (String cmd : taskControllerCmd) { - LOG.debug("taskctrl command = " + cmd); - } - } - ShellCommandExecutor shExec = null; - if(workDir != null && workDir.exists()) { - shExec = new ShellCommandExecutor(taskControllerCmd, - workDir, env); - } else { - shExec = new ShellCommandExecutor(taskControllerCmd); - } - - return shExec; - } - - // Return the task specific directory under the cache. - private String getTaskCacheDirectory(TaskExecContext context, - File workDir) { - // In the case of JVM reuse, the task specific directory - // is different from what is set with respect with - // env.workDir. Hence building this from the taskId everytime. - String taskId = context.task.getTaskID().toString(); - File cacheDirForJob = workDir.getParentFile().getParentFile(); - if(context.task.isTaskCleanupTask()) { - taskId = taskId + TaskTracker.TASK_CLEANUP_SUFFIX; - } - return new File(cacheDirForJob, taskId).getAbsolutePath(); - } - - // Write the JVM command line to a file under the specified directory - // Note that the JVM will be launched using a setuid executable, and - // could potentially contain strings defined by a user. Hence, to - // prevent special character attacks, we write the command line to - // a file and execute it. - private void writeCommand(String cmdLine, - String directory) throws IOException { - - PrintWriter pw = null; - String commandFile = directory + File.separator + COMMAND_FILE; - LOG.info("Writing commands to " + commandFile); - LOG.info("--------Commands Begin--------"); - LOG.info(cmdLine); - LOG.info("--------Commands End--------"); - try { - FileWriter fw = new FileWriter(commandFile); - BufferedWriter bw = new BufferedWriter(fw); - pw = new PrintWriter(bw); - pw.write(cmdLine); - } catch (IOException ioe) { - LOG.error("Caught IOException while writing JVM command line to file. " - + ioe.getMessage()); - } finally { - if (pw != null) { - pw.close(); - } - // set execute permissions for all on the file. - File f = new File(commandFile); - if (f.exists()) { - f.setReadable(true, false); - f.setExecutable(true, false); - } - } - } - - private List buildInitializeJobCommandArgs( - JobInitializationContext context) { - List initJobCmdArgs = new ArrayList(); - initJobCmdArgs.add(context.jobid.toString()); - return initJobCmdArgs; - } - - @Override - void initializeJob(JobInitializationContext context) - throws IOException { - LOG.debug("Going to initialize job " + context.jobid.toString() - + " on the TT"); - runCommand(TaskControllerCommands.INITIALIZE_JOB, context.user, - buildInitializeJobCommandArgs(context), context.workDir, null); - } - - @Override - public void initializeDistributedCacheFile(DistributedCacheFileContext context) - throws IOException { - if (LOG.isDebugEnabled()) { - LOG.debug("Going to initialize distributed cache for " + context.user - + " with localizedBaseDir " + context.localizedBaseDir + - " and uniqueString " + context.uniqueString); - } - List args = new ArrayList(); - // Here, uniqueString might start with '-'. Adding -- in front of the - // arguments indicates that they are non-option parameters. - args.add("--"); - args.add(context.localizedBaseDir.toString()); - args.add(context.uniqueString); - runCommand(TaskControllerCommands.INITIALIZE_DISTRIBUTEDCACHE_FILE, - context.user, args, context.workDir, null); - } - - @Override - public void initializeUser(InitializationContext context) - throws IOException { - LOG.debug("Going to initialize user directories for " + context.user - + " on the TT"); - runCommand(TaskControllerCommands.INITIALIZE_USER, context.user, - new ArrayList(), context.workDir, null); - } - - /** - * API which builds the command line to be pass to LinuxTaskController - * binary to terminate/kill the task. See - * {@code buildTaskControllerExecutor(TaskControllerCommands, - * String, List, JvmEnv)} documentation. - * - * - * @param context context of task which has to be passed kill signal. - * - */ - private List buildKillTaskCommandArgs(TaskControllerContext - context){ - List killTaskJVMArgs = new ArrayList(); - killTaskJVMArgs.add(context.pid); - return killTaskJVMArgs; - } - - /** - * Convenience method used to sending appropriate signal to the task - * VM - * @param context - * @param command - * @throws IOException - */ - protected void signalTask(TaskControllerContext context, - TaskControllerCommands command) throws IOException{ - if(context.task == null) { - LOG.info("Context task is null; not signaling the JVM"); - return; - } - ShellCommandExecutor shExec = buildTaskControllerExecutor( - command, context.env.conf.getUser(), - buildKillTaskCommandArgs(context), context.env.workDir, - context.env.env); - try { - shExec.execute(); - } catch (Exception e) { - LOG.warn("Output from task-contoller is : " + shExec.getOutput()); - throw new IOException(e); - } - } - - @Override - void terminateTask(TaskControllerContext context) { - try { - signalTask(context, TaskControllerCommands.TERMINATE_TASK_JVM); - } catch (Exception e) { - LOG.warn("Exception thrown while sending kill to the Task VM " + - StringUtils.stringifyException(e)); - } - } - - @Override - void killTask(TaskControllerContext context) { - try { - signalTask(context, TaskControllerCommands.KILL_TASK_JVM); - } catch (Exception e) { - LOG.warn("Exception thrown while sending destroy to the Task VM " + - StringUtils.stringifyException(e)); - } - } - - @Override - void dumpTaskStack(TaskControllerContext context) { - try { - signalTask(context, TaskControllerCommands.SIGQUIT_TASK_JVM); - } catch (Exception e) { - LOG.warn("Exception thrown while sending SIGQUIT to the Task VM " + - StringUtils.stringifyException(e)); - } - } - - protected String getTaskControllerExecutablePath() { - return taskControllerExe; - } - - @Override - String getRunAsUser(JobConf conf) { - return conf.getUser(); - } -} \ No newline at end of file diff --git a/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java b/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java deleted file mode 100644 index 75e25522f95..00000000000 --- a/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java +++ /dev/null @@ -1,511 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.mapred; - -import java.io.File; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.PrintWriter; -import java.util.List; -import java.util.ArrayList; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.permission.FsAction; -import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.hadoop.hdfs.MiniDFSCluster; -import org.apache.hadoop.mapreduce.filecache.TestTrackerDistributedCacheManager; -import org.apache.hadoop.mapreduce.MRConfig; -import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig; -import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.util.StringUtils; - -import junit.framework.TestCase; - -/** - * The base class which starts up a cluster with LinuxTaskController as the task - * controller. - * - * In order to run test cases utilizing LinuxTaskController please follow the - * following steps: - *
    - *
  1. Build LinuxTaskController by not passing any - * -Dhadoop.conf.dir
  2. - *
  3. Change ownership of the built binary to root:group1, where group1 is - * a secondary group of the test runner.
  4. - *
  5. Change permissions on the binary so that others component does - * not have any permissions on binary
  6. - *
  7. Make the built binary to setuid and setgid executable
  8. - *
  9. Execute following targets: - * ant test -Dcompile.c++=true -Dtaskcontroller-path=path to built binary - * -Dtaskcontroller-ugi=user,group - *
    (Note that "path to built binary" means the directory containing task-controller - - * not the actual complete path of the binary itself. This path must end in ".../bin") - *
  10. - *
- * - */ -public class ClusterWithLinuxTaskController extends TestCase { - private static final Log LOG = - LogFactory.getLog(ClusterWithLinuxTaskController.class); - - /** - * The wrapper class around LinuxTaskController which allows modification of - * the custom path to task-controller which we can use for task management. - * - **/ - public static class MyLinuxTaskController extends LinuxTaskController { - String taskControllerExePath = System.getProperty(TASKCONTROLLER_PATH) - + "/task-controller"; - - @Override - public void setup() throws IOException { - getConf().set(TTConfig.TT_GROUP, taskTrackerSpecialGroup); - - // write configuration file - configurationFile = createTaskControllerConf(System - .getProperty(TASKCONTROLLER_PATH), getConf()); - super.setup(); - } - - @Override - protected String getTaskControllerExecutablePath() { - return new File(taskControllerExePath).getAbsolutePath(); - } - - void setTaskControllerExe(String execPath) { - this.taskControllerExePath = execPath; - } - - volatile static int attemptedSigQuits = 0; - volatile static int failedSigQuits = 0; - - /** Work like LinuxTaskController, but also count the number of - * attempted and failed SIGQUIT sends via the task-controller - * executable. - */ - @Override - void dumpTaskStack(TaskControllerContext context) { - attemptedSigQuits++; - try { - signalTask(context, TaskControllerCommands.SIGQUIT_TASK_JVM); - } catch (Exception e) { - LOG.warn("Execution sending SIGQUIT: " + StringUtils.stringifyException(e)); - failedSigQuits++; - } - } - } - - // cluster instances which sub classes can use - protected MiniMRCluster mrCluster = null; - protected MiniDFSCluster dfsCluster = null; - - private JobConf clusterConf = null; - protected Path homeDirectory; - - /** changing this to a larger number needs more work for creating - * taskcontroller.cfg. - * see {@link #startCluster()} and - * {@link #createTaskControllerConf(String, Configuration)} - */ - private static final int NUMBER_OF_NODES = 1; - - static final String TASKCONTROLLER_PATH = "taskcontroller-path"; - static final String TASKCONTROLLER_UGI = "taskcontroller-ugi"; - - private static File configurationFile = null; - - protected UserGroupInformation jobOwner; - - protected static String taskTrackerSpecialGroup = null; - /** - * Primary group of the tasktracker - i.e. the user running the - * test. - */ - protected static String taskTrackerPrimaryGroup = null; - static { - if (isTaskExecPathPassed()) { - try { - taskTrackerSpecialGroup = FileSystem.getLocal(new Configuration()) - .getFileStatus( - new Path(System.getProperty(TASKCONTROLLER_PATH), - "task-controller")).getGroup(); - } catch (IOException e) { - LOG.warn("Could not get group of the binary", e); - fail("Could not get group of the binary"); - } - try { - taskTrackerPrimaryGroup = - UserGroupInformation.getCurrentUser().getGroupNames()[0]; - } catch (IOException ioe) { - LOG.warn("Could not get primary group of the current user", ioe); - fail("Could not get primary group of the current user"); - } - } - } - - /* - * Utility method which subclasses use to start and configure the MR Cluster - * so they can directly submit a job. - */ - protected void startCluster() - throws IOException, InterruptedException { - JobConf conf = new JobConf(); - dfsCluster = new MiniDFSCluster(conf, NUMBER_OF_NODES, true, null); - conf.set(TTConfig.TT_TASK_CONTROLLER, - MyLinuxTaskController.class.getName()); - conf.setBoolean(JTConfig.JT_PERSIST_JOBSTATUS, false); - mrCluster = - new MiniMRCluster(NUMBER_OF_NODES, dfsCluster.getFileSystem().getUri() - .toString(), 4, null, null, conf); - - clusterConf = mrCluster.createJobConf(); - - String ugi = System.getProperty(TASKCONTROLLER_UGI); - String[] splits = ugi.split(","); - jobOwner = UserGroupInformation.createUserForTesting(splits[0], - new String[]{splits[1]}); - createHomeAndStagingDirectory(clusterConf); - } - - private void createHomeAndStagingDirectory(JobConf conf) - throws IOException { - FileSystem fs = dfsCluster.getFileSystem(); - String path = "/user/" + jobOwner.getUserName(); - homeDirectory = new Path(path); - LOG.info("Creating Home directory : " + homeDirectory); - fs.mkdirs(homeDirectory); - changePermission(fs); - Path stagingArea = new Path(conf.get(JTConfig.JT_STAGING_AREA_ROOT)); - LOG.info("Creating Staging root directory : " + stagingArea); - fs.mkdirs(stagingArea); - fs.setPermission(stagingArea, new FsPermission((short)0777)); - } - - private void changePermission(FileSystem fs) - throws IOException { - fs.setOwner(homeDirectory, jobOwner.getUserName(), - jobOwner.getGroupNames()[0]); - } - - static File getTaskControllerConfFile(String path) { - File confDirectory = new File(path, "../conf"); - return new File(confDirectory, "taskcontroller.cfg"); - } - - /** - * Create taskcontroller.cfg. - * - * @param path Path to the taskcontroller binary. - * @param conf TaskTracker's configuration - * @return the created conf file - * @throws IOException - */ - static File createTaskControllerConf(String path, - Configuration conf) throws IOException { - File confDirectory = new File(path, "../conf"); - if (!confDirectory.exists()) { - confDirectory.mkdirs(); - } - File configurationFile = new File(confDirectory, "taskcontroller.cfg"); - PrintWriter writer = - new PrintWriter(new FileOutputStream(configurationFile)); - - writer.println(String.format(MRConfig.LOCAL_DIR + "=%s", conf - .get(MRConfig.LOCAL_DIR))); - - writer - .println(String.format("hadoop.log.dir=%s", TaskLog.getBaseLogDir())); - writer.println(String.format(TTConfig.TT_GROUP + "=%s", - conf.get(TTConfig.TT_GROUP))); - - writer.flush(); - writer.close(); - return configurationFile; - } - - /** - * Can we run the tests with LinuxTaskController? - * - * @return boolean - */ - protected static boolean shouldRun() { - if (!isTaskExecPathPassed() || !isUserPassed()) { - LOG.info("Not running test."); - return false; - } - return true; - } - - static boolean isTaskExecPathPassed() { - String path = System.getProperty(TASKCONTROLLER_PATH); - if (path == null || path.isEmpty() - || path.equals("${" + TASKCONTROLLER_PATH + "}")) { - LOG.info("Invalid taskcontroller-path : " + path); - return false; - } - return true; - } - - private static boolean isUserPassed() { - String ugi = System.getProperty(TASKCONTROLLER_UGI); - if (ugi != null && !(ugi.equals("${" + TASKCONTROLLER_UGI + "}")) - && !ugi.isEmpty()) { - if (ugi.indexOf(",") > 1) { - return true; - } - LOG.info("Invalid taskcontroller-ugi : " + ugi); - return false; - } - LOG.info("Invalid taskcontroller-ugi : " + ugi); - return false; - } - - protected JobConf getClusterConf() { - return new JobConf(clusterConf); - } - - @Override - protected void tearDown() - throws Exception { - if (mrCluster != null) { - mrCluster.shutdown(); - } - - if (dfsCluster != null) { - dfsCluster.shutdown(); - } - - if (configurationFile != null) { - configurationFile.delete(); - } - - super.tearDown(); - } - - /** - * Assert that the job is actually run by the specified user by verifying the - * permissions of the output part-files. - * - * @param outDir - * @throws IOException - */ - protected void assertOwnerShip(Path outDir) - throws IOException { - FileSystem fs = outDir.getFileSystem(clusterConf); - assertOwnerShip(outDir, fs); - } - - /** - * Assert that the job is actually run by the specified user by verifying the - * permissions of the output part-files. - * - * @param outDir - * @param fs - * @throws IOException - */ - protected void assertOwnerShip(Path outDir, FileSystem fs) - throws IOException { - for (FileStatus status : fs.listStatus(outDir, - new Utils.OutputFileUtils - .OutputFilesFilter())) { - String owner = status.getOwner(); - String group = status.getGroup(); - LOG.info("Ownership of the file is " + status.getPath() + " is " + owner - + "," + group); - assertTrue("Output part-file's owner is not correct. Expected : " - + jobOwner.getUserName() + " Found : " + owner, owner - .equals(jobOwner.getUserName())); - assertTrue("Output part-file's group is not correct. Expected : " - + jobOwner.getGroupNames()[0] + " Found : " + group, group - .equals(jobOwner.getGroupNames()[0])); - } - } - - /** - * Validates permissions of private distcache dir and its contents fully - */ - public static void checkPermissionsOnPrivateDistCache(String[] localDirs, - String user, String taskTrackerUser, String groupOwner) - throws IOException { - // user-dir, jobcache and distcache will have - // 2770 permissions if jobOwner is same as tt_user - // 2570 permissions for any other user - String expectedDirPerms = taskTrackerUser.equals(user) - ? "drwxrws---" - : "dr-xrws---"; - String expectedFilePerms = taskTrackerUser.equals(user) - ? "-rwxrwx---" - : "-r-xrwx---"; - for (String localDir : localDirs) { - File distCacheDir = new File(localDir, - TaskTracker.getPrivateDistributedCacheDir(user)); - if (distCacheDir.exists()) { - checkPermissionsOnDir(distCacheDir, user, groupOwner, expectedDirPerms, - expectedFilePerms); - } - } - } - - /** - * Check that files expected to be localized in distributed cache for a user - * are present. - * @param localDirs List of mapred local directories. - * @param user User against which localization is happening - * @param expectedFileNames List of files expected to be localized - * @throws IOException - */ - public static void checkPresenceOfPrivateDistCacheFiles(String[] localDirs, - String user, String[] expectedFileNames) throws IOException { - FileGatherer gatherer = new FileGatherer(); - for (String localDir : localDirs) { - File distCacheDir = new File(localDir, - TaskTracker.getPrivateDistributedCacheDir(user)); - findExpectedFiles(expectedFileNames, distCacheDir, gatherer); - } - assertEquals("Files expected in private distributed cache were not found", - expectedFileNames.length, gatherer.getCount()); - } - - /** - * Validates permissions and ownership of public distcache dir and its - * contents fully in all local dirs - */ - public static void checkPermissionsOnPublicDistCache(FileSystem localFS, - String[] localDirs, String owner, String group) throws IOException { - for (String localDir : localDirs) { - File distCacheDir = new File(localDir, - TaskTracker.getPublicDistributedCacheDir()); - - if (distCacheDir.exists()) { - checkPublicFilePermissions(localFS, distCacheDir, owner, group); - } - } - } - - /** - * Checks that files expected to be localized in the public distributed - * cache are present - * @param localDirs List of mapred local directories - * @param expectedFileNames List of expected file names. - * @throws IOException - */ - public static void checkPresenceOfPublicDistCacheFiles(String[] localDirs, - String[] expectedFileNames) throws IOException { - FileGatherer gatherer = new FileGatherer(); - for (String localDir : localDirs) { - File distCacheDir = new File(localDir, - TaskTracker.getPublicDistributedCacheDir()); - findExpectedFiles(expectedFileNames, distCacheDir, gatherer); - } - assertEquals("Files expected in public distributed cache were not found", - expectedFileNames.length, gatherer.getCount()); - } - - /** - * Validates permissions and ownership on the public distributed cache files - */ - private static void checkPublicFilePermissions(FileSystem localFS, File dir, - String owner, String group) - throws IOException { - Path dirPath = new Path(dir.getAbsolutePath()); - TestTrackerDistributedCacheManager.checkPublicFilePermissions(localFS, - new Path[] {dirPath}); - TestTrackerDistributedCacheManager.checkPublicFileOwnership(localFS, - new Path[] {dirPath}, owner, group); - if (dir.isDirectory()) { - File[] files = dir.listFiles(); - for (File file : files) { - checkPublicFilePermissions(localFS, file, owner, group); - } - } - } - - /** - * Validates permissions of given dir and its contents fully(i.e. recursively) - */ - private static void checkPermissionsOnDir(File dir, String user, - String groupOwner, String expectedDirPermissions, - String expectedFilePermissions) throws IOException { - TestTaskTrackerLocalization.checkFilePermissions(dir.toString(), - expectedDirPermissions, user, groupOwner); - File[] files = dir.listFiles(); - for (File file : files) { - if (file.isDirectory()) { - checkPermissionsOnDir(file, user, groupOwner, expectedDirPermissions, - expectedFilePermissions); - } else { - TestTaskTrackerLocalization.checkFilePermissions(file.toString(), - expectedFilePermissions, user, groupOwner); - } - } - } - - // Check which files among those expected are present in the rootDir - // Add those present to the FileGatherer. - private static void findExpectedFiles(String[] expectedFileNames, - File rootDir, FileGatherer gatherer) { - - File[] files = rootDir.listFiles(); - if (files == null) { - return; - } - for (File file : files) { - if (file.isDirectory()) { - findExpectedFiles(expectedFileNames, file, gatherer); - } else { - if (isFilePresent(expectedFileNames, file)) { - gatherer.addFileName(file.getName()); - } - } - } - - } - - // Test if the passed file is present in the expected list of files. - private static boolean isFilePresent(String[] expectedFileNames, File file) { - boolean foundFileName = false; - for (String name : expectedFileNames) { - if (name.equals(file.getName())) { - foundFileName = true; - break; - } - } - return foundFileName; - } - - // Helper class to collect a list of file names across multiple - // method calls. Wrapper around a collection defined for clarity - private static class FileGatherer { - List foundFileNames = new ArrayList(); - - void addFileName(String fileName) { - foundFileNames.add(fileName); - } - - int getCount() { - return foundFileNames.size(); - } - } -} diff --git a/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/TestDebugScriptWithLinuxTaskController.java b/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/TestDebugScriptWithLinuxTaskController.java deleted file mode 100644 index 7a274a4ff1e..00000000000 --- a/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/TestDebugScriptWithLinuxTaskController.java +++ /dev/null @@ -1,65 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.mapred; - -import java.io.IOException; -import java.security.PrivilegedExceptionAction; - -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapreduce.TaskType; -import org.apache.hadoop.security.UserGroupInformation; -import org.junit.Test; - -public class TestDebugScriptWithLinuxTaskController extends - ClusterWithLinuxTaskController { - - @Test - public void testDebugScriptExecutionAsDifferentUser() throws Exception { - if (!super.shouldRun()) { - return; - } - super.startCluster(); - TestDebugScript.setupDebugScriptDirs(); - final Path inDir = new Path("input"); - final Path outDir = new Path("output"); - JobConf conf = super.getClusterConf(); - FileSystem fs = inDir.getFileSystem(conf); - fs.mkdirs(inDir); - Path p = new Path(inDir, "1.txt"); - fs.createNewFile(p); - String splits[] = System - .getProperty(ClusterWithLinuxTaskController.TASKCONTROLLER_UGI). - split(","); - JobID jobId = UserGroupInformation.createUserForTesting(splits[0], - new String[]{splits[1]}).doAs(new PrivilegedExceptionAction() { - public JobID run() throws IOException{ - return TestDebugScript.runFailingMapJob( - TestDebugScriptWithLinuxTaskController.this.getClusterConf(), - inDir, outDir); - } - }); - // construct the task id of first map task of failmap - TaskAttemptID taskId = new TaskAttemptID( - new TaskID(jobId,TaskType.MAP, 0), 0); - TestDebugScript.verifyDebugScriptOutput(taskId, splits[0], - taskTrackerSpecialGroup, "-rw-rw----"); - TestDebugScript.cleanupDebugScriptDirs(); - } -} diff --git a/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/TestJobExecutionAsDifferentUser.java b/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/TestJobExecutionAsDifferentUser.java deleted file mode 100644 index 8fbc8831089..00000000000 --- a/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/TestJobExecutionAsDifferentUser.java +++ /dev/null @@ -1,140 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.mapred; - -import java.io.IOException; -import java.security.PrivilegedExceptionAction; - -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.SleepJob; -import org.apache.hadoop.util.ToolRunner; - -/** - * Test a java-based mapred job with LinuxTaskController running the jobs as a - * user different from the user running the cluster. See - * {@link ClusterWithLinuxTaskController} - */ -public class TestJobExecutionAsDifferentUser extends - ClusterWithLinuxTaskController { - - public void testJobExecution() - throws Exception { - if (!shouldRun()) { - return; - } - startCluster(); - - - jobOwner.doAs(new PrivilegedExceptionAction() { - public Object run() throws Exception { - Path inDir = new Path("input"); - Path outDir = new Path("output"); - - RunningJob job; - // Run a job with zero maps/reduces - job = UtilsForTests.runJob(getClusterConf(), inDir, outDir, 0, 0); - job.waitForCompletion(); - assertTrue("Job failed", job.isSuccessful()); - assertOwnerShip(outDir); - - // Run a job with 1 map and zero reduces - job = UtilsForTests.runJob(getClusterConf(), inDir, outDir, 1, 0); - job.waitForCompletion(); - assertTrue("Job failed", job.isSuccessful()); - assertOwnerShip(outDir); - - // Run a normal job with maps/reduces - job = UtilsForTests.runJob(getClusterConf(), inDir, outDir, 1, 1); - job.waitForCompletion(); - assertTrue("Job failed", job.isSuccessful()); - assertOwnerShip(outDir); - - // Run a job with jvm reuse - JobConf myConf = getClusterConf(); - myConf.set(JobContext.JVM_NUMTASKS_TORUN, "-1"); - String[] args = { "-m", "6", "-r", "3", "-mt", "1000", "-rt", "1000" }; - assertEquals(0, ToolRunner.run(myConf, new SleepJob(), args)); - return null; - } - }); - - } - - public void testEnvironment() throws Exception { - if (!shouldRun()) { - return; - } - startCluster(); - jobOwner.doAs(new PrivilegedExceptionAction() { - public Object run() throws Exception { - - TestMiniMRChildTask childTask = new TestMiniMRChildTask(); - Path inDir = new Path("input1"); - Path outDir = new Path("output1"); - try { - childTask.runTestTaskEnv(getClusterConf(), inDir, outDir, false); - } catch (IOException e) { - fail("IOException thrown while running enviroment test." - + e.getMessage()); - } finally { - FileSystem outFs = outDir.getFileSystem(getClusterConf()); - if (outFs.exists(outDir)) { - assertOwnerShip(outDir); - outFs.delete(outDir, true); - } else { - fail("Output directory does not exist" + outDir.toString()); - } - return null; - } - } - }); - } - - /** Ensure that SIGQUIT can be properly sent by the LinuxTaskController - * if a task times out. - */ - public void testTimeoutStackTrace() throws Exception { - if (!shouldRun()) { - return; - } - - // Run a job that should timeout and trigger a SIGQUIT. - startCluster(); - jobOwner.doAs(new PrivilegedExceptionAction() { - public Object run() throws Exception { - JobConf conf = getClusterConf(); - conf.setInt(JobContext.TASK_TIMEOUT, 10000); - conf.setInt(Job.COMPLETION_POLL_INTERVAL_KEY, 50); - SleepJob sleepJob = new SleepJob(); - sleepJob.setConf(conf); - Job job = sleepJob.createJob(1, 0, 30000, 1, 0, 0); - job.setMaxMapAttempts(1); - int prevNumSigQuits = MyLinuxTaskController.attemptedSigQuits; - job.waitForCompletion(true); - assertTrue("Did not detect a new SIGQUIT!", - prevNumSigQuits < MyLinuxTaskController.attemptedSigQuits); - assertEquals("A SIGQUIT attempt failed!", 0, - MyLinuxTaskController.failedSigQuits); - return null; - } - }); - } -} diff --git a/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/TestKillSubProcessesWithLinuxTaskController.java b/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/TestKillSubProcessesWithLinuxTaskController.java deleted file mode 100644 index ce32aa8a03f..00000000000 --- a/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/TestKillSubProcessesWithLinuxTaskController.java +++ /dev/null @@ -1,49 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.mapred; - -import java.security.PrivilegedExceptionAction; - -/** - * Test killing of child processes spawned by the jobs with LinuxTaskController - * running the jobs as a user different from the user running the cluster. - * See {@link ClusterWithLinuxTaskController} - */ - -public class TestKillSubProcessesWithLinuxTaskController extends - ClusterWithLinuxTaskController { - - public void testKillSubProcess() throws Exception{ - if(!shouldRun()) { - return; - } - startCluster(); - jobOwner.doAs(new PrivilegedExceptionAction() { - public Object run() throws Exception { - JobConf myConf = getClusterConf(); - JobTracker jt = mrCluster.getJobTrackerRunner().getJobTracker(); - - TestKillSubProcesses.mr = mrCluster; - TestKillSubProcesses sbProc = new TestKillSubProcesses(); - sbProc.runTests(myConf, jt); - return null; - } - }); - } -} diff --git a/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/TestLinuxTaskController.java b/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/TestLinuxTaskController.java deleted file mode 100644 index 6ce493a1e53..00000000000 --- a/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/TestLinuxTaskController.java +++ /dev/null @@ -1,114 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.mapred; - -import java.io.File; -import java.io.IOException; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileUtil; -import org.apache.hadoop.mapreduce.MRConfig; -import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig; -import org.apache.hadoop.security.Groups; -import org.apache.hadoop.security.UserGroupInformation; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -import junit.framework.TestCase; - -public class TestLinuxTaskController extends TestCase { - private static int INVALID_TASKCONTROLLER_PERMISSIONS = 24; - private static File testDir = new File(System.getProperty("test.build.data", - "/tmp"), TestLinuxTaskController.class.getName()); - private static String taskControllerPath = System - .getProperty(ClusterWithLinuxTaskController.TASKCONTROLLER_PATH); - - @Before - protected void setUp() throws Exception { - testDir.mkdirs(); - } - - @After - protected void tearDown() { - FileUtil.fullyDelete(testDir); - } - - public static class MyLinuxTaskController extends LinuxTaskController { - String taskControllerExePath = taskControllerPath + "/task-controller"; - - @Override - protected String getTaskControllerExecutablePath() { - return taskControllerExePath; - } - } - - private void validateTaskControllerSetup(TaskController controller, - boolean shouldFail) throws IOException { - if (shouldFail) { - // task controller setup should fail validating permissions. - Throwable th = null; - try { - controller.setup(); - } catch (IOException ie) { - th = ie; - } - assertNotNull("No exception during setup", th); - assertTrue("Exception message does not contain exit code" - + INVALID_TASKCONTROLLER_PERMISSIONS, th.getMessage().contains( - "with exit code " + INVALID_TASKCONTROLLER_PERMISSIONS)); - } else { - controller.setup(); - } - - } - - @Test - public void testTaskControllerGroup() throws Exception { - if (!ClusterWithLinuxTaskController.isTaskExecPathPassed()) { - return; - } - // cleanup configuration file. - ClusterWithLinuxTaskController - .getTaskControllerConfFile(taskControllerPath).delete(); - Configuration conf = new Configuration(); - // create local dirs and set in the conf. - File mapredLocal = new File(testDir, "mapred/local"); - mapredLocal.mkdirs(); - conf.set(MRConfig.LOCAL_DIR, mapredLocal.toString()); - - // setup task-controller without setting any group name - TaskController controller = new MyLinuxTaskController(); - controller.setConf(conf); - validateTaskControllerSetup(controller, true); - - // set an invalid group name for the task controller group - conf.set(TTConfig.TT_GROUP, "invalid"); - // write the task-controller's conf file - ClusterWithLinuxTaskController.createTaskControllerConf(taskControllerPath, - conf); - validateTaskControllerSetup(controller, true); - - conf.set(TTConfig.TT_GROUP, - ClusterWithLinuxTaskController.taskTrackerSpecialGroup); - // write the task-controller's conf file - ClusterWithLinuxTaskController.createTaskControllerConf(taskControllerPath, - conf); - validateTaskControllerSetup(controller, false); - } -} diff --git a/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/TestLocalizationWithLinuxTaskController.java b/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/TestLocalizationWithLinuxTaskController.java deleted file mode 100644 index 3f13f4db658..00000000000 --- a/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/TestLocalizationWithLinuxTaskController.java +++ /dev/null @@ -1,240 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.mapred; - -import java.io.File; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapred.ClusterWithLinuxTaskController.MyLinuxTaskController; -import org.apache.hadoop.mapreduce.MRConfig; -import org.apache.hadoop.security.UserGroupInformation; - -/** - * Test to verify localization of a job and localization of a task on a - * TaskTracker when {@link LinuxTaskController} is used. - * - */ -public class TestLocalizationWithLinuxTaskController extends - TestTaskTrackerLocalization { - - private static final Log LOG = - LogFactory.getLog(TestLocalizationWithLinuxTaskController.class); - - private File configFile; - private static String taskTrackerUserName; - - @Override - protected boolean canRun() { - return ClusterWithLinuxTaskController.shouldRun(); - } - - @Override - protected void setUp() - throws Exception { - - if (!canRun()) { - return; - } - - super.setUp(); - - taskTrackerUserName = UserGroupInformation.getLoginUser() - .getShortUserName(); - } - - @Override - protected void tearDown() - throws Exception { - if (!canRun()) { - return; - } - super.tearDown(); - if (configFile != null) { - configFile.delete(); - } - } - - protected TaskController createTaskController() { - return new MyLinuxTaskController(); - } - - protected UserGroupInformation getJobOwner() { - String ugi = System - .getProperty(ClusterWithLinuxTaskController.TASKCONTROLLER_UGI); - String[] splits = ugi.split(","); - return UserGroupInformation.createUserForTesting(splits[0], - new String[] { splits[1] }); - } - - /** @InheritDoc */ - @Override - public void testTaskControllerSetup() { - // Do nothing. - } - - @Override - protected void checkUserLocalization() - throws IOException { - // Check the directory structure and permissions - for (String dir : localDirs) { - - File localDir = new File(dir); - assertTrue(MRConfig.LOCAL_DIR + localDir + " isn'task created!", - localDir.exists()); - - File taskTrackerSubDir = new File(localDir, TaskTracker.SUBDIR); - assertTrue("taskTracker sub-dir in the local-dir " + localDir - + "is not created!", taskTrackerSubDir.exists()); - - // user-dir, jobcache and distcache will have - // 2770 permissions if jobOwner is same as tt_user - // 2570 permissions for any other user - String expectedDirPerms = taskTrackerUserName.equals(task.getUser()) - ? "drwxrws---" - : "dr-xrws---"; - - File userDir = new File(taskTrackerSubDir, task.getUser()); - assertTrue("user-dir in taskTrackerSubdir " + taskTrackerSubDir - + "is not created!", userDir.exists()); - - checkFilePermissions(userDir.getAbsolutePath(), expectedDirPerms, task - .getUser(), ClusterWithLinuxTaskController.taskTrackerSpecialGroup); - - File jobCache = new File(userDir, TaskTracker.JOBCACHE); - assertTrue("jobcache in the userDir " + userDir + " isn't created!", - jobCache.exists()); - - checkFilePermissions(jobCache.getAbsolutePath(), expectedDirPerms, task - .getUser(), ClusterWithLinuxTaskController.taskTrackerSpecialGroup); - - // Verify the distributed cache dir. - File distributedCacheDir = - new File(localDir, TaskTracker - .getPrivateDistributedCacheDir(task.getUser())); - assertTrue("distributed cache dir " + distributedCacheDir - + " doesn't exists!", distributedCacheDir.exists()); - checkFilePermissions(distributedCacheDir.getAbsolutePath(), - expectedDirPerms, task.getUser(), - ClusterWithLinuxTaskController.taskTrackerSpecialGroup); - } - } - - @Override - protected void checkJobLocalization() - throws IOException { - // job-dir, jars-dir and subdirectories in them will have - // 2770 permissions if jobOwner is same as tt_user - // 2570 permissions for any other user - // Files under these dirs will have - // 770 permissions if jobOwner is same as tt_user - // 570 permissions for any other user - String expectedDirPerms = taskTrackerUserName.equals(task.getUser()) - ? "drwxrws---" - : "dr-xrws---"; - String expectedFilePerms = taskTrackerUserName.equals(task.getUser()) - ? "-rwxrwx---" - : "-r-xrwx---"; - - for (String localDir : trackerFConf.getStrings(MRConfig.LOCAL_DIR)) { - File jobDir = - new File(localDir, TaskTracker.getLocalJobDir(task.getUser(), jobId - .toString())); - // check the private permissions on the job directory - checkFilePermissions(jobDir.getAbsolutePath(), expectedDirPerms, task - .getUser(), ClusterWithLinuxTaskController.taskTrackerSpecialGroup); - } - - // check the private permissions of various directories - List dirs = new ArrayList(); - Path jarsDir = - lDirAlloc.getLocalPathToRead(TaskTracker.getJobJarsDir(task.getUser(), - jobId.toString()), trackerFConf); - dirs.add(jarsDir); - dirs.add(new Path(jarsDir, "lib")); - for (Path dir : dirs) { - checkFilePermissions(dir.toUri().getPath(), expectedDirPerms, - task.getUser(), - ClusterWithLinuxTaskController.taskTrackerSpecialGroup); - } - - // job-work dir needs user writable permissions i.e. 2770 for any user - Path jobWorkDir = - lDirAlloc.getLocalPathToRead(TaskTracker.getJobWorkDir(task.getUser(), - jobId.toString()), trackerFConf); - checkFilePermissions(jobWorkDir.toUri().getPath(), "drwxrws---", task - .getUser(), ClusterWithLinuxTaskController.taskTrackerSpecialGroup); - - // check the private permissions of various files - List files = new ArrayList(); - files.add(lDirAlloc.getLocalPathToRead(TaskTracker.getLocalJobConfFile( - task.getUser(), jobId.toString()), trackerFConf)); - files.add(lDirAlloc.getLocalPathToRead(TaskTracker.getJobJarFile(task - .getUser(), jobId.toString()), trackerFConf)); - files.add(new Path(jarsDir, "lib" + Path.SEPARATOR + "lib1.jar")); - files.add(new Path(jarsDir, "lib" + Path.SEPARATOR + "lib2.jar")); - for (Path file : files) { - checkFilePermissions(file.toUri().getPath(), expectedFilePerms, task - .getUser(), ClusterWithLinuxTaskController.taskTrackerSpecialGroup); - } - - // check job user-log directory permissions - File jobLogDir = TaskLog.getJobDir(jobId); - checkFilePermissions(jobLogDir.toString(), expectedDirPerms, task.getUser(), - ClusterWithLinuxTaskController.taskTrackerSpecialGroup); - // check job-acls.xml file permissions - checkFilePermissions(jobLogDir.toString() + Path.SEPARATOR - + TaskTracker.jobACLsFile, expectedFilePerms, task.getUser(), - ClusterWithLinuxTaskController.taskTrackerSpecialGroup); - - // validate the content of job ACLs file - validateJobACLsFileContent(); - } - - @Override - protected void checkTaskLocalization() - throws IOException { - // check the private permissions of various directories - List dirs = new ArrayList(); - dirs.add(lDirAlloc.getLocalPathToRead(TaskTracker.getLocalTaskDir(task - .getUser(), jobId.toString(), taskId.toString(), - task.isTaskCleanupTask()), trackerFConf)); - dirs.add(attemptWorkDir); - dirs.add(new Path(attemptWorkDir, "tmp")); - dirs.add(new Path(attemptLogFiles[1].getParentFile().getAbsolutePath())); - for (Path dir : dirs) { - checkFilePermissions(dir.toUri().getPath(), "drwxrws---", - task.getUser(), - ClusterWithLinuxTaskController.taskTrackerSpecialGroup); - } - - // check the private permissions of various files - List files = new ArrayList(); - files.add(lDirAlloc.getLocalPathToRead(TaskTracker.getTaskConfFile(task - .getUser(), task.getJobID().toString(), task.getTaskID().toString(), - task.isTaskCleanupTask()), trackerFConf)); - for (Path file : files) { - checkFilePermissions(file.toUri().getPath(), "-rwxrwx---", task - .getUser(), ClusterWithLinuxTaskController.taskTrackerSpecialGroup); - } - } -} diff --git a/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/TestTrackerDistributedCacheManagerWithLinuxTaskController.java b/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/TestTrackerDistributedCacheManagerWithLinuxTaskController.java deleted file mode 100644 index 5ebd6ec0f21..00000000000 --- a/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/TestTrackerDistributedCacheManagerWithLinuxTaskController.java +++ /dev/null @@ -1,159 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.mapred; - -import java.io.File; -import java.io.IOException; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapred.ClusterWithLinuxTaskController.MyLinuxTaskController; -import org.apache.hadoop.mapreduce.filecache.TestTrackerDistributedCacheManager; -import org.apache.hadoop.security.UserGroupInformation; - -/** - * Test the DistributedCacheManager when LinuxTaskController is used. - * - */ -public class TestTrackerDistributedCacheManagerWithLinuxTaskController extends - TestTrackerDistributedCacheManager { - - private File configFile; - - private static final Log LOG = - LogFactory - .getLog(TestTrackerDistributedCacheManagerWithLinuxTaskController.class); - - @Override - protected void setUp() - throws IOException, InterruptedException { - - if (!ClusterWithLinuxTaskController.shouldRun()) { - return; - } - - TEST_ROOT_DIR = - new File(System.getProperty("test.build.data", "/tmp"), - TestTrackerDistributedCacheManagerWithLinuxTaskController.class - .getSimpleName()).getAbsolutePath(); - - super.setUp(); - - taskController = new MyLinuxTaskController(); - String path = - System.getProperty(ClusterWithLinuxTaskController.TASKCONTROLLER_PATH); - String execPath = path + "/task-controller"; - ((MyLinuxTaskController)taskController).setTaskControllerExe(execPath); - taskController.setConf(conf); - taskController.setup(); - } - - @Override - protected void refreshConf(Configuration conf) throws IOException { - super.refreshConf(conf); - String path = - System.getProperty(ClusterWithLinuxTaskController.TASKCONTROLLER_PATH); - configFile = - ClusterWithLinuxTaskController.createTaskControllerConf(path, conf); - - } - - @Override - protected void tearDown() - throws IOException { - if (!ClusterWithLinuxTaskController.shouldRun()) { - return; - } - if (configFile != null) { - configFile.delete(); - } - super.tearDown(); - } - - @Override - protected boolean canRun() { - return ClusterWithLinuxTaskController.shouldRun(); - } - - @Override - protected String getJobOwnerName() { - String ugi = - System.getProperty(ClusterWithLinuxTaskController.TASKCONTROLLER_UGI); - String userName = ugi.split(",")[0]; - return userName; - } - - @Override - protected void checkFilePermissions(Path[] localCacheFiles) - throws IOException { - String userName = getJobOwnerName(); - String filePermissions = UserGroupInformation.getLoginUser() - .getShortUserName().equals(userName) ? "-rwxrwx---" : "-r-xrwx---"; - - for (Path p : localCacheFiles) { - // First make sure that the cache file has proper permissions. - TestTaskTrackerLocalization.checkFilePermissions(p.toUri().getPath(), - filePermissions, userName, - ClusterWithLinuxTaskController.taskTrackerSpecialGroup); - // Now. make sure that all the path components also have proper - // permissions. - checkPermissionOnPathComponents(p.toUri().getPath(), userName); - } - - } - - /** - * @param cachedFilePath - * @param userName - * @throws IOException - */ - private void checkPermissionOnPathComponents(String cachedFilePath, - String userName) - throws IOException { - // The trailing distcache/file/... string - String trailingStringForFirstFile = - cachedFilePath.replaceFirst(ROOT_MAPRED_LOCAL_DIR.getAbsolutePath() - + Path.SEPARATOR + "0_[0-" + (numLocalDirs - 1) + "]" - + Path.SEPARATOR + TaskTracker.getPrivateDistributedCacheDir(userName), - ""); - LOG.info("Trailing path for cacheFirstFile is : " - + trailingStringForFirstFile); - // The leading mapreduce.cluster.local.dir/0_[0-n]/taskTracker/$user string. - String leadingStringForFirstFile = - cachedFilePath.substring(0, cachedFilePath - .lastIndexOf(trailingStringForFirstFile)); - LOG.info("Leading path for cacheFirstFile is : " - + leadingStringForFirstFile); - - String dirPermissions = UserGroupInformation.getLoginUser() - .getShortUserName().equals(userName) ? "drwxrws---" : "dr-xrws---"; - - // Now check path permissions, starting with cache file's parent dir. - File path = new File(cachedFilePath).getParentFile(); - while (!path.getAbsolutePath().equals(leadingStringForFirstFile)) { - TestTaskTrackerLocalization.checkFilePermissions(path.getAbsolutePath(), - dirPermissions, userName, - ClusterWithLinuxTaskController.taskTrackerSpecialGroup); - path = path.getParentFile(); - } - } -} diff --git a/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/pipes/TestPipesAsDifferentUser.java b/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/pipes/TestPipesAsDifferentUser.java deleted file mode 100644 index d514bf8ff01..00000000000 --- a/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/pipes/TestPipesAsDifferentUser.java +++ /dev/null @@ -1,82 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.mapred.pipes; - -import java.security.PrivilegedExceptionAction; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapred.ClusterWithLinuxTaskController; -import org.apache.hadoop.mapred.JobConf; - -/** - * Test Pipes jobs with LinuxTaskController running the jobs as a user different - * from the user running the cluster. See {@link ClusterWithLinuxTaskController} - */ -public class TestPipesAsDifferentUser extends ClusterWithLinuxTaskController { - - private static final Log LOG = - LogFactory.getLog(TestPipesAsDifferentUser.class); - - public void testPipes() - throws Exception { - if (System.getProperty("compile.c++") == null) { - LOG.info("compile.c++ is not defined, so skipping TestPipes"); - return; - } - - if (!shouldRun()) { - return; - } - - super.startCluster(); - jobOwner.doAs(new PrivilegedExceptionAction() { - public Object run() throws Exception { - JobConf clusterConf = getClusterConf(); - Path inputPath = new Path(homeDirectory, "in"); - Path outputPath = new Path(homeDirectory, "out"); - - TestPipes.writeInputFile(FileSystem.get(clusterConf), inputPath); - TestPipes.runProgram(mrCluster, dfsCluster, TestPipes.wordCountSimple, - inputPath, outputPath, 3, 2, TestPipes.twoSplitOutput, clusterConf); - assertOwnerShip(outputPath); - TestPipes.cleanup(dfsCluster.getFileSystem(), outputPath); - - TestPipes.runProgram(mrCluster, dfsCluster, TestPipes.wordCountSimple, - inputPath, outputPath, 3, 0, TestPipes.noSortOutput, clusterConf); - assertOwnerShip(outputPath); - TestPipes.cleanup(dfsCluster.getFileSystem(), outputPath); - - TestPipes.runProgram(mrCluster, dfsCluster, TestPipes.wordCountPart, - inputPath, outputPath, 3, 2, TestPipes.fixedPartitionOutput, - clusterConf); - assertOwnerShip(outputPath); - TestPipes.cleanup(dfsCluster.getFileSystem(), outputPath); - - TestPipes.runNonPipedProgram(mrCluster, dfsCluster, - TestPipes.wordCountNoPipes, clusterConf); - assertOwnerShip(TestPipes.nonPipedOutDir, FileSystem - .getLocal(clusterConf)); - return null; - } - }); - } -}