MAPREDUCE-2767. Remove Linux task-controller from 0.23 branch. Contributed by Milind Bhandarkar.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1165912 13f79535-47bb-0310-9956-ffa450edef68
Konstantin Shvachko 2011-09-06 22:36:28 +00:00
parent 6357286bc6
commit 3cd4935629
20 changed files with 3 additions and 4511 deletions

View File

@@ -1773,6 +1773,8 @@ Release 0.22.0 - Unreleased
MAPREDUCE-2571. CombineFileInputFormat.getSplits throws a
java.lang.ArrayStoreException. (Bochun Bai via todd)
MAPREDUCE-2767. Remove Linux task-controller. (Milind Bhandarkar via shv)
Release 0.21.1 - Unreleased
NEW FEATURES

View File

@@ -166,20 +166,6 @@
<property name="make.cmd" value="make"/>
<property name="findbugs.heap.size" value="512M"/>
<!-- task-controller properties set here -->
<!-- Source directory from where configure is run and files are copied
-->
<property name="c++.task-controller.src"
value="${basedir}/src/c++/task-controller" />
<!-- directory where autoconf files + temporary files and src is
stored for compilation -->
<property name="build.c++.task-controller"
value="${build.c++}/task-controller" />
<!-- the default install dir is build directory override it using
-Dtask-controller.install.dir=$HADOOP_HOME/bin -->
<property name="task-controller.install.dir" value="${dist.dir}/bin" />
<!-- end of task-controller properties -->
<!-- IVY properteis set here -->
<property name="ivy.dir" location="ivy" />
@@ -710,8 +696,6 @@
<sysproperty key="test.debug.data" value="${test.debug.data}"/>
<sysproperty key="hadoop.log.dir" value="@{test.dir}/logs"/>
<sysproperty key="test.src.dir" value="@{fileset.dir}"/>
<sysproperty key="taskcontroller-path" value="${taskcontroller-path}"/>
<sysproperty key="taskcontroller-ugi" value="${taskcontroller-ugi}"/>
<sysproperty key="test.build.extraconf" value="@{test.dir}/extraconf" /> <sysproperty key="test.build.extraconf" value="@{test.dir}/extraconf" />
<sysproperty key="hadoop.policy.file" value="hadoop-policy.xml"/> <sysproperty key="hadoop.policy.file" value="hadoop-policy.xml"/>
<sysproperty key="java.library.path" <sysproperty key="java.library.path"
@@ -1251,7 +1235,6 @@
<fileset file="${dist.dir}/src/examples/pipes/configure"/>
<fileset file="${dist.dir}/src/c++/utils/configure"/>
<fileset file="${dist.dir}/src/c++/pipes/configure"/>
<fileset file="${dist.dir}/src/c++/task-controller/configure"/>
</chmod>
<chmod perm="ugo+x" type="file" parallel="false">
<fileset dir="${dist.dir}/bin"/>
@@ -1277,7 +1260,6 @@
<exclude name="${final.name}/src/examples/pipes/configure"/>
<exclude name="${final.name}/src/c++/utils/configure"/>
<exclude name="${final.name}/src/c++/pipes/configure"/>
<exclude name="${final.name}/src/c++/task-controller/configure"/>
<include name="${final.name}/**" /> <include name="${final.name}/**" />
</tarfileset> </tarfileset>
<tarfileset dir="${build.dir}" mode="755"> <tarfileset dir="${build.dir}" mode="755">
@@ -1286,7 +1268,6 @@
<include name="${final.name}/src/examples/pipes/configure"/>
<include name="${final.name}/src/c++/utils/configure"/>
<include name="${final.name}/src/c++/pipes/configure"/>
<include name="${final.name}/src/c++/task-controller/configure"/>
</tarfileset>
</param.listofitems>
</macro_tar>
@@ -1769,7 +1750,6 @@
<exclude name="src/c++/librecordio/*"/>
<exclude name="src/c++/pipes/*"/>
<exclude name="src/c++/utils/*"/>
<exclude name="src/c++/task-controller/*"/>
<exclude name="src/examples/pipes/*"/> <exclude name="src/examples/pipes/*"/>
<exclude name="src/c++/pipes/debug/*"/> <exclude name="src/c++/pipes/debug/*"/>
</fileset> </fileset>
@@ -1861,9 +1841,6 @@
<condition property="need.c++.examples.pipes.configure">
<not> <available file="${c++.examples.pipes.src}/configure"/> </not>
</condition>
<condition property="need.c++.task-controller.configure">
<not> <available file="${c++.task-controller.src}/configure"/> </not>
</condition>
</target>
<target name="create-c++-utils-configure" depends="check-c++-configure"
@@ -1893,19 +1870,9 @@
</exec>
</target>
<target name="create-c++-task-controller-configure" depends="check-c++-configure"
if="need.c++.task-controller.configure">
<exec executable="autoreconf" dir="${c++.task-controller.src}"
searchpath="yes" failonerror="yes">
<arg value="-i"/>
<arg value="-f"/>
</exec>
</target>
<target name="create-c++-configure" depends="create-c++-utils-configure, <target name="create-c++-configure" depends="create-c++-utils-configure,
create-c++-pipes-configure, create-c++-pipes-configure,
create-c++-examples-pipes-configure, create-c++-examples-pipes-configure"
create-c++-task-controller-configure"
if="compile.c++"> if="compile.c++">
</target> </target>
@@ -2368,68 +2335,6 @@
</echo>
</target>
<!-- taskcontroller targets -->
<target name="init-task-controller-build">
<antcall target="create-c++-task-controller-configure"/>
<mkdir dir="${build.c++.task-controller}" />
<copy todir="${build.c++.task-controller}">
<fileset dir="${c++.task-controller.src}" includes="*.c"/>
<fileset dir="${c++.task-controller.src}" includes="*.h"/>
</copy>
<chmod file="${c++.task-controller.src}/configure" perm="ugo+x"/>
<condition property="task-controller.conf.dir.passed">
<not>
<equals arg1="${hadoop.conf.dir}" arg2="$${hadoop.conf.dir}"/>
</not>
</condition>
</target>
<target name="configure-task-controller" depends="init,
init-task-controller-build,
task-controller-configuration-with-confdir,
task-controller-configuration-with-no-confdir">
</target>
<target name="task-controller-configuration-with-confdir"
if="task-controller.conf.dir.passed" >
<exec executable="${c++.task-controller.src}/configure"
dir="${build.c++.task-controller}" failonerror="yes">
<arg value="--prefix=${task-controller.install.dir}" />
<arg value="--with-confdir=${hadoop.conf.dir}" />
</exec>
</target>
<target name="task-controller-configuration-with-no-confdir"
unless="task-controller.conf.dir.passed">
<exec executable="${c++.task-controller.src}/configure"
dir="${build.c++.task-controller}" failonerror="yes">
<arg value="--prefix=${task-controller.install.dir}" />
</exec>
</target>
<!--
* Create the installation directory.
* Do a make install.
-->
<target name="task-controller" depends="configure-task-controller">
<mkdir dir="${task-controller.install.dir}" />
<exec executable="${make.cmd}" dir="${build.c++.task-controller}"
searchpath="yes" failonerror="yes">
<arg value="install" />
</exec>
</target>
<target name="test-task-controller" depends="task-controller">
<copy todir="${build.c++.task-controller}" verbose="true">
<fileset dir="${c++.task-controller.src}" includes="tests/"/>
</copy>
<exec executable="${make.cmd}" dir="${build.c++.task-controller}"
searchpath="yes" failonerror="yes">
<arg value="clean" />
<arg value="test" />
</exec>
<exec executable="${build.c++.task-controller}/tests/test-task-controller"
dir="${build.c++.task-controller}/tests/"
failonerror="yes">
</exec>
</target>
<!-- end of task-controller targets -->
<!-- Begining of fault-injection targets-->
<import file="${test.src.dir}/aop/build/aop.xml"/>

View File

@@ -1,42 +0,0 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
# autom4te configuration for hadoop utils library
#
begin-language: "Autoheader-preselections"
args: --no-cache
end-language: "Autoheader-preselections"
begin-language: "Automake-preselections"
args: --no-cache
end-language: "Automake-preselections"
begin-language: "Autoreconf-preselections"
args: --no-cache
end-language: "Autoreconf-preselections"
begin-language: "Autoconf-without-aclocal-m4"
args: --no-cache
end-language: "Autoconf-without-aclocal-m4"
begin-language: "Autoconf"
args: --no-cache
end-language: "Autoconf"

View File

@@ -1,33 +0,0 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
ACLOCAL_AMFLAGS = -I ../utils/m4
AM_CFLAGS = -Wall
bindir = $(exec_prefix)
bin_PROGRAMS = task-controller
check_PROGRAMS = tests/test-task-controller
TESTS = $(check_PROGRAMS)
task_controller_SOURCES = main.c task-controller.c configuration.c \
task-controller.h
tests_test_task_controller_SOURCES = tests/test-task-controller.c \
task-controller.c configuration.c task-controller.h
test: $(check_PROGRAMS)
@echo Done with $<

View File

@@ -1,245 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "configuration.h"
char * hadoop_conf_dir;
struct configuration config={.size=0, .confdetails=NULL};
//clean up method for freeing configuration
void free_configurations() {
int i = 0;
for (i = 0; i < config.size; i++) {
if (config.confdetails[i]->key != NULL) {
free((void *)config.confdetails[i]->key);
}
if (config.confdetails[i]->value != NULL) {
free((void *)config.confdetails[i]->value);
}
free(config.confdetails[i]);
}
if (config.size > 0) {
free(config.confdetails);
}
config.size = 0;
}
//function used to load the configurations present in the secure config
void get_configs() {
FILE *conf_file;
char *line;
char *equaltok;
char *temp_equaltok;
size_t linesize = 1000;
int size_read = 0;
int str_len = 0;
char *file_name = NULL;
#ifndef HADOOP_CONF_DIR
str_len = strlen(CONF_FILE_PATTERN) + strlen(hadoop_conf_dir);
file_name = (char *) malloc(sizeof(char) * (str_len + 1));
#else
str_len = strlen(CONF_FILE_PATTERN) + strlen(HADOOP_CONF_DIR);
file_name = (char *) malloc(sizeof(char) * (str_len + 1));
#endif
if (file_name == NULL) {
fprintf(LOGFILE, "Malloc failed :Out of memory \n");
return;
}
memset(file_name,'\0',str_len +1);
#ifndef HADOOP_CONF_DIR
snprintf(file_name,str_len, CONF_FILE_PATTERN, hadoop_conf_dir);
#else
snprintf(file_name, str_len, CONF_FILE_PATTERN, HADOOP_CONF_DIR);
#endif
#ifdef DEBUG
fprintf(LOGFILE, "get_configs :Conf file name is : %s \n", file_name);
#endif
//allocate space for ten configuration items.
config.confdetails = (struct confentry **) malloc(sizeof(struct confentry *)
* MAX_SIZE);
config.size = 0;
conf_file = fopen(file_name, "r");
if (conf_file == NULL) {
fprintf(LOGFILE, "Invalid conf file provided : %s \n", file_name);
free(file_name);
return;
}
while(!feof(conf_file)) {
line = (char *) malloc(linesize);
if(line == NULL) {
fprintf(LOGFILE, "malloc failed while reading configuration file.\n");
goto cleanup;
}
size_read = getline(&line,&linesize,conf_file);
//feof returns true only after we read past EOF.
//so a file whose last line has no newline can reach this place
//if size_read returns negative check for eof condition
if (size_read == -1) {
if(!feof(conf_file)){
fprintf(LOGFILE, "getline returned error.\n");
goto cleanup;
}else {
break;
}
}
//trim the ending new line
line[strlen(line)-1] = '\0';
//comment line
if(line[0] == '#') {
free(line);
continue;
}
//tokenize first to get key and list of values.
//if no equals is found ignore this line, can be an empty line also
equaltok = strtok_r(line, "=", &temp_equaltok);
if(equaltok == NULL) {
free(line);
continue;
}
config.confdetails[config.size] = (struct confentry *) malloc(
sizeof(struct confentry));
if(config.confdetails[config.size] == NULL) {
fprintf(LOGFILE,
"Failed allocating memory for single configuration item\n");
goto cleanup;
}
#ifdef DEBUG
fprintf(LOGFILE, "get_configs : Adding conf key : %s \n", equaltok);
#endif
memset(config.confdetails[config.size], 0, sizeof(struct confentry));
config.confdetails[config.size]->key = (char *) malloc(
sizeof(char) * (strlen(equaltok)+1));
strcpy((char *)config.confdetails[config.size]->key, equaltok);
equaltok = strtok_r(NULL, "=", &temp_equaltok);
if (equaltok == NULL) {
fprintf(LOGFILE, "configuration tokenization failed \n");
goto cleanup;
}
//means value is commented so don't store the key
if(equaltok[0] == '#') {
free(line);
free((void *)config.confdetails[config.size]->key);
free(config.confdetails[config.size]);
continue;
}
#ifdef DEBUG
fprintf(LOGFILE, "get_configs : Adding conf value : %s \n", equaltok);
#endif
config.confdetails[config.size]->value = (char *) malloc(
sizeof(char) * (strlen(equaltok)+1));
strcpy((char *)config.confdetails[config.size]->value, equaltok);
if((config.size + 1) % MAX_SIZE == 0) {
config.confdetails = (struct confentry **) realloc(config.confdetails,
sizeof(struct confentry **) * (MAX_SIZE + config.size));
if (config.confdetails == NULL) {
fprintf(LOGFILE,
"Failed re-allocating memory for configuration items\n");
goto cleanup;
}
}
if(config.confdetails[config.size] )
config.size++;
free(line);
}
//close the file
fclose(conf_file);
//clean up allocated file name
free(file_name);
return;
//free allocated space
cleanup:
if (line != NULL) {
free(line);
}
fclose(conf_file);
free(file_name);
free_configurations();
return;
}
/*
* function used to get a configuration value.
* The function for the first time populates the configuration details into
* array, next time onwards used the populated array.
*
*/
const char * get_value(const char* key) {
int count;
if (config.size == 0) {
get_configs();
}
if (config.size == 0) {
fprintf(LOGFILE, "Invalid configuration provided\n");
return NULL;
}
for (count = 0; count < config.size; count++) {
if (strcmp(config.confdetails[count]->key, key) == 0) {
return strdup(config.confdetails[count]->value);
}
}
return NULL;
}
/**
* Function to return an array of values for a key.
* Value delimiter is assumed to be a comma.
*/
const char ** get_values(const char * key) {
const char ** toPass = NULL;
const char *value = get_value(key);
char *tempTok = NULL;
char *tempstr = NULL;
int size = 0;
int len;
//first allocate an array of 10
if(value != NULL) {
toPass = (const char **) malloc(sizeof(char *) * MAX_SIZE);
tempTok = strtok_r((char *)value, ",", &tempstr);
if (tempTok != NULL) {
while (1) {
toPass[size++] = tempTok;
tempTok = strtok_r(NULL, ",", &tempstr);
if(tempTok == NULL){
break;
}
if((size % MAX_SIZE) == 0) {
toPass = (const char **) realloc(toPass,(sizeof(char *) *
(MAX_SIZE * ((size/MAX_SIZE) +1))));
}
}
} else {
toPass[size] = (char *)value;
}
}
if(size > 0) {
toPass[size] = NULL;
}
return toPass;
}
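
For orientation, here is a minimal usage sketch of the lookup API this file implemented (not part of the commit; the conf-dir value is an assumption, and the key is the one the shipped taskcontroller.cfg carried):

#include <stdio.h>
#include <stdlib.h>
#include "configuration.h"

int main(void) {
  LOGFILE = stdout;                        /* the parser logs errors here */
#ifndef HADOOP_CONF_DIR
  hadoop_conf_dir = "/usr/local/hadoop";   /* assumed install root */
#endif
  /* The first call lazily loads taskcontroller.cfg; the result is
   * strdup'ed, so the caller owns and frees it. */
  const char *raw = get_value("mapreduce.cluster.local.dir");
  if (raw != NULL) {
    printf("raw value: %s\n", raw);
    free((void *) raw);
  }
  /* Comma-separated values come back as a NULL-terminated array. */
  const char **dirs = get_values("mapreduce.cluster.local.dir");
  if (dirs != NULL) {
    for (int i = 0; dirs[i] != NULL; i++)
      printf("dir[%d] = %s\n", i, dirs[i]);
    free(dirs);
  }
  free_configurations();
  return 0;
}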

View File

@@ -1,59 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <unistd.h>
#define INCREMENT_SIZE 1000
#define MAX_SIZE 10
struct confentry {
const char *key;
const char *value;
};
struct configuration {
int size;
struct confentry **confdetails;
};
FILE *LOGFILE;
#ifdef HADOOP_CONF_DIR
#define CONF_FILE_PATTERN "%s/taskcontroller.cfg"
#else
#define CONF_FILE_PATTERN "%s/conf/taskcontroller.cfg"
#endif
extern struct configuration config;
//configuration file contents
#ifndef HADOOP_CONF_DIR
extern char *hadoop_conf_dir;
#endif
//method exposed to get the configurations
const char * get_value(const char* key);
//method to free allocated configuration
void free_configurations();
//function to return array of values pointing to the key. Values are
//comma separated strings.
const char ** get_values(const char* key);
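
For reference, the configuration file this header points at is plain key=value text: a leading '#' marks a comment line, a '#' value drops the key, and list values are comma separated. A hypothetical taskcontroller.cfg (the key names are the TT_*_KEY macros in task-controller.h further below; the values are invented):

# taskcontroller.cfg -- found via CONF_FILE_PATTERN above
mapreduce.cluster.local.dir=/tmp/mapred/local1,/tmp/mapred/local2
hadoop.log.dir=/var/log/hadoop
mapreduce.tasktracker.group=hadoop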

View File

@@ -1,68 +0,0 @@
# -*- Autoconf -*-
# Process this file with autoconf to produce a configure script.
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
AC_PREREQ(2.59)
AC_INIT([task-controller],[0.1])
#changing default prefix value to empty string, so that binary does not
#gets installed within system
AC_PREFIX_DEFAULT(.)
#add new argument called -with-confdir
AC_ARG_WITH(confdir,[--with-confdir path to hadoop conf dir])
AC_CONFIG_SRCDIR([task-controller.h])
AC_CONFIG_AUX_DIR([config])
AC_CONFIG_MACRO_DIR([../utils/m4])
AM_INIT_AUTOMAKE([subdir-objects foreign no-dist])
# Checks for programs.
AC_PROG_CC
# Checks for libraries.
# Checks for header files.
AC_HEADER_STDC
AC_CHECK_HEADERS([stdlib.h string.h unistd.h fcntl.h])
#check for HADOOP_CONF_DIR
if test "$with_confdir" != ""
then
AC_DEFINE_UNQUOTED(HADOOP_CONF_DIR, ["$with_confdir"], [Location of Hadoop configuration])
fi
# Checks for typedefs, structures, and compiler characteristics.
AC_C_CONST
AC_TYPE_PID_T
AC_TYPE_MODE_T
AC_TYPE_SIZE_T
# Checks for library functions.
AC_FUNC_MALLOC
AC_FUNC_REALLOC
AC_FUNC_CHOWN
AC_CHECK_FUNCS([strerror memset mkdir rmdir strdup])
AC_CONFIG_FILES([Makefile])
AC_OUTPUT
AC_HEADER_STDBOOL
AC_PROG_MAKE_SET
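
In effect, --with-confdir bakes the configuration directory into the binary at compile time, while a build without it resolves the directory at runtime from the executable's own path (see main.c below). A sketch of the two lookups the compiled binary performs, with an assumed install root:

/* Illustration only: mirrors CONF_FILE_PATTERN in configuration.h. */
#include <stdio.h>
int main(void) {
#ifdef HADOOP_CONF_DIR
  /* built with: ./configure --with-confdir=/etc/hadoop */
  printf("%s/taskcontroller.cfg\n", HADOOP_CONF_DIR);
#else
  /* built without --with-confdir; the root is derived from argv[0]
   * by stripping the trailing "/bin/task-controller" */
  printf("%s/conf/taskcontroller.cfg\n", "/usr/local/hadoop");
#endif
  return 0;
}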

View File

@@ -1,260 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "task-controller.h"
void open_log_file(const char *log_file) {
if (log_file == NULL) {
LOGFILE = stdout;
} else {
LOGFILE = fopen(log_file, "a");
if (LOGFILE == NULL) {
fprintf(stdout, "Unable to open LOGFILE : %s \n", log_file);
LOGFILE = stdout;
}
if (LOGFILE != stdout) {
if (chmod(log_file, S_IREAD | S_IEXEC | S_IWRITE | S_IROTH | S_IWOTH
| S_IRGRP | S_IWGRP) < 0) {
fprintf(stdout, "Unable to change permission of the log file %s \n",
log_file);
fclose(LOGFILE);
fprintf(stdout, "changing log file to stdout");
LOGFILE = stdout;
}
}
}
}
void display_usage(FILE *stream) {
fprintf(stream,
"Usage: task-controller [-l logfile] user command command-args\n");
}
/**
* Check the permissions on the task-controller binary to make sure that
* security can be guaranteed. For this, we need the task-controller binary to
* * be user-owned by root
* * be group-owned by a configured special group.
* * others do not have write or execute permissions
* * be setuid
*/
int check_taskcontroller_permissions(char *executable_file) {
errno = 0;
char * resolved_path = (char *) canonicalize_file_name(executable_file);
if (resolved_path == NULL) {
fprintf(LOGFILE,
"Error resolving the canonical name for the executable : %s!",
strerror(errno));
return -1;
}
struct stat filestat;
errno = 0;
if (stat(resolved_path, &filestat) != 0) {
fprintf(LOGFILE, "Could not stat the executable : %s!.\n", strerror(errno));
return -1;
}
uid_t binary_euid = filestat.st_uid; // Binary's user owner
gid_t binary_egid = filestat.st_gid; // Binary's group owner
// Effective uid should be root
if (binary_euid != 0) {
fprintf(LOGFILE,
"The task-controller binary should be user-owned by root.\n");
return -1;
}
// Get the group entry for the special_group
errno = 0;
struct group *special_group_entry = getgrgid(binary_egid);
if (special_group_entry == NULL) {
fprintf(LOGFILE,
"Unable to get information for effective group of the binary : %s\n",
strerror(errno));
return -1;
}
char * binary_group = special_group_entry->gr_name;
// verify that the group name of the special group
// is same as the one in configuration
if (check_variable_against_config(TT_GROUP_KEY, binary_group) != 0) {
fprintf(LOGFILE,
"Group of the binary does not match with that in configuration\n");
return -1;
}
// check others do not have write/execute permissions
if ((filestat.st_mode & S_IWOTH) == S_IWOTH ||
(filestat.st_mode & S_IXOTH) == S_IXOTH) {
fprintf(LOGFILE,
"The task-controller binary should not have write or execute for others.\n");
return -1;
}
// Binary should be setuid executable
if ((filestat.st_mode & S_ISUID) != S_ISUID) {
fprintf(LOGFILE,
"The task-controller binary should be set setuid.\n");
return -1;
}
return 0;
}
int main(int argc, char **argv) {
int command;
int next_option = 0;
const char * job_id = NULL;
const char * task_id = NULL;
const char * tt_root = NULL;
const char *log_dir = NULL;
const char * unique_string = NULL;
int exit_code = 0;
const char * task_pid = NULL;
const char* const short_options = "l:";
const struct option long_options[] = { { "log", 1, NULL, 'l' }, { NULL, 0,
NULL, 0 } };
const char* log_file = NULL;
char * dir_to_be_deleted = NULL;
int conf_dir_len = 0;
char *executable_file = argv[0];
#ifndef HADOOP_CONF_DIR
conf_dir_len = (strlen(executable_file) - strlen(EXEC_PATTERN)) + 1;
if (conf_dir_len < 1) {
// We didn't get an absolute path to our executable_file; bail.
printf("Cannot find configuration directory.\n");
printf("This program must be run with its full absolute path.\n");
return INVALID_CONF_DIR;
} else {
hadoop_conf_dir = (char *) malloc (sizeof(char) * conf_dir_len);
strncpy(hadoop_conf_dir, executable_file,
(strlen(executable_file) - strlen(EXEC_PATTERN)));
hadoop_conf_dir[(strlen(executable_file) - strlen(EXEC_PATTERN))] = '\0';
}
#endif
do {
next_option = getopt_long(argc, argv, short_options, long_options, NULL);
switch (next_option) {
case 'l':
log_file = optarg;
default:
break;
}
} while (next_option != -1);
open_log_file(log_file);
if (check_taskcontroller_permissions(executable_file) != 0) {
fprintf(LOGFILE, "Invalid permissions on task-controller binary.\n");
return INVALID_TASKCONTROLLER_PERMISSIONS;
}
//Minimum number of arguments required to run the task-controller
//command-name user command tt-root
if (argc < 3) {
display_usage(stdout);
return INVALID_ARGUMENT_NUMBER;
}
//checks done for user name
//checks done if the user is root or not.
if (argv[optind] == NULL) {
fprintf(LOGFILE, "Invalid user name \n");
return INVALID_USER_NAME;
}
if (get_user_details(argv[optind]) != 0) {
return INVALID_USER_NAME;
}
//implicit conversion to int instead of __gid_t and __uid_t
if (user_detail->pw_gid == 0 || user_detail->pw_uid == 0) {
fprintf(LOGFILE, "Cannot run tasks as super user\n");
return SUPER_USER_NOT_ALLOWED_TO_RUN_TASKS;
}
optind = optind + 1;
command = atoi(argv[optind++]);
fprintf(LOGFILE, "main : command provided %d\n",command);
fprintf(LOGFILE, "main : user is %s\n", user_detail->pw_name);
switch (command) {
case INITIALIZE_USER:
exit_code = initialize_user(user_detail->pw_name);
break;
case INITIALIZE_JOB:
job_id = argv[optind++];
exit_code = initialize_job(job_id, user_detail->pw_name);
break;
case INITIALIZE_DISTRIBUTEDCACHE_FILE:
tt_root = argv[optind++];
unique_string = argv[optind++];
exit_code = initialize_distributed_cache_file(tt_root, unique_string,
user_detail->pw_name);
break;
case LAUNCH_TASK_JVM:
tt_root = argv[optind++];
job_id = argv[optind++];
task_id = argv[optind++];
exit_code
= run_task_as_user(user_detail->pw_name, job_id, task_id, tt_root);
break;
case INITIALIZE_TASK:
job_id = argv[optind++];
task_id = argv[optind++];
exit_code = initialize_task(job_id, task_id, user_detail->pw_name);
break;
case TERMINATE_TASK_JVM:
task_pid = argv[optind++];
exit_code = kill_user_task(user_detail->pw_name, task_pid, SIGTERM);
break;
case KILL_TASK_JVM:
task_pid = argv[optind++];
exit_code = kill_user_task(user_detail->pw_name, task_pid, SIGKILL);
break;
case RUN_DEBUG_SCRIPT:
tt_root = argv[optind++];
job_id = argv[optind++];
task_id = argv[optind++];
exit_code
= run_debug_script_as_user(user_detail->pw_name, job_id, task_id, tt_root);
break;
case SIGQUIT_TASK_JVM:
task_pid = argv[optind++];
exit_code = kill_user_task(user_detail->pw_name, task_pid, SIGQUIT);
break;
case ENABLE_TASK_FOR_CLEANUP:
tt_root = argv[optind++];
job_id = argv[optind++];
dir_to_be_deleted = argv[optind++];
exit_code = enable_task_for_cleanup(tt_root, user_detail->pw_name, job_id,
dir_to_be_deleted);
break;
case ENABLE_JOB_FOR_CLEANUP:
tt_root = argv[optind++];
job_id = argv[optind++];
exit_code = enable_job_for_cleanup(tt_root, user_detail->pw_name, job_id);
break;
default:
exit_code = INVALID_COMMAND_PROVIDED;
}
fflush(LOGFILE);
fclose(LOGFILE);
return exit_code;
}
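
To make the dispatch above concrete, here is a hypothetical LAUNCH_TASK_JVM invocation as the Java side (LinuxTaskController, further below) builds it: executable, user, the enum command ordinal as a string, then per-command arguments. All paths and IDs here are invented:

/* LAUNCH_TASK_JVM is ordinal 3 in enum command (task-controller.h),
 * matching TaskControllerCommands on the Java side. */
char *const launch_argv[] = {
  "/usr/local/hadoop/bin/task-controller", /* absolute path; also used to
                                              locate the conf dir */
  "alice",                                 /* mapreduce.job.user.name      */
  "3",                                     /* LAUNCH_TASK_JVM              */
  "/tmp/mapred/local",                     /* tt-root chosen for the task  */
  "job_200906101234_0001",                 /* job id                       */
  "attempt_200906101234_0001_m_000000_0",  /* task id                      */
  NULL
};
/* e.g. execv(launch_argv[0], launch_argv); an optional "-l logfile" pair
 * may precede the user name and is consumed by getopt_long above. */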

View File

@@ -1,148 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <stdio.h>
#include <stdlib.h>
#include <stdarg.h>
#include <string.h>
#include <errno.h>
#include <unistd.h>
#include <sys/types.h>
#include <pwd.h>
#include <assert.h>
#include <getopt.h>
#include <sys/stat.h>
#include <sys/signal.h>
#include <getopt.h>
#include <grp.h>
#include <dirent.h>
#include <fcntl.h>
#include <fts.h>
#include "configuration.h"
//command definitions
enum command {
INITIALIZE_USER,
INITIALIZE_JOB,
INITIALIZE_DISTRIBUTEDCACHE_FILE,
LAUNCH_TASK_JVM,
INITIALIZE_TASK,
TERMINATE_TASK_JVM,
KILL_TASK_JVM,
RUN_DEBUG_SCRIPT,
SIGQUIT_TASK_JVM,
ENABLE_TASK_FOR_CLEANUP,
ENABLE_JOB_FOR_CLEANUP
};
enum errorcodes {
INVALID_ARGUMENT_NUMBER = 1,
INVALID_USER_NAME, //2
INVALID_COMMAND_PROVIDED, //3
SUPER_USER_NOT_ALLOWED_TO_RUN_TASKS, //4
INVALID_TT_ROOT, //5
SETUID_OPER_FAILED, //6
UNABLE_TO_EXECUTE_TASK_SCRIPT, //7
UNABLE_TO_KILL_TASK, //8
INVALID_TASK_PID, //9
ERROR_RESOLVING_FILE_PATH, //10
RELATIVE_PATH_COMPONENTS_IN_FILE_PATH, //11
UNABLE_TO_STAT_FILE, //12
FILE_NOT_OWNED_BY_TASKTRACKER, //13
PREPARE_ATTEMPT_DIRECTORIES_FAILED, //14
INITIALIZE_JOB_FAILED, //15
PREPARE_TASK_LOGS_FAILED, //16
INVALID_TT_LOG_DIR, //17
OUT_OF_MEMORY, //18
INITIALIZE_DISTCACHEFILE_FAILED, //19
INITIALIZE_USER_FAILED, //20
UNABLE_TO_EXECUTE_DEBUG_SCRIPT, //21
INVALID_CONF_DIR, //22
UNABLE_TO_BUILD_PATH, //23
INVALID_TASKCONTROLLER_PERMISSIONS, //24
PREPARE_JOB_LOGS_FAILED, //25
};
#define USER_DIR_PATTERN "%s/taskTracker/%s"
#define TT_JOB_DIR_PATTERN USER_DIR_PATTERN"/jobcache/%s"
#define USER_DISTRIBUTED_CACHE_DIR_PATTERN USER_DIR_PATTERN"/distcache/%s"
#define JOB_DIR_TO_JOB_WORK_PATTERN "%s/work"
#define JOB_DIR_TO_ATTEMPT_DIR_PATTERN "%s/%s"
#define JOB_LOG_DIR_PATTERN "%s/userlogs/%s"
#define JOB_LOG_DIR_TO_JOB_ACLS_FILE_PATTERN "%s/job-acls.xml"
#define ATTEMPT_LOG_DIR_PATTERN JOB_LOG_DIR_PATTERN"/%s"
#define TASK_SCRIPT_PATTERN "%s/%s/taskjvm.sh"
#define TT_LOCAL_TASK_DIR_PATTERN "%s/taskTracker/%s/jobcache/%s/%s"
#define TT_SYS_DIR_KEY "mapreduce.cluster.local.dir"
#define TT_LOG_DIR_KEY "hadoop.log.dir"
#define TT_GROUP_KEY "mapreduce.tasktracker.group"
#ifndef HADOOP_CONF_DIR
#define EXEC_PATTERN "/bin/task-controller"
extern char * hadoop_conf_dir;
#endif
extern struct passwd *user_detail;
extern FILE *LOGFILE;
int run_task_as_user(const char * user, const char *jobid, const char *taskid,
const char *tt_root);
int run_debug_script_as_user(const char * user, const char *jobid, const char *taskid,
const char *tt_root);
int initialize_user(const char *user);
int initialize_task(const char *jobid, const char *taskid, const char *user);
int initialize_job(const char *jobid, const char *user);
int initialize_distributed_cache_file(const char *tt_root,
const char* unique_string, const char *user);
int kill_user_task(const char *user, const char *task_pid, int sig);
int enable_task_for_cleanup(const char *tt_root, const char *user,
const char *jobid, const char *dir_to_be_deleted);
int enable_job_for_cleanup(const char *tt_root, const char *user,
const char *jobid);
int prepare_attempt_directory(const char *attempt_dir, const char *user);
// The following functions are exposed for testing
int check_variable_against_config(const char *config_key,
const char *passed_value);
int get_user_details(const char *user);
char *get_task_launcher_file(const char *job_dir, const char *attempt_dir);
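
A worked illustration of the path-pattern macros above, using the same invented tt-root, user, and job id as the expected strings in the test file that follows:

#include <stdio.h>
#define USER_DIR_PATTERN "%s/taskTracker/%s"
#define TT_JOB_DIR_PATTERN USER_DIR_PATTERN"/jobcache/%s"

int main(void) {
  char buf[256];
  snprintf(buf, sizeof(buf), USER_DIR_PATTERN, "/tmp", "user");
  printf("%s\n", buf);   /* /tmp/taskTracker/user */
  snprintf(buf, sizeof(buf), TT_JOB_DIR_PATTERN,
           "/tmp", "user", "job_200906101234_0001");
  printf("%s\n", buf);   /* /tmp/taskTracker/user/jobcache/job_200906101234_0001 */
  return 0;
}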

View File

@@ -1,243 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "../task-controller.h"
#define HADOOP_CONF_DIR "/tmp"
int write_config_file(char *file_name) {
FILE *file;
char const *str =
"mapreduce.cluster.local.dir=/tmp/testing1,/tmp/testing2,/tmp/testing3,/tmp/testing4\n";
file = fopen(file_name, "w");
if (file == NULL) {
printf("Failed to open %s.\n", file_name);
return EXIT_FAILURE;
}
fwrite(str, 1, strlen(str), file);
fclose(file);
return 0;
}
void test_check_variable_against_config() {
// A temporary configuration directory
char *conf_dir_templ = "/tmp/test-task-controller-conf-dir-XXXXXX";
// To accommodate "/conf/taskcontroller.cfg"
char template[strlen(conf_dir_templ) + strlen("/conf/taskcontroller.cfg")];
strcpy(template, conf_dir_templ);
char *temp_dir = mkdtemp(template);
if (temp_dir == NULL) {
printf("Couldn't create a temporary dir for conf.\n");
goto cleanup;
}
// Set the configuration directory
hadoop_conf_dir = strdup(temp_dir);
// create the configuration directory
strcat(template, "/conf");
char *conf_dir = strdup(template);
mkdir(conf_dir, S_IRWXU);
// create the configuration file
strcat(template, "/taskcontroller.cfg");
if (write_config_file(template) != 0) {
printf("Couldn't write the configuration file.\n");
goto cleanup;
}
// Test obtaining a value for a key from the config
char *config_values[4] = { "/tmp/testing1", "/tmp/testing2",
"/tmp/testing3", "/tmp/testing4" };
char *value = (char *) get_value("mapreduce.cluster.local.dir");
if (strcmp(value, "/tmp/testing1,/tmp/testing2,/tmp/testing3,/tmp/testing4")
!= 0) {
printf("Obtaining a value for a key from the config failed.\n");
goto cleanup;
}
// Test the parsing of a multiple valued key from the config
char **values = (char **)get_values("mapreduce.cluster.local.dir");
char **values_ptr = values;
int i = 0;
while (*values_ptr != NULL) {
printf(" value : %s\n", *values_ptr);
if (strcmp(*values_ptr, config_values[i++]) != 0) {
printf("Configured values are not read out properly. Test failed!");
goto cleanup;
}
values_ptr++;
}
if (check_variable_against_config("mapreduce.cluster.local.dir", "/tmp/testing5") == 0) {
printf("Configuration should not contain /tmp/testing5! \n");
goto cleanup;
}
if (check_variable_against_config("mapreduce.cluster.local.dir", "/tmp/testing4") != 0) {
printf("Configuration should contain /tmp/testing4! \n");
goto cleanup;
}
cleanup: if (value != NULL) {
free(value);
}
if (values != NULL) {
free(values);
}
if (hadoop_conf_dir != NULL) {
free(hadoop_conf_dir);
}
unlink(template);
rmdir(conf_dir);
rmdir(hadoop_conf_dir);
}
void test_get_user_directory() {
char *user_dir = (char *) get_user_directory("/tmp", "user");
printf("user_dir obtained is %s\n", user_dir);
int ret = 0;
if (strcmp(user_dir, "/tmp/taskTracker/user") != 0) {
ret = -1;
}
free(user_dir);
assert(ret == 0);
}
void test_get_job_directory() {
char *job_dir = (char *) get_job_directory("/tmp", "user",
"job_200906101234_0001");
printf("job_dir obtained is %s\n", job_dir);
int ret = 0;
if (strcmp(job_dir, "/tmp/taskTracker/user/jobcache/job_200906101234_0001")
!= 0) {
ret = -1;
}
free(job_dir);
assert(ret == 0);
}
void test_get_attempt_directory() {
char *job_dir = (char *) get_job_directory("/tmp", "user",
"job_200906101234_0001");
printf("job_dir obtained is %s\n", job_dir);
char *attempt_dir = (char *) get_attempt_directory(job_dir,
"attempt_200906101234_0001_m_000000_0");
printf("attempt_dir obtained is %s\n", attempt_dir);
int ret = 0;
if (strcmp(
attempt_dir,
"/tmp/taskTracker/user/jobcache/job_200906101234_0001/attempt_200906101234_0001_m_000000_0")
!= 0) {
ret = -1;
}
free(job_dir);
free(attempt_dir);
assert(ret == 0);
}
void test_get_task_launcher_file() {
char *job_dir = (char *) get_job_directory("/tmp", "user",
"job_200906101234_0001");
char *task_file = (char *) get_task_launcher_file(job_dir,
"attempt_200906112028_0001_m_000000_0");
printf("task_file obtained is %s\n", task_file);
int ret = 0;
if (strcmp(
task_file,
"/tmp/taskTracker/user/jobcache/job_200906101234_0001/attempt_200906112028_0001_m_000000_0/taskjvm.sh")
!= 0) {
ret = -1;
}
free(task_file);
assert(ret == 0);
}
void test_get_job_log_dir() {
char *logdir = (char *) get_job_log_dir("/tmp/testing",
"job_200906101234_0001");
printf("logdir obtained is %s\n", logdir);
int ret = 0;
if (strcmp(logdir, "/tmp/testing/userlogs/job_200906101234_0001") != 0) {
ret = -1;
}
free(logdir);
assert(ret == 0);
}
void test_get_job_acls_file() {
char *job_acls_file = (char *) get_job_acls_file(
"/tmp/testing/userlogs/job_200906101234_0001");
printf("job acls file obtained is %s\n", job_acls_file);
int ret = 0;
if (strcmp(job_acls_file,
"/tmp/testing/userlogs/job_200906101234_0001/job-acls.xml") != 0) {
ret = -1;
}
free(job_acls_file);
assert(ret == 0);
}
void test_get_task_log_dir() {
char *logdir = (char *) get_task_log_dir("/tmp/testing",
"job_200906101234_0001", "attempt_200906112028_0001_m_000000_0");
printf("logdir obtained is %s\n", logdir);
int ret = 0;
if (strcmp(logdir,
"/tmp/testing/userlogs/job_200906101234_0001/attempt_200906112028_0001_m_000000_0")
!= 0) {
ret = -1;
}
free(logdir);
assert(ret == 0);
}
int main(int argc, char **argv) {
printf("\nStarting tests\n");
LOGFILE = stdout;
printf("\nTesting check_variable_against_config()\n");
test_check_variable_against_config();
printf("\nTesting get_user_directory()\n");
test_get_user_directory();
printf("\nTesting get_job_directory()\n");
test_get_job_directory();
printf("\nTesting get_attempt_directory()\n");
test_get_attempt_directory();
printf("\nTesting get_task_launcher_file()\n");
test_get_task_launcher_file();
printf("\nTesting get_job_log_dir()\n");
test_get_job_log_dir();
printf("\nTesting get_job_acls_file()\n");
test_get_job_acls_file();
printf("\nTesting get_task_log_dir()\n");
test_get_task_log_dir();
printf("\nFinished tests\n");
return 0;
}

View File

@@ -1,657 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext;
import org.apache.hadoop.mapred.JvmManager.JvmEnv;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Shell.ExitCodeException;
import org.apache.hadoop.util.Shell.ShellCommandExecutor;
/**
* A {@link TaskController} that runs the task JVMs as the user
* who submits the job.
*
* This class executes a setuid executable to implement methods
* of the {@link TaskController}, including launching the task
* JVM and killing it when needed, and also initializing and
* finalizing the task environment.
* <p> The setuid executable is launched using the command line:</p>
* <p>task-controller mapreduce.job.user.name command command-args, where</p>
* <p>mapreduce.job.user.name is the name of the owner who submits the job</p>
* <p>command is one of the cardinal values of the
* {@link LinuxTaskController.TaskControllerCommands} enumeration</p>
* <p>command-args depends on the command being launched.</p>
*
* In addition to running and killing tasks, the class also
* sets up appropriate access for the directories and files
* that will be used by the tasks.
*/
class LinuxTaskController extends TaskController {
private static final Log LOG =
LogFactory.getLog(LinuxTaskController.class);
// Name of the executable script that will contain the child
// JVM command line. See writeCommand for details.
private static final String COMMAND_FILE = "taskjvm.sh";
// Path to the setuid executable.
private static String taskControllerExe;
static {
// the task-controller is expected to be under the $HADOOP_PREFIX/bin
// directory.
File hadoopBin = new File(System.getenv("HADOOP_PREFIX"), "bin");
taskControllerExe =
new File(hadoopBin, "task-controller").getAbsolutePath();
}
public LinuxTaskController() {
super();
}
/**
* List of commands that the setuid script will execute.
*/
enum TaskControllerCommands {
INITIALIZE_USER,
INITIALIZE_JOB,
INITIALIZE_DISTRIBUTEDCACHE_FILE,
LAUNCH_TASK_JVM,
INITIALIZE_TASK,
TERMINATE_TASK_JVM,
KILL_TASK_JVM,
RUN_DEBUG_SCRIPT,
SIGQUIT_TASK_JVM,
ENABLE_TASK_FOR_CLEANUP,
ENABLE_JOB_FOR_CLEANUP
}
@Override
public void setup() throws IOException {
super.setup();
// Check the permissions of the task-controller binary by running it plainly.
// If permissions are correct, it returns an error code 1, else it returns
// 24 or something else if some other bugs are also present.
String[] taskControllerCmd =
new String[] { getTaskControllerExecutablePath() };
ShellCommandExecutor shExec = new ShellCommandExecutor(taskControllerCmd);
try {
shExec.execute();
} catch (ExitCodeException e) {
int exitCode = shExec.getExitCode();
if (exitCode != 1) {
LOG.warn("Exit code from checking binary permissions is : " + exitCode);
logOutput(shExec.getOutput());
throw new IOException("Task controller setup failed because of invalid "
+ "permissions/ownership with exit code " + exitCode, e);
}
}
}
/**
* Launch a task JVM that will run as the owner of the job.
*
* This method launches a task JVM by executing a setuid executable that will
* switch to the user and run the task. Also does initialization of the first
* task in the same setuid process launch.
*/
@Override
void launchTaskJVM(TaskController.TaskControllerContext context)
throws IOException {
JvmEnv env = context.env;
// get the JVM command line.
String cmdLine =
TaskLog.buildCommandLine(env.setup, env.vargs, env.stdout, env.stderr,
env.logSize, true);
StringBuffer sb = new StringBuffer();
//export all the environment variables before the child command, as
//the setuid/setgid binary would not otherwise receive any environment
//variables that begin with LD_*.
for(Entry<String, String> entry : env.env.entrySet()) {
sb.append("export ");
sb.append(entry.getKey());
sb.append("=");
sb.append(entry.getValue());
sb.append("\n");
}
sb.append(cmdLine);
// write the command to a file in the
// task specific cache directory
writeCommand(sb.toString(), getTaskCacheDirectory(context,
context.env.workDir));
// Call the taskcontroller with the right parameters.
List<String> launchTaskJVMArgs = buildLaunchTaskArgs(context,
context.env.workDir);
ShellCommandExecutor shExec = buildTaskControllerExecutor(
TaskControllerCommands.LAUNCH_TASK_JVM,
env.conf.getUser(),
launchTaskJVMArgs, env.workDir, env.env);
context.shExec = shExec;
try {
shExec.execute();
} catch (Exception e) {
int exitCode = shExec.getExitCode();
LOG.warn("Exit code from task is : " + exitCode);
// 143 (SIGTERM) and 137 (SIGKILL) exit codes mean the task was
// terminated/killed forcefully. In all other cases, log the
// task-controller output
if (exitCode != 143 && exitCode != 137) {
LOG.warn("Exception thrown while launching task JVM : "
+ StringUtils.stringifyException(e));
LOG.info("Output from LinuxTaskController's launchTaskJVM follows:");
logOutput(shExec.getOutput());
}
throw new IOException(e);
}
if (LOG.isDebugEnabled()) {
LOG.info("Output from LinuxTaskController's launchTaskJVM follows:");
logOutput(shExec.getOutput());
}
}
/**
* Launch the debug script process that will run as the owner of the job.
*
* This method launches the task debug script process by executing a setuid
* executable that will switch to the user and run the task.
*/
@Override
void runDebugScript(DebugScriptContext context) throws IOException {
String debugOut = FileUtil.makeShellPath(context.stdout);
String cmdLine = TaskLog.buildDebugScriptCommandLine(context.args, debugOut);
writeCommand(cmdLine, getTaskCacheDirectory(context, context.workDir));
// Call the taskcontroller with the right parameters.
List<String> launchTaskJVMArgs = buildLaunchTaskArgs(context, context.workDir);
runCommand(TaskControllerCommands.RUN_DEBUG_SCRIPT, context.task.getUser(),
launchTaskJVMArgs, context.workDir, null);
}
/**
* Helper method that runs a LinuxTaskController command
*
* @param taskControllerCommand
* @param user
* @param cmdArgs
* @param env
* @throws IOException
*/
private void runCommand(TaskControllerCommands taskControllerCommand,
String user, List<String> cmdArgs, File workDir, Map<String, String> env)
throws IOException {
ShellCommandExecutor shExec =
buildTaskControllerExecutor(taskControllerCommand, user, cmdArgs,
workDir, env);
try {
shExec.execute();
} catch (Exception e) {
LOG.warn("Exit code from " + taskControllerCommand.toString() + " is : "
+ shExec.getExitCode());
LOG.warn("Exception thrown by " + taskControllerCommand.toString() + " : "
+ StringUtils.stringifyException(e));
LOG.info("Output from LinuxTaskController's "
+ taskControllerCommand.toString() + " follows:");
logOutput(shExec.getOutput());
throw new IOException(e);
}
if (LOG.isDebugEnabled()) {
LOG.info("Output from LinuxTaskController's "
+ taskControllerCommand.toString() + " follows:");
logOutput(shExec.getOutput());
}
}
/**
* Returns list of arguments to be passed while initializing a new task. See
* {@code buildTaskControllerExecutor(TaskControllerCommands, String,
* List<String>, JvmEnv)} documentation.
*
* @param context
* @return Argument to be used while launching Task VM
*/
private List<String> buildInitializeTaskArgs(TaskExecContext context) {
List<String> commandArgs = new ArrayList<String>(3);
String taskId = context.task.getTaskID().toString();
String jobId = getJobId(context);
commandArgs.add(jobId);
if (!context.task.isTaskCleanupTask()) {
commandArgs.add(taskId);
} else {
commandArgs.add(taskId + TaskTracker.TASK_CLEANUP_SUFFIX);
}
return commandArgs;
}
@Override
void initializeTask(TaskControllerContext context)
throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Going to do "
+ TaskControllerCommands.INITIALIZE_TASK.toString()
+ " for " + context.task.getTaskID().toString());
}
runCommand(TaskControllerCommands.INITIALIZE_TASK,
context.env.conf.getUser(),
buildInitializeTaskArgs(context), context.env.workDir, context.env.env);
}
/**
* Builds the args to be passed to task-controller for enabling of task for
* cleanup. Last arg in this List is either $attemptId or $attemptId/work
*/
private List<String> buildTaskCleanupArgs(
TaskControllerTaskPathDeletionContext context) {
List<String> commandArgs = new ArrayList<String>(3);
commandArgs.add(context.mapredLocalDir.toUri().getPath());
commandArgs.add(context.task.getJobID().toString());
String workDir = "";
if (context.isWorkDir) {
workDir = "/work";
}
if (context.task.isTaskCleanupTask()) {
commandArgs.add(context.task.getTaskID() + TaskTracker.TASK_CLEANUP_SUFFIX
+ workDir);
} else {
commandArgs.add(context.task.getTaskID() + workDir);
}
return commandArgs;
}
/**
* Builds the args to be passed to task-controller for enabling of job for
* cleanup. Last arg in this List is $jobid.
*/
private List<String> buildJobCleanupArgs(
TaskControllerJobPathDeletionContext context) {
List<String> commandArgs = new ArrayList<String>(2);
commandArgs.add(context.mapredLocalDir.toUri().getPath());
commandArgs.add(context.jobId.toString());
return commandArgs;
}
/**
* Enables the task for cleanup by changing permissions of the specified path
* in the local filesystem
*/
@Override
void enableTaskForCleanup(PathDeletionContext context)
throws IOException {
if (context instanceof TaskControllerTaskPathDeletionContext) {
TaskControllerTaskPathDeletionContext tContext =
(TaskControllerTaskPathDeletionContext) context;
enablePathForCleanup(tContext,
TaskControllerCommands.ENABLE_TASK_FOR_CLEANUP,
buildTaskCleanupArgs(tContext));
}
else {
throw new IllegalArgumentException("PathDeletionContext provided is not "
+ "TaskControllerTaskPathDeletionContext.");
}
}
/**
* Enables the job for cleanup by changing permissions of the specified path
* in the local filesystem
*/
@Override
void enableJobForCleanup(PathDeletionContext context)
throws IOException {
if (context instanceof TaskControllerJobPathDeletionContext) {
TaskControllerJobPathDeletionContext tContext =
(TaskControllerJobPathDeletionContext) context;
enablePathForCleanup(tContext,
TaskControllerCommands.ENABLE_JOB_FOR_CLEANUP,
buildJobCleanupArgs(tContext));
} else {
throw new IllegalArgumentException("PathDeletionContext provided is not "
+ "TaskControllerJobPathDeletionContext.");
}
}
/**
* Enable a path for cleanup
* @param c {@link TaskControllerPathDeletionContext} for the path to be
* cleaned up
* @param command {@link TaskControllerCommands} for task/job cleanup
* @param cleanupArgs arguments for the {@link LinuxTaskController} to enable
* path cleanup
*/
private void enablePathForCleanup(TaskControllerPathDeletionContext c,
TaskControllerCommands command,
List<String> cleanupArgs) {
if (LOG.isDebugEnabled()) {
LOG.debug("Going to do " + command.toString() + " for " + c.fullPath);
}
if ( c.user != null && c.fs instanceof LocalFileSystem) {
try {
runCommand(command, c.user, cleanupArgs, null, null);
} catch(IOException e) {
LOG.warn("Unable to change permissions for " + c.fullPath);
}
}
else {
throw new IllegalArgumentException("Either user is null or the "
+ "file system is not local file system.");
}
}
private void logOutput(String output) {
String shExecOutput = output;
if (shExecOutput != null) {
for (String str : shExecOutput.split("\n")) {
LOG.info(str);
}
}
}
private String getJobId(TaskExecContext context) {
String taskId = context.task.getTaskID().toString();
TaskAttemptID tId = TaskAttemptID.forName(taskId);
String jobId = tId.getJobID().toString();
return jobId;
}
/**
* Returns list of arguments to be passed while launching task VM.
* See {@code buildTaskControllerExecutor(TaskControllerCommands,
* String, List<String>, JvmEnv)} documentation.
* @param context
* @return Argument to be used while launching Task VM
*/
private List<String> buildLaunchTaskArgs(TaskExecContext context,
File workDir) {
List<String> commandArgs = new ArrayList<String>(3);
LOG.debug("getting the task directory as: "
+ getTaskCacheDirectory(context, workDir));
LOG.debug("getting the tt_root as " +getDirectoryChosenForTask(
new File(getTaskCacheDirectory(context, workDir)),
context) );
commandArgs.add(getDirectoryChosenForTask(
new File(getTaskCacheDirectory(context, workDir)),
context));
commandArgs.addAll(buildInitializeTaskArgs(context));
return commandArgs;
}
// Get the directory from the list of directories configured
// in Configs.LOCAL_DIR chosen for storing data pertaining to
// this task.
private String getDirectoryChosenForTask(File directory,
TaskExecContext context) {
String jobId = getJobId(context);
String taskId = context.task.getTaskID().toString();
for (String dir : mapredLocalDirs) {
File mapredDir = new File(dir);
File taskDir =
new File(mapredDir, TaskTracker.getTaskWorkDir(context.task
.getUser(), jobId, taskId, context.task.isTaskCleanupTask()))
.getParentFile();
if (directory.equals(taskDir)) {
return dir;
}
}
LOG.error("Couldn't parse task cache directory correctly");
throw new IllegalArgumentException("invalid task cache directory "
+ directory.getAbsolutePath());
}
/**
* Builds the command line for launching/terminating/killing task JVM.
* Following is the format for launching/terminating/killing task JVM
* <br/>
* For launching following is command line argument:
* <br/>
* {@code mapreduce.job.user.name command tt-root job_id task_id}
* <br/>
* For terminating/killing task jvm.
* {@code mapreduce.job.user.name command tt-root task-pid}
*
* @param command command to be executed.
* @param userName mapreduce.job.user.name
* @param cmdArgs list of extra arguments
* @param workDir working directory for the task-controller
* @param env JVM environment variables.
* @return {@link ShellCommandExecutor}
* @throws IOException
*/
private ShellCommandExecutor buildTaskControllerExecutor(
TaskControllerCommands command, String userName, List<String> cmdArgs,
File workDir, Map<String, String> env)
throws IOException {
String[] taskControllerCmd = new String[3 + cmdArgs.size()];
taskControllerCmd[0] = getTaskControllerExecutablePath();
taskControllerCmd[1] = userName;
taskControllerCmd[2] = String.valueOf(command.ordinal());
int i = 3;
for (String cmdArg : cmdArgs) {
taskControllerCmd[i++] = cmdArg;
}
if (LOG.isDebugEnabled()) {
for (String cmd : taskControllerCmd) {
LOG.debug("taskctrl command = " + cmd);
}
}
ShellCommandExecutor shExec = null;
if(workDir != null && workDir.exists()) {
shExec = new ShellCommandExecutor(taskControllerCmd,
workDir, env);
} else {
shExec = new ShellCommandExecutor(taskControllerCmd);
}
return shExec;
}
// Return the task specific directory under the cache.
private String getTaskCacheDirectory(TaskExecContext context,
File workDir) {
// In the case of JVM reuse, the task specific directory
// is different from what is set with respect with
// env.workDir. Hence building this from the taskId everytime.
String taskId = context.task.getTaskID().toString();
File cacheDirForJob = workDir.getParentFile().getParentFile();
if(context.task.isTaskCleanupTask()) {
taskId = taskId + TaskTracker.TASK_CLEANUP_SUFFIX;
}
return new File(cacheDirForJob, taskId).getAbsolutePath();
}
// Write the JVM command line to a file under the specified directory
// Note that the JVM will be launched using a setuid executable, and
// could potentially contain strings defined by a user. Hence, to
// prevent special character attacks, we write the command line to
// a file and execute it.
private void writeCommand(String cmdLine,
String directory) throws IOException {
PrintWriter pw = null;
String commandFile = directory + File.separator + COMMAND_FILE;
LOG.info("Writing commands to " + commandFile);
LOG.info("--------Commands Begin--------");
LOG.info(cmdLine);
LOG.info("--------Commands End--------");
try {
FileWriter fw = new FileWriter(commandFile);
BufferedWriter bw = new BufferedWriter(fw);
pw = new PrintWriter(bw);
pw.write(cmdLine);
} catch (IOException ioe) {
LOG.error("Caught IOException while writing JVM command line to file. "
+ ioe.getMessage());
} finally {
if (pw != null) {
pw.close();
}
// set execute permissions for all on the file.
File f = new File(commandFile);
if (f.exists()) {
f.setReadable(true, false);
f.setExecutable(true, false);
}
}
}
private List<String> buildInitializeJobCommandArgs(
JobInitializationContext context) {
List<String> initJobCmdArgs = new ArrayList<String>();
initJobCmdArgs.add(context.jobid.toString());
return initJobCmdArgs;
}
@Override
void initializeJob(JobInitializationContext context)
throws IOException {
LOG.debug("Going to initialize job " + context.jobid.toString()
+ " on the TT");
runCommand(TaskControllerCommands.INITIALIZE_JOB, context.user,
buildInitializeJobCommandArgs(context), context.workDir, null);
}
@Override
public void initializeDistributedCacheFile(DistributedCacheFileContext context)
throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Going to initialize distributed cache for " + context.user
+ " with localizedBaseDir " + context.localizedBaseDir +
" and uniqueString " + context.uniqueString);
}
List<String> args = new ArrayList<String>();
// Here, uniqueString might start with '-'. Adding -- in front of the
// arguments indicates that they are non-option parameters.
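// (Illustrative, hypothetical value: with a uniqueString of "-8234_foo"
// the argv tail becomes "-- <localizedBaseDir> -8234_foo", so the binary's
// option parsing does not mistake it for a flag.)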
args.add("--");
args.add(context.localizedBaseDir.toString());
args.add(context.uniqueString);
runCommand(TaskControllerCommands.INITIALIZE_DISTRIBUTEDCACHE_FILE,
context.user, args, context.workDir, null);
}
@Override
public void initializeUser(InitializationContext context)
throws IOException {
LOG.debug("Going to initialize user directories for " + context.user
+ " on the TT");
runCommand(TaskControllerCommands.INITIALIZE_USER, context.user,
new ArrayList<String>(), context.workDir, null);
}
/**
* Builds the argument list to be passed to the LinuxTaskController
* binary to terminate/kill the task. See the
* {@code buildTaskControllerExecutor(TaskControllerCommands,
* String, List<String>, JvmEnv)} documentation.
*
* @param context context of the task to which the kill signal has to be
*                sent.
*/
private List<String> buildKillTaskCommandArgs(TaskControllerContext
context){
List<String> killTaskJVMArgs = new ArrayList<String>();
killTaskJVMArgs.add(context.pid);
return killTaskJVMArgs;
}
/**
* Convenience method for sending the appropriate signal to the task
* JVM.
* @param context
* @param command
* @throws IOException
*/
protected void signalTask(TaskControllerContext context,
TaskControllerCommands command) throws IOException{
if(context.task == null) {
LOG.info("Context task is null; not signaling the JVM");
return;
}
ShellCommandExecutor shExec = buildTaskControllerExecutor(
command, context.env.conf.getUser(),
buildKillTaskCommandArgs(context), context.env.workDir,
context.env.env);
try {
shExec.execute();
} catch (Exception e) {
LOG.warn("Output from task-contoller is : " + shExec.getOutput());
throw new IOException(e);
}
}
@Override
void terminateTask(TaskControllerContext context) {
try {
signalTask(context, TaskControllerCommands.TERMINATE_TASK_JVM);
} catch (Exception e) {
LOG.warn("Exception thrown while sending kill to the Task VM " +
StringUtils.stringifyException(e));
}
}
@Override
void killTask(TaskControllerContext context) {
try {
signalTask(context, TaskControllerCommands.KILL_TASK_JVM);
} catch (Exception e) {
LOG.warn("Exception thrown while sending destroy to the Task VM " +
StringUtils.stringifyException(e));
}
}
@Override
void dumpTaskStack(TaskControllerContext context) {
try {
signalTask(context, TaskControllerCommands.SIGQUIT_TASK_JVM);
} catch (Exception e) {
LOG.warn("Exception thrown while sending SIGQUIT to the Task VM " +
StringUtils.stringifyException(e));
}
}
protected String getTaskControllerExecutablePath() {
return taskControllerExe;
}
@Override
String getRunAsUser(JobConf conf) {
return conf.getUser();
}
}

View File

@ -1,511 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.List;
import java.util.ArrayList;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.mapreduce.filecache.TestTrackerDistributedCacheManager;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.StringUtils;
import junit.framework.TestCase;
/**
* The base class which starts up a cluster with LinuxTaskController as the task
* controller.
*
* In order to run test cases utilizing LinuxTaskController, follow these
* steps:
* <ol>
* <li>Build LinuxTaskController by not passing any
* <code>-Dhadoop.conf.dir</code></li>
* <li>Change ownership of the built binary to root:group1, where group1 is
* a secondary group of the test runner.</li>
* <li>Change permissions on the binary so that the <em>others</em> component
* does not have any permissions on the binary</li>
* <li>Make the built binary setuid and setgid executable</li>
* <li>Execute the following targets:
* <code>ant test -Dcompile.c++=true -Dtaskcontroller-path=<em>path to built binary</em>
* -Dtaskcontroller-ugi=<em>user,group</em></code>
* <br/>(Note that "path to built binary" means the directory containing task-controller -
* not the actual complete path of the binary itself. This path must end in ".../bin")
* </li>
* </ol>
*
*/
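// A sketch of the setup steps above, assuming the binary was built under
// build/c++ (all paths, the mode, and the group name are illustrative):
//   chown root:group1 <build-dir>/bin/task-controller
//   chmod 6050 <build-dir>/bin/task-controller   # setuid+setgid, no "others"
//   ant test -Dcompile.c++=true \
//       -Dtaskcontroller-path=<build-dir>/bin \
//       -Dtaskcontroller-ugi=testuser,group1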
public class ClusterWithLinuxTaskController extends TestCase {
private static final Log LOG =
LogFactory.getLog(ClusterWithLinuxTaskController.class);
/**
* A wrapper around LinuxTaskController that allows overriding the path to
* the task-controller binary used for task management.
*/
public static class MyLinuxTaskController extends LinuxTaskController {
String taskControllerExePath = System.getProperty(TASKCONTROLLER_PATH)
+ "/task-controller";
@Override
public void setup() throws IOException {
getConf().set(TTConfig.TT_GROUP, taskTrackerSpecialGroup);
// write configuration file
configurationFile = createTaskControllerConf(System
.getProperty(TASKCONTROLLER_PATH), getConf());
super.setup();
}
@Override
protected String getTaskControllerExecutablePath() {
return new File(taskControllerExePath).getAbsolutePath();
}
void setTaskControllerExe(String execPath) {
this.taskControllerExePath = execPath;
}
volatile static int attemptedSigQuits = 0;
volatile static int failedSigQuits = 0;
/** Work like LinuxTaskController, but also count the number of
* attempted and failed SIGQUIT sends via the task-controller
* executable.
*/
@Override
void dumpTaskStack(TaskControllerContext context) {
attemptedSigQuits++;
try {
signalTask(context, TaskControllerCommands.SIGQUIT_TASK_JVM);
} catch (Exception e) {
LOG.warn("Execution sending SIGQUIT: " + StringUtils.stringifyException(e));
failedSigQuits++;
}
}
}
// cluster instances which sub classes can use
protected MiniMRCluster mrCluster = null;
protected MiniDFSCluster dfsCluster = null;
private JobConf clusterConf = null;
protected Path homeDirectory;
/** Changing this to a larger number requires more work for creating
* taskcontroller.cfg.
* See {@link #startCluster()} and
* {@link #createTaskControllerConf(String, Configuration)}.
*/
private static final int NUMBER_OF_NODES = 1;
static final String TASKCONTROLLER_PATH = "taskcontroller-path";
static final String TASKCONTROLLER_UGI = "taskcontroller-ugi";
private static File configurationFile = null;
protected UserGroupInformation jobOwner;
protected static String taskTrackerSpecialGroup = null;
/**
* Primary group of the user running the TaskTracker, i.e. the user
* running the test.
*/
protected static String taskTrackerPrimaryGroup = null;
static {
if (isTaskExecPathPassed()) {
try {
taskTrackerSpecialGroup = FileSystem.getLocal(new Configuration())
.getFileStatus(
new Path(System.getProperty(TASKCONTROLLER_PATH),
"task-controller")).getGroup();
} catch (IOException e) {
LOG.warn("Could not get group of the binary", e);
fail("Could not get group of the binary");
}
try {
taskTrackerPrimaryGroup =
UserGroupInformation.getCurrentUser().getGroupNames()[0];
} catch (IOException ioe) {
LOG.warn("Could not get primary group of the current user", ioe);
fail("Could not get primary group of the current user");
}
}
}
/*
* Utility method which subclasses use to start and configure the MR Cluster
* so they can directly submit a job.
*/
protected void startCluster()
throws IOException, InterruptedException {
JobConf conf = new JobConf();
dfsCluster = new MiniDFSCluster(conf, NUMBER_OF_NODES, true, null);
conf.set(TTConfig.TT_TASK_CONTROLLER,
MyLinuxTaskController.class.getName());
conf.setBoolean(JTConfig.JT_PERSIST_JOBSTATUS, false);
mrCluster =
new MiniMRCluster(NUMBER_OF_NODES, dfsCluster.getFileSystem().getUri()
.toString(), 4, null, null, conf);
clusterConf = mrCluster.createJobConf();
String ugi = System.getProperty(TASKCONTROLLER_UGI);
String[] splits = ugi.split(",");
jobOwner = UserGroupInformation.createUserForTesting(splits[0],
new String[]{splits[1]});
createHomeAndStagingDirectory(clusterConf);
}
private void createHomeAndStagingDirectory(JobConf conf)
throws IOException {
FileSystem fs = dfsCluster.getFileSystem();
String path = "/user/" + jobOwner.getUserName();
homeDirectory = new Path(path);
LOG.info("Creating Home directory : " + homeDirectory);
fs.mkdirs(homeDirectory);
changePermission(fs);
Path stagingArea = new Path(conf.get(JTConfig.JT_STAGING_AREA_ROOT));
LOG.info("Creating Staging root directory : " + stagingArea);
fs.mkdirs(stagingArea);
fs.setPermission(stagingArea, new FsPermission((short)0777));
}
private void changePermission(FileSystem fs)
throws IOException {
fs.setOwner(homeDirectory, jobOwner.getUserName(),
jobOwner.getGroupNames()[0]);
}
static File getTaskControllerConfFile(String path) {
File confDirectory = new File(path, "../conf");
return new File(confDirectory, "taskcontroller.cfg");
}
/**
* Create taskcontroller.cfg.
*
* @param path Path to the taskcontroller binary.
* @param conf TaskTracker's configuration
* @return the created conf file
* @throws IOException
*/
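// Illustration (values are hypothetical; the real keys come from
// MRConfig.LOCAL_DIR and TTConfig.TT_GROUP): the generated
// taskcontroller.cfg would look like
//   mapreduce.cluster.local.dir=/tmp/mapred/local
//   hadoop.log.dir=/tmp/hadoop-logs
//   mapreduce.tasktracker.group=special-group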
static File createTaskControllerConf(String path,
Configuration conf) throws IOException {
File confDirectory = new File(path, "../conf");
if (!confDirectory.exists()) {
confDirectory.mkdirs();
}
File configurationFile = new File(confDirectory, "taskcontroller.cfg");
PrintWriter writer =
new PrintWriter(new FileOutputStream(configurationFile));
writer.println(String.format(MRConfig.LOCAL_DIR + "=%s", conf
.get(MRConfig.LOCAL_DIR)));
writer
.println(String.format("hadoop.log.dir=%s", TaskLog.getBaseLogDir()));
writer.println(String.format(TTConfig.TT_GROUP + "=%s",
conf.get(TTConfig.TT_GROUP)));
writer.flush();
writer.close();
return configurationFile;
}
/**
* Can we run the tests with LinuxTaskController?
*
* @return true if both taskcontroller-path and taskcontroller-ugi system
*         properties are set, false otherwise
*/
protected static boolean shouldRun() {
if (!isTaskExecPathPassed() || !isUserPassed()) {
LOG.info("Not running test.");
return false;
}
return true;
}
static boolean isTaskExecPathPassed() {
String path = System.getProperty(TASKCONTROLLER_PATH);
if (path == null || path.isEmpty()
|| path.equals("${" + TASKCONTROLLER_PATH + "}")) {
LOG.info("Invalid taskcontroller-path : " + path);
return false;
}
return true;
}
private static boolean isUserPassed() {
String ugi = System.getProperty(TASKCONTROLLER_UGI);
if (ugi != null && !(ugi.equals("${" + TASKCONTROLLER_UGI + "}"))
&& !ugi.isEmpty()) {
if (ugi.indexOf(",") > 0) { // the comma must follow a non-empty user name
return true;
}
LOG.info("Invalid taskcontroller-ugi : " + ugi);
return false;
}
LOG.info("Invalid taskcontroller-ugi : " + ugi);
return false;
}
protected JobConf getClusterConf() {
return new JobConf(clusterConf);
}
@Override
protected void tearDown()
throws Exception {
if (mrCluster != null) {
mrCluster.shutdown();
}
if (dfsCluster != null) {
dfsCluster.shutdown();
}
if (configurationFile != null) {
configurationFile.delete();
}
super.tearDown();
}
/**
* Assert that the job is actually run by the specified user by verifying the
* permissions of the output part-files.
*
* @param outDir
* @throws IOException
*/
protected void assertOwnerShip(Path outDir)
throws IOException {
FileSystem fs = outDir.getFileSystem(clusterConf);
assertOwnerShip(outDir, fs);
}
/**
* Assert that the job is actually run by the specified user by verifying the
* permissions of the output part-files.
*
* @param outDir
* @param fs
* @throws IOException
*/
protected void assertOwnerShip(Path outDir, FileSystem fs)
throws IOException {
for (FileStatus status : fs.listStatus(outDir,
new Utils.OutputFileUtils
.OutputFilesFilter())) {
String owner = status.getOwner();
String group = status.getGroup();
LOG.info("Ownership of the file is " + status.getPath() + " is " + owner
+ "," + group);
assertTrue("Output part-file's owner is not correct. Expected : "
+ jobOwner.getUserName() + " Found : " + owner, owner
.equals(jobOwner.getUserName()));
assertTrue("Output part-file's group is not correct. Expected : "
+ jobOwner.getGroupNames()[0] + " Found : " + group, group
.equals(jobOwner.getGroupNames()[0]));
}
}
/**
* Validates permissions of private distcache dir and its contents fully
*/
public static void checkPermissionsOnPrivateDistCache(String[] localDirs,
String user, String taskTrackerUser, String groupOwner)
throws IOException {
// user-dir, jobcache and distcache will have
// 2770 permissions if jobOwner is same as tt_user
// 2570 permissions for any other user
String expectedDirPerms = taskTrackerUser.equals(user)
? "drwxrws---"
: "dr-xrws---";
String expectedFilePerms = taskTrackerUser.equals(user)
? "-rwxrwx---"
: "-r-xrwx---";
for (String localDir : localDirs) {
File distCacheDir = new File(localDir,
TaskTracker.getPrivateDistributedCacheDir(user));
if (distCacheDir.exists()) {
checkPermissionsOnDir(distCacheDir, user, groupOwner, expectedDirPerms,
expectedFilePerms);
}
}
}
/**
* Check that files expected to be localized in distributed cache for a user
* are present.
* @param localDirs List of mapred local directories.
* @param user User for whom localization is happening
* @param expectedFileNames List of files expected to be localized
* @throws IOException
*/
public static void checkPresenceOfPrivateDistCacheFiles(String[] localDirs,
String user, String[] expectedFileNames) throws IOException {
FileGatherer gatherer = new FileGatherer();
for (String localDir : localDirs) {
File distCacheDir = new File(localDir,
TaskTracker.getPrivateDistributedCacheDir(user));
findExpectedFiles(expectedFileNames, distCacheDir, gatherer);
}
assertEquals("Files expected in private distributed cache were not found",
expectedFileNames.length, gatherer.getCount());
}
/**
* Validates permissions and ownership of public distcache dir and its
* contents fully in all local dirs
*/
public static void checkPermissionsOnPublicDistCache(FileSystem localFS,
String[] localDirs, String owner, String group) throws IOException {
for (String localDir : localDirs) {
File distCacheDir = new File(localDir,
TaskTracker.getPublicDistributedCacheDir());
if (distCacheDir.exists()) {
checkPublicFilePermissions(localFS, distCacheDir, owner, group);
}
}
}
/**
* Checks that files expected to be localized in the public distributed
* cache are present
* @param localDirs List of mapred local directories
* @param expectedFileNames List of expected file names.
* @throws IOException
*/
public static void checkPresenceOfPublicDistCacheFiles(String[] localDirs,
String[] expectedFileNames) throws IOException {
FileGatherer gatherer = new FileGatherer();
for (String localDir : localDirs) {
File distCacheDir = new File(localDir,
TaskTracker.getPublicDistributedCacheDir());
findExpectedFiles(expectedFileNames, distCacheDir, gatherer);
}
assertEquals("Files expected in public distributed cache were not found",
expectedFileNames.length, gatherer.getCount());
}
/**
* Validates permissions and ownership on the public distributed cache files
*/
private static void checkPublicFilePermissions(FileSystem localFS, File dir,
String owner, String group)
throws IOException {
Path dirPath = new Path(dir.getAbsolutePath());
TestTrackerDistributedCacheManager.checkPublicFilePermissions(localFS,
new Path[] {dirPath});
TestTrackerDistributedCacheManager.checkPublicFileOwnership(localFS,
new Path[] {dirPath}, owner, group);
if (dir.isDirectory()) {
File[] files = dir.listFiles();
for (File file : files) {
checkPublicFilePermissions(localFS, file, owner, group);
}
}
}
/**
* Validates permissions of the given dir and its contents fully (i.e. recursively)
*/
private static void checkPermissionsOnDir(File dir, String user,
String groupOwner, String expectedDirPermissions,
String expectedFilePermissions) throws IOException {
TestTaskTrackerLocalization.checkFilePermissions(dir.toString(),
expectedDirPermissions, user, groupOwner);
File[] files = dir.listFiles();
for (File file : files) {
if (file.isDirectory()) {
checkPermissionsOnDir(file, user, groupOwner, expectedDirPermissions,
expectedFilePermissions);
} else {
TestTaskTrackerLocalization.checkFilePermissions(file.toString(),
expectedFilePermissions, user, groupOwner);
}
}
}
// Check which files among those expected are present in the rootDir
// Add those present to the FileGatherer.
private static void findExpectedFiles(String[] expectedFileNames,
File rootDir, FileGatherer gatherer) {
File[] files = rootDir.listFiles();
if (files == null) {
return;
}
for (File file : files) {
if (file.isDirectory()) {
findExpectedFiles(expectedFileNames, file, gatherer);
} else {
if (isFilePresent(expectedFileNames, file)) {
gatherer.addFileName(file.getName());
}
}
}
}
// Test if the passed file is present in the expected list of files.
private static boolean isFilePresent(String[] expectedFileNames, File file) {
boolean foundFileName = false;
for (String name : expectedFileNames) {
if (name.equals(file.getName())) {
foundFileName = true;
break;
}
}
return foundFileName;
}
// Helper class to collect a list of file names across multiple
// method calls. Wrapper around a collection defined for clarity
private static class FileGatherer {
List<String> foundFileNames = new ArrayList<String>();
void addFileName(String fileName) {
foundFileNames.add(fileName);
}
int getCount() {
return foundFileNames.size();
}
}
}

View File

@ -1,65 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.security.UserGroupInformation;
import org.junit.Test;
public class TestDebugScriptWithLinuxTaskController extends
ClusterWithLinuxTaskController {
@Test
public void testDebugScriptExecutionAsDifferentUser() throws Exception {
if (!super.shouldRun()) {
return;
}
super.startCluster();
TestDebugScript.setupDebugScriptDirs();
final Path inDir = new Path("input");
final Path outDir = new Path("output");
JobConf conf = super.getClusterConf();
FileSystem fs = inDir.getFileSystem(conf);
fs.mkdirs(inDir);
Path p = new Path(inDir, "1.txt");
fs.createNewFile(p);
String[] splits = System
.getProperty(ClusterWithLinuxTaskController.TASKCONTROLLER_UGI)
.split(",");
JobID jobId = UserGroupInformation.createUserForTesting(splits[0],
new String[]{splits[1]}).doAs(new PrivilegedExceptionAction<JobID>() {
public JobID run() throws IOException{
return TestDebugScript.runFailingMapJob(
TestDebugScriptWithLinuxTaskController.this.getClusterConf(),
inDir, outDir);
}
});
// construct the task id of first map task of failmap
TaskAttemptID taskId = new TaskAttemptID(
new TaskID(jobId, TaskType.MAP, 0), 0);
TestDebugScript.verifyDebugScriptOutput(taskId, splits[0],
taskTrackerSpecialGroup, "-rw-rw----");
TestDebugScript.cleanupDebugScriptDirs();
}
}

View File

@ -1,140 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.SleepJob;
import org.apache.hadoop.util.ToolRunner;
/**
* Test a java-based mapred job with LinuxTaskController running the jobs as a
* user different from the user running the cluster. See
* {@link ClusterWithLinuxTaskController}
*/
public class TestJobExecutionAsDifferentUser extends
ClusterWithLinuxTaskController {
public void testJobExecution()
throws Exception {
if (!shouldRun()) {
return;
}
startCluster();
jobOwner.doAs(new PrivilegedExceptionAction<Object>() {
public Object run() throws Exception {
Path inDir = new Path("input");
Path outDir = new Path("output");
RunningJob job;
// Run a job with zero maps/reduces
job = UtilsForTests.runJob(getClusterConf(), inDir, outDir, 0, 0);
job.waitForCompletion();
assertTrue("Job failed", job.isSuccessful());
assertOwnerShip(outDir);
// Run a job with 1 map and zero reduces
job = UtilsForTests.runJob(getClusterConf(), inDir, outDir, 1, 0);
job.waitForCompletion();
assertTrue("Job failed", job.isSuccessful());
assertOwnerShip(outDir);
// Run a normal job with maps/reduces
job = UtilsForTests.runJob(getClusterConf(), inDir, outDir, 1, 1);
job.waitForCompletion();
assertTrue("Job failed", job.isSuccessful());
assertOwnerShip(outDir);
// Run a job with jvm reuse
JobConf myConf = getClusterConf();
myConf.set(JobContext.JVM_NUMTASKS_TORUN, "-1");
String[] args = { "-m", "6", "-r", "3", "-mt", "1000", "-rt", "1000" };
assertEquals(0, ToolRunner.run(myConf, new SleepJob(), args));
return null;
}
});
}
public void testEnvironment() throws Exception {
if (!shouldRun()) {
return;
}
startCluster();
jobOwner.doAs(new PrivilegedExceptionAction<Object>() {
public Object run() throws Exception {
TestMiniMRChildTask childTask = new TestMiniMRChildTask();
Path inDir = new Path("input1");
Path outDir = new Path("output1");
try {
childTask.runTestTaskEnv(getClusterConf(), inDir, outDir, false);
} catch (IOException e) {
fail("IOException thrown while running enviroment test."
+ e.getMessage());
} finally {
FileSystem outFs = outDir.getFileSystem(getClusterConf());
if (outFs.exists(outDir)) {
assertOwnerShip(outDir);
outFs.delete(outDir, true);
} else {
fail("Output directory does not exist" + outDir.toString());
}
}
return null;
}
});
}
/** Ensure that SIGQUIT can be properly sent by the LinuxTaskController
* if a task times out.
*/
public void testTimeoutStackTrace() throws Exception {
if (!shouldRun()) {
return;
}
// Run a job that should timeout and trigger a SIGQUIT.
startCluster();
jobOwner.doAs(new PrivilegedExceptionAction<Object>() {
public Object run() throws Exception {
JobConf conf = getClusterConf();
conf.setInt(JobContext.TASK_TIMEOUT, 10000);
conf.setInt(Job.COMPLETION_POLL_INTERVAL_KEY, 50);
SleepJob sleepJob = new SleepJob();
sleepJob.setConf(conf);
Job job = sleepJob.createJob(1, 0, 30000, 1, 0, 0);
job.setMaxMapAttempts(1);
int prevNumSigQuits = MyLinuxTaskController.attemptedSigQuits;
job.waitForCompletion(true);
assertTrue("Did not detect a new SIGQUIT!",
prevNumSigQuits < MyLinuxTaskController.attemptedSigQuits);
assertEquals("A SIGQUIT attempt failed!", 0,
MyLinuxTaskController.failedSigQuits);
return null;
}
});
}
}

View File

@ -1,49 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;
import java.security.PrivilegedExceptionAction;
/**
* Test killing of child processes spawned by the jobs with LinuxTaskController
* running the jobs as a user different from the user running the cluster.
* See {@link ClusterWithLinuxTaskController}
*/
public class TestKillSubProcessesWithLinuxTaskController extends
ClusterWithLinuxTaskController {
public void testKillSubProcess() throws Exception{
if(!shouldRun()) {
return;
}
startCluster();
jobOwner.doAs(new PrivilegedExceptionAction<Object>() {
public Object run() throws Exception {
JobConf myConf = getClusterConf();
JobTracker jt = mrCluster.getJobTrackerRunner().getJobTracker();
TestKillSubProcesses.mr = mrCluster;
TestKillSubProcesses sbProc = new TestKillSubProcesses();
sbProc.runTests(myConf, jt);
return null;
}
});
}
}

View File

@ -1,114 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;
import java.io.File;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
import org.apache.hadoop.security.Groups;
import org.apache.hadoop.security.UserGroupInformation;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import junit.framework.TestCase;
public class TestLinuxTaskController extends TestCase {
private static final int INVALID_TASKCONTROLLER_PERMISSIONS = 24;
private static File testDir = new File(System.getProperty("test.build.data",
"/tmp"), TestLinuxTaskController.class.getName());
private static String taskControllerPath = System
.getProperty(ClusterWithLinuxTaskController.TASKCONTROLLER_PATH);
@Before
protected void setUp() throws Exception {
testDir.mkdirs();
}
@After
protected void tearDown() {
FileUtil.fullyDelete(testDir);
}
public static class MyLinuxTaskController extends LinuxTaskController {
String taskControllerExePath = taskControllerPath + "/task-controller";
@Override
protected String getTaskControllerExecutablePath() {
return taskControllerExePath;
}
}
private void validateTaskControllerSetup(TaskController controller,
boolean shouldFail) throws IOException {
if (shouldFail) {
// task controller setup should fail validating permissions.
Throwable th = null;
try {
controller.setup();
} catch (IOException ie) {
th = ie;
}
assertNotNull("No exception during setup", th);
assertTrue("Exception message does not contain exit code"
+ INVALID_TASKCONTROLLER_PERMISSIONS, th.getMessage().contains(
"with exit code " + INVALID_TASKCONTROLLER_PERMISSIONS));
} else {
controller.setup();
}
}
@Test
public void testTaskControllerGroup() throws Exception {
if (!ClusterWithLinuxTaskController.isTaskExecPathPassed()) {
return;
}
// cleanup configuration file.
ClusterWithLinuxTaskController
.getTaskControllerConfFile(taskControllerPath).delete();
Configuration conf = new Configuration();
// create local dirs and set in the conf.
File mapredLocal = new File(testDir, "mapred/local");
mapredLocal.mkdirs();
conf.set(MRConfig.LOCAL_DIR, mapredLocal.toString());
// setup task-controller without setting any group name
TaskController controller = new MyLinuxTaskController();
controller.setConf(conf);
validateTaskControllerSetup(controller, true);
// set an invalid group name for the task controller group
conf.set(TTConfig.TT_GROUP, "invalid");
// write the task-controller's conf file
ClusterWithLinuxTaskController.createTaskControllerConf(taskControllerPath,
conf);
validateTaskControllerSetup(controller, true);
conf.set(TTConfig.TT_GROUP,
ClusterWithLinuxTaskController.taskTrackerSpecialGroup);
// write the task-controller's conf file
ClusterWithLinuxTaskController.createTaskControllerConf(taskControllerPath,
conf);
validateTaskControllerSetup(controller, false);
}
}

View File

@ -1,240 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.ClusterWithLinuxTaskController.MyLinuxTaskController;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.security.UserGroupInformation;
/**
* Test to verify localization of a job and localization of a task on a
* TaskTracker when {@link LinuxTaskController} is used.
*
*/
public class TestLocalizationWithLinuxTaskController extends
TestTaskTrackerLocalization {
private static final Log LOG =
LogFactory.getLog(TestLocalizationWithLinuxTaskController.class);
private File configFile;
private static String taskTrackerUserName;
@Override
protected boolean canRun() {
return ClusterWithLinuxTaskController.shouldRun();
}
@Override
protected void setUp()
throws Exception {
if (!canRun()) {
return;
}
super.setUp();
taskTrackerUserName = UserGroupInformation.getLoginUser()
.getShortUserName();
}
@Override
protected void tearDown()
throws Exception {
if (!canRun()) {
return;
}
super.tearDown();
if (configFile != null) {
configFile.delete();
}
}
protected TaskController createTaskController() {
return new MyLinuxTaskController();
}
protected UserGroupInformation getJobOwner() {
String ugi = System
.getProperty(ClusterWithLinuxTaskController.TASKCONTROLLER_UGI);
String[] splits = ugi.split(",");
return UserGroupInformation.createUserForTesting(splits[0],
new String[] { splits[1] });
}
/** {@inheritDoc} */
@Override
public void testTaskControllerSetup() {
// Do nothing.
}
@Override
protected void checkUserLocalization()
throws IOException {
// Check the directory structure and permissions
for (String dir : localDirs) {
File localDir = new File(dir);
assertTrue(MRConfig.LOCAL_DIR + " " + localDir + " isn't created!",
localDir.exists());
File taskTrackerSubDir = new File(localDir, TaskTracker.SUBDIR);
assertTrue("taskTracker sub-dir in the local-dir " + localDir
+ "is not created!", taskTrackerSubDir.exists());
// user-dir, jobcache and distcache will have
// 2770 permissions if jobOwner is same as tt_user
// 2570 permissions for any other user
String expectedDirPerms = taskTrackerUserName.equals(task.getUser())
? "drwxrws---"
: "dr-xrws---";
File userDir = new File(taskTrackerSubDir, task.getUser());
assertTrue("user-dir in taskTrackerSubdir " + taskTrackerSubDir
+ "is not created!", userDir.exists());
checkFilePermissions(userDir.getAbsolutePath(), expectedDirPerms, task
.getUser(), ClusterWithLinuxTaskController.taskTrackerSpecialGroup);
File jobCache = new File(userDir, TaskTracker.JOBCACHE);
assertTrue("jobcache in the userDir " + userDir + " isn't created!",
jobCache.exists());
checkFilePermissions(jobCache.getAbsolutePath(), expectedDirPerms, task
.getUser(), ClusterWithLinuxTaskController.taskTrackerSpecialGroup);
// Verify the distributed cache dir.
File distributedCacheDir =
new File(localDir, TaskTracker
.getPrivateDistributedCacheDir(task.getUser()));
assertTrue("distributed cache dir " + distributedCacheDir
+ " doesn't exists!", distributedCacheDir.exists());
checkFilePermissions(distributedCacheDir.getAbsolutePath(),
expectedDirPerms, task.getUser(),
ClusterWithLinuxTaskController.taskTrackerSpecialGroup);
}
}
@Override
protected void checkJobLocalization()
throws IOException {
// job-dir, jars-dir and subdirectories in them will have
// 2770 permissions if jobOwner is same as tt_user
// 2570 permissions for any other user
// Files under these dirs will have
// 770 permissions if jobOwner is same as tt_user
// 570 permissions for any other user
String expectedDirPerms = taskTrackerUserName.equals(task.getUser())
? "drwxrws---"
: "dr-xrws---";
String expectedFilePerms = taskTrackerUserName.equals(task.getUser())
? "-rwxrwx---"
: "-r-xrwx---";
for (String localDir : trackerFConf.getStrings(MRConfig.LOCAL_DIR)) {
File jobDir =
new File(localDir, TaskTracker.getLocalJobDir(task.getUser(), jobId
.toString()));
// check the private permissions on the job directory
checkFilePermissions(jobDir.getAbsolutePath(), expectedDirPerms, task
.getUser(), ClusterWithLinuxTaskController.taskTrackerSpecialGroup);
}
// check the private permissions of various directories
List<Path> dirs = new ArrayList<Path>();
Path jarsDir =
lDirAlloc.getLocalPathToRead(TaskTracker.getJobJarsDir(task.getUser(),
jobId.toString()), trackerFConf);
dirs.add(jarsDir);
dirs.add(new Path(jarsDir, "lib"));
for (Path dir : dirs) {
checkFilePermissions(dir.toUri().getPath(), expectedDirPerms,
task.getUser(),
ClusterWithLinuxTaskController.taskTrackerSpecialGroup);
}
// job-work dir needs user writable permissions i.e. 2770 for any user
Path jobWorkDir =
lDirAlloc.getLocalPathToRead(TaskTracker.getJobWorkDir(task.getUser(),
jobId.toString()), trackerFConf);
checkFilePermissions(jobWorkDir.toUri().getPath(), "drwxrws---", task
.getUser(), ClusterWithLinuxTaskController.taskTrackerSpecialGroup);
// check the private permissions of various files
List<Path> files = new ArrayList<Path>();
files.add(lDirAlloc.getLocalPathToRead(TaskTracker.getLocalJobConfFile(
task.getUser(), jobId.toString()), trackerFConf));
files.add(lDirAlloc.getLocalPathToRead(TaskTracker.getJobJarFile(task
.getUser(), jobId.toString()), trackerFConf));
files.add(new Path(jarsDir, "lib" + Path.SEPARATOR + "lib1.jar"));
files.add(new Path(jarsDir, "lib" + Path.SEPARATOR + "lib2.jar"));
for (Path file : files) {
checkFilePermissions(file.toUri().getPath(), expectedFilePerms, task
.getUser(), ClusterWithLinuxTaskController.taskTrackerSpecialGroup);
}
// check job user-log directory permissions
File jobLogDir = TaskLog.getJobDir(jobId);
checkFilePermissions(jobLogDir.toString(), expectedDirPerms, task.getUser(),
ClusterWithLinuxTaskController.taskTrackerSpecialGroup);
// check job-acls.xml file permissions
checkFilePermissions(jobLogDir.toString() + Path.SEPARATOR
+ TaskTracker.jobACLsFile, expectedFilePerms, task.getUser(),
ClusterWithLinuxTaskController.taskTrackerSpecialGroup);
// validate the content of job ACLs file
validateJobACLsFileContent();
}
@Override
protected void checkTaskLocalization()
throws IOException {
// check the private permissions of various directories
List<Path> dirs = new ArrayList<Path>();
dirs.add(lDirAlloc.getLocalPathToRead(TaskTracker.getLocalTaskDir(task
.getUser(), jobId.toString(), taskId.toString(),
task.isTaskCleanupTask()), trackerFConf));
dirs.add(attemptWorkDir);
dirs.add(new Path(attemptWorkDir, "tmp"));
dirs.add(new Path(attemptLogFiles[1].getParentFile().getAbsolutePath()));
for (Path dir : dirs) {
checkFilePermissions(dir.toUri().getPath(), "drwxrws---",
task.getUser(),
ClusterWithLinuxTaskController.taskTrackerSpecialGroup);
}
// check the private permissions of various files
List<Path> files = new ArrayList<Path>();
files.add(lDirAlloc.getLocalPathToRead(TaskTracker.getTaskConfFile(task
.getUser(), task.getJobID().toString(), task.getTaskID().toString(),
task.isTaskCleanupTask()), trackerFConf));
for (Path file : files) {
checkFilePermissions(file.toUri().getPath(), "-rwxrwx---", task
.getUser(), ClusterWithLinuxTaskController.taskTrackerSpecialGroup);
}
}
}

View File

@ -1,159 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;
import java.io.File;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.ClusterWithLinuxTaskController.MyLinuxTaskController;
import org.apache.hadoop.mapreduce.filecache.TestTrackerDistributedCacheManager;
import org.apache.hadoop.security.UserGroupInformation;
/**
* Test the DistributedCacheManager when LinuxTaskController is used.
*
*/
public class TestTrackerDistributedCacheManagerWithLinuxTaskController extends
TestTrackerDistributedCacheManager {
private File configFile;
private static final Log LOG =
LogFactory
.getLog(TestTrackerDistributedCacheManagerWithLinuxTaskController.class);
@Override
protected void setUp()
throws IOException, InterruptedException {
if (!ClusterWithLinuxTaskController.shouldRun()) {
return;
}
TEST_ROOT_DIR =
new File(System.getProperty("test.build.data", "/tmp"),
TestTrackerDistributedCacheManagerWithLinuxTaskController.class
.getSimpleName()).getAbsolutePath();
super.setUp();
taskController = new MyLinuxTaskController();
String path =
System.getProperty(ClusterWithLinuxTaskController.TASKCONTROLLER_PATH);
String execPath = path + "/task-controller";
((MyLinuxTaskController)taskController).setTaskControllerExe(execPath);
taskController.setConf(conf);
taskController.setup();
}
@Override
protected void refreshConf(Configuration conf) throws IOException {
super.refreshConf(conf);
String path =
System.getProperty(ClusterWithLinuxTaskController.TASKCONTROLLER_PATH);
configFile =
ClusterWithLinuxTaskController.createTaskControllerConf(path, conf);
}
@Override
protected void tearDown()
throws IOException {
if (!ClusterWithLinuxTaskController.shouldRun()) {
return;
}
if (configFile != null) {
configFile.delete();
}
super.tearDown();
}
@Override
protected boolean canRun() {
return ClusterWithLinuxTaskController.shouldRun();
}
@Override
protected String getJobOwnerName() {
String ugi =
System.getProperty(ClusterWithLinuxTaskController.TASKCONTROLLER_UGI);
String userName = ugi.split(",")[0];
return userName;
}
@Override
protected void checkFilePermissions(Path[] localCacheFiles)
throws IOException {
String userName = getJobOwnerName();
String filePermissions = UserGroupInformation.getLoginUser()
.getShortUserName().equals(userName) ? "-rwxrwx---" : "-r-xrwx---";
for (Path p : localCacheFiles) {
// First make sure that the cache file has proper permissions.
TestTaskTrackerLocalization.checkFilePermissions(p.toUri().getPath(),
filePermissions, userName,
ClusterWithLinuxTaskController.taskTrackerSpecialGroup);
// Now make sure that all the path components also have proper
// permissions.
checkPermissionOnPathComponents(p.toUri().getPath(), userName);
}
}
/**
 * Verify permissions on each path component of the given cached file,
 * walking up from the file's parent directory to the user's private
 * distributed-cache root.
 *
 * @param cachedFilePath
 * @param userName
 * @throws IOException
 */
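// Illustrative decomposition (hypothetical path, assuming the private
// distcache dir has the form taskTracker/<user>/distcache): for
//   <local-dir>/0_0/taskTracker/alice/distcache/42/part-0
// the trailing string is /42/part-0 and the leading string is everything
// before it; directory permissions are then checked on each component from
// the file's parent up to (but excluding) the leading prefix.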
private void checkPermissionOnPathComponents(String cachedFilePath,
String userName)
throws IOException {
// The trailing distcache/file/... string
String trailingStringForFirstFile =
cachedFilePath.replaceFirst(ROOT_MAPRED_LOCAL_DIR.getAbsolutePath()
+ Path.SEPARATOR + "0_[0-" + (numLocalDirs - 1) + "]"
+ Path.SEPARATOR + TaskTracker.getPrivateDistributedCacheDir(userName),
"");
LOG.info("Trailing path for cacheFirstFile is : "
+ trailingStringForFirstFile);
// The leading mapreduce.cluster.local.dir/0_[0-n]/taskTracker/$user string.
String leadingStringForFirstFile =
cachedFilePath.substring(0, cachedFilePath
.lastIndexOf(trailingStringForFirstFile));
LOG.info("Leading path for cacheFirstFile is : "
+ leadingStringForFirstFile);
String dirPermissions = UserGroupInformation.getLoginUser()
.getShortUserName().equals(userName) ? "drwxrws---" : "dr-xrws---";
// Now check path permissions, starting with cache file's parent dir.
File path = new File(cachedFilePath).getParentFile();
while (!path.getAbsolutePath().equals(leadingStringForFirstFile)) {
TestTaskTrackerLocalization.checkFilePermissions(path.getAbsolutePath(),
dirPermissions, userName,
ClusterWithLinuxTaskController.taskTrackerSpecialGroup);
path = path.getParentFile();
}
}
}

View File

@ -1,82 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred.pipes;
import java.security.PrivilegedExceptionAction;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.ClusterWithLinuxTaskController;
import org.apache.hadoop.mapred.JobConf;
/**
* Test Pipes jobs with LinuxTaskController running the jobs as a user different
* from the user running the cluster. See {@link ClusterWithLinuxTaskController}
*/
public class TestPipesAsDifferentUser extends ClusterWithLinuxTaskController {
private static final Log LOG =
LogFactory.getLog(TestPipesAsDifferentUser.class);
public void testPipes()
throws Exception {
if (System.getProperty("compile.c++") == null) {
LOG.info("compile.c++ is not defined, so skipping TestPipes");
return;
}
if (!shouldRun()) {
return;
}
super.startCluster();
jobOwner.doAs(new PrivilegedExceptionAction<Object>() {
public Object run() throws Exception {
JobConf clusterConf = getClusterConf();
Path inputPath = new Path(homeDirectory, "in");
Path outputPath = new Path(homeDirectory, "out");
TestPipes.writeInputFile(FileSystem.get(clusterConf), inputPath);
TestPipes.runProgram(mrCluster, dfsCluster, TestPipes.wordCountSimple,
inputPath, outputPath, 3, 2, TestPipes.twoSplitOutput, clusterConf);
assertOwnerShip(outputPath);
TestPipes.cleanup(dfsCluster.getFileSystem(), outputPath);
TestPipes.runProgram(mrCluster, dfsCluster, TestPipes.wordCountSimple,
inputPath, outputPath, 3, 0, TestPipes.noSortOutput, clusterConf);
assertOwnerShip(outputPath);
TestPipes.cleanup(dfsCluster.getFileSystem(), outputPath);
TestPipes.runProgram(mrCluster, dfsCluster, TestPipes.wordCountPart,
inputPath, outputPath, 3, 2, TestPipes.fixedPartitionOutput,
clusterConf);
assertOwnerShip(outputPath);
TestPipes.cleanup(dfsCluster.getFileSystem(), outputPath);
TestPipes.runNonPipedProgram(mrCluster, dfsCluster,
TestPipes.wordCountNoPipes, clusterConf);
assertOwnerShip(TestPipes.nonPipedOutDir, FileSystem
.getLocal(clusterConf));
return null;
}
});
}
}