MAPREDUCE-2988. svn merge -c r1180833 --ignore-ancestry ../../trunk/

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1180835 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2011-10-10 09:20:49 +00:00
parent 590ff92e7a
commit 446be91d0d
13 changed files with 476 additions and 247 deletions

View File

@ -286,6 +286,9 @@ Release 0.23.0 - Unreleased
MAPREDUCE-2726. Added job-file to the AM and JobHistoryServer web
interfaces. (Jeffrey Naisbitt via vinodkv)
MAPREDUCE-2880. Improve classpath-construction for mapreduce AM and
containers. (Arun C Murthy via vinodkv)
MAPREDUCE-3055. Simplified ApplicationAttemptId passing to
ApplicationMaster via environment variable. (vinodkv)
@ -316,9 +319,12 @@ Release 0.23.0 - Unreleased
the outputs of tasks from a crashed job so as to support MR Application
Master recovery. (Sharad Agarwal and Arun C Murthy via vinodkv)
MAPREDUCE-2738. Added the missing cluster level statisticss on the RM web
MAPREDUCE-2738. Added the missing cluster level statistics on the RM web
UI. (Robert Joseph Evans via vinodkv)
MAPREDUCE-2988. Reenabled TestLinuxContainerExecutor reflecting the
current NodeManager code. (Robert Joseph Evans via vinodkv)
OPTIMIZATIONS
MAPREDUCE-2026. Make JobTracker.getJobCounters() and
@ -330,9 +336,6 @@ Release 0.23.0 - Unreleased
MAPREDUCE-901. Efficient framework counters. (llu via acmurthy)
MAPREDUCE-2880. Improve classpath-construction for mapreduce AM and
containers. (Arun C Murthy via vinodkv)
BUG FIXES
MAPREDUCE-2603. Disable High-Ram emulation in system tests.

View File

@ -26,6 +26,7 @@
<properties>
<install.file>${project.artifact.file}</install.file>
<yarn.basedir>${project.parent.parent.basedir}</yarn.basedir>
<container-executor.conf.dir>/etc/hadoop</container-executor.conf.dir>
</properties>
<dependencies>
@ -46,21 +47,23 @@
<version>1.0-beta-1</version>
<executions>
<execution>
<id>autoreconf</id>
<phase>package</phase>
<id>clean</id>
<phase>clean</phase>
<configuration>
<arguments>
<argument>-i</argument>
</arguments>
<workDir>src/main/c/container-executor</workDir>
<destDir>target</destDir>
</configuration>
<goals>
<goal>autoreconf</goal>
<goal>make-clean</goal>
</goals>
</execution>
<execution>
<id>make</id>
<phase>package</phase>
<phase>compile</phase>
<configuration>
<workDir>src/main/c/container-executor</workDir>
<configureEnvironment>
@ -79,20 +82,34 @@
<prefix>${project.build.outputDirectory}</prefix>
</configuration>
<goals>
<!-- always clean, to ensure conf dir regenerated -->
<goal>make-clean</goal>
<goal>autoreconf</goal>
<goal>configure</goal>
<goal>compile</goal>
</goals>
</execution>
<execution>
<id>install</id>
<phase>package</phase>
<id>make-test</id>
<phase>test</phase>
<configuration>
<destDir>/</destDir>
<workDir>src/main/c/container-executor</workDir>
<configureEnvironment>
<property>
<name>CFLAGS</name>
<value>-DHADOOP_CONF_DIR=${container-executor.conf.dir}</value>
</property>
</configureEnvironment>
<sources>
<source>
<directory>src/main/c/container-executor</directory>
</source>
</sources>
<workDir>src/main/c/container-executor</workDir>
<destDir>target</destDir>
<prefix>${project.build.outputDirectory}</prefix>
<compileTarget>check</compileTarget>
</configuration>
<goals>
<goal>make-install</goal>
<goal>compile</goal>
</goals>
</execution>
</executions>
@ -145,8 +162,12 @@
<configuration>
<systemPropertyVariables>
<property>
<name>container-executor-path</name>
<value></value>
<name>container-executor.path</name>
<value>${container-executor.path}</value>
</property>
<property>
<name>application.submitter</name>
<value>${application.submitter}</value>
</property>
</systemPropertyVariables>
<excludes>

View File

@ -16,3 +16,6 @@ libtool
missing
container-executor
test/.deps/
test-task-controller
test/.dirstamp
test/test-task-controller.o

View File

@ -18,9 +18,9 @@ AM_CFLAGS=-I$(srcdir)/impl -Wall -g -Werror
# Define the programs that need to be built
bin_PROGRAMS = container-executor
#check_PROGRAMS = test-task-controller
check_PROGRAMS = test-task-controller
#TESTS = test-task-controller
TESTS = test-task-controller
# Define the sources for the common files
common_SOURCES = impl/configuration.c impl/container-executor.c
@ -29,4 +29,4 @@ common_SOURCES = impl/configuration.c impl/container-executor.c
container_executor_SOURCES = $(common_SOURCES) impl/main.c
# Define the sources for the test executable
#test_task_controller_SOURCES = $(common_SOURCES) test/test-task-controller.c
test_task_controller_SOURCES = $(common_SOURCES) test/test-task-controller.c

View File

@ -18,7 +18,7 @@
# Process this file with autoconf to produce a configure script.
AC_PREREQ(2.59)
AC_INIT(linux-container-executor, 1.0.0, yarn-dev@yahoo-inc.com)
AC_INIT(linux-container-executor, 1.0.0, mapreduce-dev@hadoop.apache.org)
AC_GNU_SOURCE
#AC_SYS_LARGEFILE

View File

@ -425,6 +425,7 @@ static struct passwd* get_user_info(const char* user) {
struct passwd* check_user(const char *user) {
if (strcmp(user, "root") == 0) {
fprintf(LOGFILE, "Running as root is not allowed\n");
fflush(LOGFILE);
return NULL;
}
char *min_uid_str = get_value(MIN_USERID_KEY);
@ -435,6 +436,7 @@ struct passwd* check_user(const char *user) {
if (min_uid_str == end_ptr || *end_ptr != '\0') {
fprintf(LOGFILE, "Illegal value of %s for %s in configuration\n",
min_uid_str, MIN_USERID_KEY);
fflush(LOGFILE);
free(min_uid_str);
return NULL;
}
@ -443,11 +445,13 @@ struct passwd* check_user(const char *user) {
struct passwd *user_info = get_user_info(user);
if (NULL == user_info) {
fprintf(LOGFILE, "User %s not found\n", user);
fflush(LOGFILE);
return NULL;
}
if (user_info->pw_uid < min_uid) {
fprintf(LOGFILE, "Requested user %s has id %d, which is below the "
"minimum allowed %d\n", user, user_info->pw_uid, min_uid);
fflush(LOGFILE);
free(user_info);
return NULL;
}
@ -516,8 +520,13 @@ int create_directory_for_user(const char* path) {
mode_t permissions = S_IRWXU | S_IRGRP | S_IXGRP | S_ISGID;
uid_t user = geteuid();
gid_t group = getegid();
uid_t root = 0;
int ret = 0;
ret = change_effective_user(0, tt_gid);
if(getuid() == root) {
ret = change_effective_user(root, tt_gid);
}
if (ret == 0) {
if (0 == mkdir(path, permissions) || EEXIST == errno) {
// need to reassert the group sticky bit
@ -537,6 +546,8 @@ int create_directory_for_user(const char* path) {
}
}
if (change_effective_user(user, group) != 0) {
fprintf(LOGFILE, "Failed to change user to %i - %i\n", user, group);
ret = -1;
}
return ret;

View File

@ -68,7 +68,7 @@ enum errorcodes {
extern struct passwd *user_detail;
// the log file for error messages
// the log file for messages
extern FILE *LOGFILE;
// get the executable's filename
@ -76,11 +76,6 @@ char* get_executable();
int check_taskcontroller_permissions(char *executable_file);
/**
* delete a given log directory as a user
*/
int delete_log_directory(const char *log_dir);
// initialize the job directory
int initialize_job(const char *user, const char *jobid,
const char *credentials, char* const* args);
@ -137,9 +132,6 @@ char *get_task_credentials_file(const char* work_dir);
*/
char* get_job_log_directory(const char* log_root, const char* jobid);
char *get_task_log_dir(const char *log_dir, const char *job_id,
const char *attempt_id);
/**
* Ensure that the given path and all of the parent directories are created
* with the desired permissions.

View File

@ -77,6 +77,7 @@ int main(int argc, char **argv) {
if (conf_file == NULL) {
fprintf(LOGFILE, "Configuration file %s not found.\n", orig_conf_file);
fflush(LOGFILE);
return INVALID_CONFIG_FILE;
}
if (check_configuration_permissions(conf_file) != 0) {
@ -89,12 +90,14 @@ int main(int argc, char **argv) {
char *tt_group = get_value(TT_GROUP_KEY);
if (tt_group == NULL) {
fprintf(LOGFILE, "Can't get configured value for %s.\n", TT_GROUP_KEY);
fflush(LOGFILE);
exit(INVALID_CONFIG_FILE);
}
struct group *group_info = getgrnam(tt_group);
if (group_info == NULL) {
fprintf(LOGFILE, "Can't get group information for %s - %s.\n", tt_group,
strerror(errno));
fflush(LOGFILE);
exit(INVALID_CONFIG_FILE);
}
set_tasktracker_uid(getuid(), group_info->gr_gid);
@ -105,12 +108,14 @@ int main(int argc, char **argv) {
if (check_taskcontroller_permissions(executable_file) != 0) {
fprintf(LOGFILE, "Invalid permissions on container-executor binary.\n");
fflush(LOGFILE);
return INVALID_TASKCONTROLLER_PERMISSIONS;
}
//checks done for user name
if (argv[optind] == NULL) {
fprintf(LOGFILE, "Invalid user name \n");
fflush(LOGFILE);
return INVALID_USER_NAME;
}
int ret = set_user(argv[optind]);

View File

@ -16,7 +16,7 @@
* limitations under the License.
*/
#include "configuration.h"
#include "task-controller.h"
#include "container-executor.h"
#include <errno.h>
#include <fcntl.h>
@ -95,6 +95,12 @@ int write_config_file(char *file_name) {
fprintf(file, "," TEST_ROOT "/local-%d", i);
}
fprintf(file, "\n");
fprintf(file, "mapreduce.cluster.local.dir=" TEST_ROOT "/local-1");
for(i=2; i < 5; ++i) {
fprintf(file, "," TEST_ROOT "/local-%d", i);
}
fprintf(file, "\n");
fprintf(file, "hadoop.log.dir=" TEST_ROOT "/logs\n");
fclose(file);
return 0;
@ -110,7 +116,7 @@ void create_tt_roots() {
exit(1);
}
char buffer[100000];
sprintf(buffer, "%s/taskTracker", *tt_root);
sprintf(buffer, "%s/usercache", *tt_root);
if (mkdir(buffer, 0755) != 0) {
printf("FAIL: Can't create directory %s - %s\n", buffer,
strerror(errno));
@ -122,19 +128,20 @@ void create_tt_roots() {
void test_get_user_directory() {
char *user_dir = get_user_directory("/tmp", "user");
char *expected = "/tmp/taskTracker/user";
char *expected = "/tmp/usercache/user";
if (strcmp(user_dir, expected) != 0) {
printf("test_get_user_directory expected %s got %s\n", user_dir, expected);
printf("test_get_user_directory expected %s got %s\n", expected, user_dir);
exit(1);
}
free(user_dir);
}
void test_get_job_directory() {
char *expected = "/tmp/taskTracker/user/appcache/job_200906101234_0001";
char *expected = "/tmp/usercache/user/appcache/job_200906101234_0001";
char *job_dir = (char *) get_job_directory("/tmp", "user",
"job_200906101234_0001");
if (strcmp(job_dir, expected) != 0) {
printf("test_get_job_directory expected %s got %s\n", expected, job_dir);
exit(1);
}
free(job_dir);
@ -143,17 +150,18 @@ void test_get_job_directory() {
void test_get_attempt_directory() {
char *attempt_dir = get_attempt_work_directory("/tmp", "owen", "job_1",
"attempt_1");
char *expected = "/tmp/taskTracker/owen/appcache/job_1/attempt_1/work";
char *expected = "/tmp/usercache/owen/appcache/job_1/attempt_1";
if (strcmp(attempt_dir, expected) != 0) {
printf("Fail get_attempt_work_directory got %s expected %s\n",
attempt_dir, expected);
exit(1);
}
free(attempt_dir);
}
void test_get_task_launcher_file() {
char *expected_file = ("/tmp/taskTracker/user/appcache/job_200906101234_0001"
"/taskjvm.sh");
char *expected_file = ("/tmp/usercache/user/appcache/job_200906101234_0001"
"/task.sh");
char *job_dir = get_job_directory("/tmp", "user",
"job_200906101234_0001");
char *task_file = get_task_launcher_file(job_dir);
@ -168,7 +176,7 @@ void test_get_task_launcher_file() {
void test_get_job_log_dir() {
char *expected = TEST_ROOT "/logs/userlogs/job_200906101234_0001";
char *logdir = get_job_log_directory("job_200906101234_0001");
char *logdir = get_job_log_directory(TEST_ROOT "/logs/userlogs","job_200906101234_0001");
if (strcmp(logdir, expected) != 0) {
printf("Fail get_job_log_dir got %s expected %s\n", logdir, expected);
exit(1);
@ -176,15 +184,6 @@ void test_get_job_log_dir() {
free(logdir);
}
void test_get_task_log_dir() {
char *logdir = get_job_log_directory("job_5/task_4");
char *expected = TEST_ROOT "/logs/userlogs/job_5/task_4";
if (strcmp(logdir, expected) != 0) {
printf("FAIL: get_task_log_dir expected %s got %s\n", logdir, expected);
}
free(logdir);
}
void test_check_user() {
printf("\nTesting test_check_user\n");
struct passwd *user = check_user(username);
@ -221,7 +220,7 @@ void test_check_configuration_permissions() {
void test_delete_task() {
if (initialize_user(username)) {
printf("FAIL: failed to initialized user %s\n", username);
printf("FAIL: failed to initialize user %s\n", username);
exit(1);
}
char* job_dir = get_job_directory(TEST_ROOT "/local-2", username, "job_1");
@ -254,7 +253,8 @@ void test_delete_task() {
run(buffer);
// delete task directory
int ret = delete_as_user(username, "appcache/job_1/task_1");
char * dirs[] = {job_dir, 0};
int ret = delete_as_user(username, "task_1" , dirs);
if (ret != 0) {
printf("FAIL: return code from delete_as_user is %d\n", ret);
exit(1);
@ -315,7 +315,7 @@ void test_delete_job() {
run(buffer);
// delete task directory
int ret = delete_as_user(username, "appcache/job_2");
int ret = delete_as_user(username, job_dir, NULL);
if (ret != 0) {
printf("FAIL: return code from delete_as_user is %d\n", ret);
exit(1);
@ -349,12 +349,12 @@ void test_delete_user() {
exit(1);
}
char buffer[100000];
sprintf(buffer, "%s/local-1/taskTracker/%s", TEST_ROOT, username);
sprintf(buffer, "%s/local-1/usercache/%s", TEST_ROOT, username);
if (access(buffer, R_OK) != 0) {
printf("FAIL: directory missing before test\n");
exit(1);
}
if (delete_as_user(username, "") != 0) {
if (delete_as_user(username, buffer, NULL) != 0) {
exit(1);
}
if (access(buffer, R_OK) == 0) {
@ -368,50 +368,6 @@ void test_delete_user() {
free(job_dir);
}
void test_delete_log_directory() {
printf("\nTesting delete_log_directory\n");
char *job_log_dir = get_job_log_directory("job_1");
if (job_log_dir == NULL) {
exit(1);
}
if (create_directory_for_user(job_log_dir) != 0) {
exit(1);
}
free(job_log_dir);
char *task_log_dir = get_job_log_directory("job_1/task_2");
if (task_log_dir == NULL) {
exit(1);
}
if (mkdirs(task_log_dir, 0700) != 0) {
exit(1);
}
if (access(TEST_ROOT "/logs/userlogs/job_1/task_2", R_OK) != 0) {
printf("FAIL: can't access task directory - %s\n", strerror(errno));
exit(1);
}
if (delete_log_directory("job_1/task_2") != 0) {
printf("FAIL: can't delete task directory\n");
exit(1);
}
if (access(TEST_ROOT "/logs/userlogs/job_1/task_2", R_OK) == 0) {
printf("FAIL: task directory not deleted\n");
exit(1);
}
if (access(TEST_ROOT "/logs/userlogs/job_1", R_OK) != 0) {
printf("FAIL: job directory not deleted - %s\n", strerror(errno));
exit(1);
}
if (delete_log_directory("job_1") != 0) {
printf("FAIL: can't delete task directory\n");
exit(1);
}
if (access(TEST_ROOT "/logs/userlogs/job_1", R_OK) == 0) {
printf("FAIL: job directory not deleted\n");
exit(1);
}
free(task_log_dir);
}
void run_test_in_child(const char* test_name, void (*func)()) {
printf("\nRunning test %s in child process\n", test_name);
fflush(stdout);
@ -558,8 +514,7 @@ void test_init_job() {
exit(1);
} else if (child == 0) {
char *final_pgm[] = {"touch", "my-touch-file", 0};
if (initialize_job(username, "job_4", TEST_ROOT "/creds.txt",
TEST_ROOT "/job.xml", final_pgm) != 0) {
if (initialize_job(username, "job_4", TEST_ROOT "/creds.txt", final_pgm) != 0) {
printf("FAIL: failed in child\n");
exit(42);
}
@ -593,7 +548,7 @@ void test_init_job() {
exit(1);
}
free(job_dir);
job_dir = get_job_log_directory("job_4");
job_dir = get_job_log_directory("logs","job_4");
if (access(job_dir, R_OK) != 0) {
printf("FAIL: failed to create job log directory %s\n", job_dir);
exit(1);
@ -607,6 +562,20 @@ void test_run_task() {
printf("FAIL: seteuid to root failed - %s\n", strerror(errno));
exit(1);
}
FILE* creds = fopen(TEST_ROOT "/creds.txt", "w");
if (creds == NULL) {
printf("FAIL: failed to create credentials file - %s\n", strerror(errno));
exit(1);
}
if (fprintf(creds, "secret key\n") < 0) {
printf("FAIL: fprintf failed - %s\n", strerror(errno));
exit(1);
}
if (fclose(creds) != 0) {
printf("FAIL: fclose failed - %s\n", strerror(errno));
exit(1);
}
const char* script_name = TEST_ROOT "/task-script";
FILE* script = fopen(script_name, "w");
if (script == NULL) {
@ -638,7 +607,7 @@ void test_run_task() {
exit(1);
} else if (child == 0) {
if (run_task_as_user(username, "job_4", "task_1",
task_dir, script_name) != 0) {
task_dir, script_name, TEST_ROOT "creds.txt") != 0) {
printf("FAIL: failed in child\n");
exit(42);
}
@ -666,7 +635,7 @@ void test_run_task() {
exit(1);
}
free(task_dir);
task_dir = get_job_log_directory("job_4/task_1");
task_dir = get_job_log_directory("logs", "job_4/task_1");
if (access(task_dir, R_OK) != 0) {
printf("FAIL: failed to create job log directory %s\n", task_dir);
exit(1);
@ -723,9 +692,6 @@ int main(int argc, char **argv) {
test_check_configuration_permissions();
printf("\nTesting get_task_log_dir()\n");
test_get_task_log_dir();
printf("\nTesting delete_task()\n");
test_delete_task();
@ -736,8 +702,6 @@ int main(int argc, char **argv) {
test_check_user();
test_delete_log_directory();
// the tests that change user need to be run in a subshell, so that
// when they change user they don't give up our privs
run_test_in_child("test_signal_task", test_signal_task);

View File

@ -169,11 +169,10 @@ public int launchContainer(Container container,
launchCommandObjs.put(containerId, shExec);
// DEBUG
LOG.info("launchContainer: " + Arrays.toString(commandArray));
String output = shExec.getOutput();
try {
shExec.execute();
if (LOG.isDebugEnabled()) {
logOutput(output);
logOutput(shExec.getOutput());
}
} catch (ExitCodeException e) {
int exitCode = shExec.getExitCode();
@ -183,9 +182,9 @@ public int launchContainer(Container container,
// container-executor's output
if (exitCode != 143 && exitCode != 137) {
LOG.warn("Exception from container-launch : ", e);
logOutput(output);
logOutput(shExec.getOutput());
String diagnostics = "Exception from container-launch: \n"
+ StringUtils.stringifyException(e) + "\n" + output;
+ StringUtils.stringifyException(e) + "\n" + shExec.getOutput();
container.handle(new ContainerDiagnosticsUpdateEvent(containerId,
diagnostics));
} else {
@ -198,7 +197,7 @@ public int launchContainer(Container container,
}
if (LOG.isDebugEnabled()) {
LOG.debug("Output from LinuxContainerExecutor's launchContainer follows:");
logOutput(output);
logOutput(shExec.getOutput());
}
return 0;
}

View File

@ -18,151 +18,228 @@
package org.apache.hadoop.yarn.server.nodemanager;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import static junit.framework.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import junit.framework.Assert;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.HashMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.Signal;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
/**
* This is intended to test the LinuxContainerExecutor code, but because of
* some security restrictions this can only be done with some special setup
* first.
* <br><ol>
* <li>Compile the code with container-executor.conf.dir set to the location you
* want for testing.
* <br><pre><code>
* > mvn clean install -Dcontainer-executor.conf.dir=/etc/hadoop -DskipTests
* </code></pre>
*
* <li>Set up <code>${container-executor.conf.dir}/taskcontroller.cfg</code>
* taskcontroller.cfg needs to be owned by root and have in it the proper
* config values.
* <br><pre><code>
* > cat /etc/hadoop/taskcontroller.cfg
* mapreduce.cluster.local.dir=/tmp/hadoop/nm-local/
* hadoop.log.dir=/tmp/hadoop/nm-log
* mapreduce.tasktracker.group=mapred
* #depending on the user id of the application.submitter option
* min.user.id=1
* > sudo chown root:root /etc/hadoop/taskcontroller.cfg
* > sudo chmod 444 /etc/hadoop/taskcontroller.cfg
* </code></pre>
*
* <li>iMove the binary and set proper permissions on it. It needs to be owned
* by root, the group needs to be the group configured in taskcontroller.cfg,
* and it needs the setuid bit set. (The build will also overwrite it so you
* need to move it to a place that you can support it.
* <br><pre><code>
* > cp ./hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/c/container-executor/container-executor /tmp/
* > sudo chown root:mapred /tmp/container-executor
* > sudo chmod 4550 /tmp/container-executor
* </code></pre>
*
* <li>Run the tests with the execution enabled (The user you run the tests as
* needs to be part of the group from the config.
* <br><pre><code>
* mvn test -Dtest=TestLinuxContainerExecutor -Dapplication.submitter=nobody -Dcontainer-executor.path=/tmp/container-executor
* </code></pre>
* </ol>
*/
public class TestLinuxContainerExecutor {
//
// private static final Log LOG = LogFactory
// .getLog(TestLinuxContainerExecutor.class);
//
// // TODO: FIXME
// private static File workSpace = new File("target",
// TestLinuxContainerExecutor.class.getName() + "-workSpace");
//
// @Before
// public void setup() throws IOException {
// FileContext.getLocalFSFileContext().mkdir(
// new Path(workSpace.getAbsolutePath()), null, true);
// workSpace.setReadable(true, false);
// workSpace.setExecutable(true, false);
// workSpace.setWritable(true, false);
// }
//
// @After
// public void tearDown() throws AccessControlException, FileNotFoundException,
// UnsupportedFileSystemException, IOException {
// FileContext.getLocalFSFileContext().delete(
// new Path(workSpace.getAbsolutePath()), true);
// }
//
@Test
public void testCommandFilePreparation() throws IOException {
// LinuxContainerExecutor executor = new LinuxContainerExecutor(new String[] {
// "/bin/echo", "hello" }, null, null, "nobody"); // TODO: fix user name
// executor.prepareCommandFile(workSpace.getAbsolutePath());
//
// // Now verify the contents of the commandFile
// File commandFile = new File(workSpace, LinuxContainerExecutor.COMMAND_FILE);
// BufferedReader reader = new BufferedReader(new FileReader(commandFile));
// Assert.assertEquals("/bin/echo hello", reader.readLine());
// Assert.assertEquals(null, reader.readLine());
// Assert.assertTrue(commandFile.canExecute());
private static final Log LOG = LogFactory
.getLog(TestLinuxContainerExecutor.class);
private static File workSpace = new File("target",
TestLinuxContainerExecutor.class.getName() + "-workSpace");
private LinuxContainerExecutor exec = null;
private String appSubmitter = null;
@Before
public void setup() throws Exception {
FileContext.getLocalFSFileContext().mkdir(
new Path(workSpace.getAbsolutePath()), null, true);
workSpace.setReadable(true, false);
workSpace.setExecutable(true, false);
workSpace.setWritable(true, false);
String exec_path = System.getProperty("container-executor.path");
if(exec_path != null && !exec_path.isEmpty()) {
Configuration conf = new Configuration(false);
LOG.info("Setting "+YarnConfiguration.NM_LINUX_CONTAINER_EXECUTOR_PATH
+"="+exec_path);
conf.set(YarnConfiguration.NM_LINUX_CONTAINER_EXECUTOR_PATH, exec_path);
exec = new LinuxContainerExecutor();
exec.setConf(conf);
}
appSubmitter = System.getProperty("application.submitter");
if(appSubmitter == null || appSubmitter.isEmpty()) {
appSubmitter = "nobody";
}
}
@After
public void tearDown() throws Exception {
FileContext.getLocalFSFileContext().delete(
new Path(workSpace.getAbsolutePath()), true);
}
private boolean shouldRun() {
if(exec == null) {
LOG.warn("Not running test because container-executor.path is not set");
return false;
}
return true;
}
private String writeScriptFile(String ... cmd) throws IOException {
File f = File.createTempFile("TestLinuxContainerExecutor", ".sh");
f.deleteOnExit();
PrintWriter p = new PrintWriter(new FileOutputStream(f));
p.println("#!/bin/sh");
p.print("exec");
for(String part: cmd) {
p.print(" '");
p.print(part.replace("\\", "\\\\").replace("'", "\\'"));
p.print("'");
}
p.println();
p.close();
return f.getAbsolutePath();
}
private int id = 0;
private synchronized int getNextId() {
id += 1;
return id;
}
private ContainerId getNextContainerId() {
ContainerId cId = mock(ContainerId.class);
String id = "CONTAINER_"+getNextId();
when(cId.toString()).thenReturn(id);
return cId;
}
private int runAndBlock(String ... cmd) throws IOException {
return runAndBlock(getNextContainerId(), cmd);
}
private int runAndBlock(ContainerId cId, String ... cmd) throws IOException {
String appId = "APP_"+getNextId();
Container container = mock(Container.class);
ContainerLaunchContext context = mock(ContainerLaunchContext.class);
HashMap<String, String> env = new HashMap<String,String>();
when(container.getContainerID()).thenReturn(cId);
when(container.getLaunchContext()).thenReturn(context);
when(context.getEnvironment()).thenReturn(env);
String script = writeScriptFile(cmd);
Path scriptPath = new Path(script);
Path tokensPath = new Path("/dev/null");
Path workDir = new Path(workSpace.getAbsolutePath());
return exec.launchContainer(container, scriptPath, tokensPath,
appSubmitter, appId, workDir);
}
@Test
public void testContainerLaunch() throws IOException {
if (!shouldRun()) {
return;
}
File touchFile = new File(workSpace, "touch-file");
int ret = runAndBlock("touch", touchFile.getAbsolutePath());
assertEquals(0, ret);
FileStatus fileStatus = FileContext.getLocalFSFileContext().getFileStatus(
new Path(touchFile.getAbsolutePath()));
assertEquals(appSubmitter, fileStatus.getOwner());
}
@Test
public void testContainerKill() throws Exception {
if (!shouldRun()) {
return;
}
final ContainerId sleepId = getNextContainerId();
Thread t = new Thread() {
public void run() {
try {
runAndBlock(sleepId, "sleep", "100");
} catch (IOException e) {
LOG.warn("Caught exception while running sleep",e);
}
};
};
t.setDaemon(true); //If it does not exit we shouldn't block the test.
t.start();
assertTrue(t.isAlive());
String pid = null;
int count = 10;
while ((pid = exec.getProcessId(sleepId)) == null && count > 0) {
LOG.info("Sleeping for 200 ms before checking for pid ");
Thread.sleep(200);
count--;
}
assertNotNull(pid);
LOG.info("Going to killing the process.");
exec.signalContainer(appSubmitter, pid, Signal.TERM);
LOG.info("sleeping for 100ms to let the sleep be killed");
Thread.sleep(100);
assertFalse(t.isAlive());
}
//
// @Test
// public void testContainerLaunch() throws IOException {
// String containerExecutorPath = System
// .getProperty("container-executor-path");
// if (containerExecutorPath == null || containerExecutorPath.equals("")) {
// LOG.info("Not Running test for lack of container-executor-path");
// return;
// }
//
// String applicationSubmitter = "nobody";
//
// File touchFile = new File(workSpace, "touch-file");
// LinuxContainerExecutor executor = new LinuxContainerExecutor(new String[] {
// "touch", touchFile.getAbsolutePath() }, workSpace, null,
// applicationSubmitter);
// executor.setCommandExecutorPath(containerExecutorPath);
// executor.execute();
//
// FileStatus fileStatus = FileContext.getLocalFSFileContext().getFileStatus(
// new Path(touchFile.getAbsolutePath()));
// Assert.assertEquals(applicationSubmitter, fileStatus.getOwner());
// }
//
// @Test
// public void testContainerKill() throws IOException, InterruptedException,
// IllegalArgumentException, SecurityException, IllegalAccessException,
// NoSuchFieldException {
// String containerExecutorPath = System
// .getProperty("container-executor-path");
// if (containerExecutorPath == null || containerExecutorPath.equals("")) {
// LOG.info("Not Running test for lack of container-executor-path");
// return;
// }
//
// String applicationSubmitter = "nobody";
// final LinuxContainerExecutor executor = new LinuxContainerExecutor(
// new String[] { "sleep", "100" }, workSpace, null, applicationSubmitter);
// executor.setCommandExecutorPath(containerExecutorPath);
// new Thread() {
// public void run() {
// try {
// executor.execute();
// } catch (IOException e) {
// // TODO Auto-generated catch block
// e.printStackTrace();
// }
// };
// }.start();
//
// String pid;
// while ((pid = executor.getPid()) == null) {
// LOG.info("Sleeping for 5 seconds before checking if "
// + "the process is alive.");
// Thread.sleep(5000);
// }
// LOG.info("Going to check the liveliness of the process with pid " + pid);
//
// LinuxContainerExecutor checkLiveliness = new LinuxContainerExecutor(
// new String[] { "kill", "-0", "-" + pid }, workSpace, null,
// applicationSubmitter);
// checkLiveliness.setCommandExecutorPath(containerExecutorPath);
// checkLiveliness.execute();
//
// LOG.info("Process is alive. "
// + "Sleeping for 5 seconds before killing the process.");
// Thread.sleep(5000);
// LOG.info("Going to killing the process.");
//
// executor.kill();
//
// LOG.info("Sleeping for 5 seconds before checking if "
// + "the process is alive.");
// Thread.sleep(5000);
// LOG.info("Going to check the liveliness of the process.");
//
// // TODO: fix
// checkLiveliness = new LinuxContainerExecutor(new String[] { "kill", "-0",
// "-" + pid }, workSpace, null, applicationSubmitter);
// checkLiveliness.setCommandExecutorPath(containerExecutorPath);
// boolean success = false;
// try {
// checkLiveliness.execute();
// success = true;
// } catch (IOException e) {
// success = false;
// }
//
// Assert.assertFalse(success);
// }
}

View File

@ -0,0 +1,144 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager;
import static junit.framework.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.io.LineNumberReader;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class TestLinuxContainerExecutorWithMocks {
@SuppressWarnings("unused")
private static final Log LOG = LogFactory
.getLog(TestLinuxContainerExecutorWithMocks.class);
private LinuxContainerExecutor mockExec = null;
private final File mockParamFile = new File("./params.txt");
private void deleteMockParamFile() {
if(mockParamFile.exists()) {
mockParamFile.delete();
}
}
private List<String> readMockParams() throws IOException {
LinkedList<String> ret = new LinkedList<String>();
LineNumberReader reader = new LineNumberReader(new FileReader(mockParamFile));
String line;
while((line = reader.readLine()) != null) {
ret.add(line);
}
reader.close();
return ret;
}
@Before
public void setup() throws IOException {
File f = new File("./src/test/resources/mock-container-executor");
if(!f.canExecute()) {
f.setExecutable(true);
}
String executorPath = f.getAbsolutePath();
Configuration conf = new Configuration();
conf.set(YarnConfiguration.NM_LINUX_CONTAINER_EXECUTOR_PATH, executorPath);
mockExec = new LinuxContainerExecutor();
mockExec.setConf(conf);
}
@After
public void tearDown() throws IOException {
deleteMockParamFile();
}
@Test
public void testContainerLaunch() throws IOException {
String appSubmitter = "nobody";
String cmd = String.valueOf(
LinuxContainerExecutor.Commands.LAUNCH_CONTAINER.getValue());
String appId = "APP_ID";
String containerId = "CONTAINER_ID";
Container container = mock(Container.class);
ContainerId cId = mock(ContainerId.class);
ContainerLaunchContext context = mock(ContainerLaunchContext.class);
HashMap<String, String> env = new HashMap<String,String>();
when(container.getContainerID()).thenReturn(cId);
when(container.getLaunchContext()).thenReturn(context);
when(cId.toString()).thenReturn(containerId);
when(context.getEnvironment()).thenReturn(env);
Path scriptPath = new Path("file:///bin/true");
Path tokensPath = new Path("file:///dev/null");
Path workDir = new Path("/tmp");
int ret = mockExec.launchContainer(container, scriptPath, tokensPath,
appSubmitter, appId, workDir);
assertEquals(0, ret);
assertEquals(Arrays.asList(appSubmitter, cmd, appId, containerId,
workDir.toString(), "/bin/true", "/dev/null"),
readMockParams());
}
@Test
public void testContainerKill() throws IOException {
String appSubmitter = "nobody";
String cmd = String.valueOf(
LinuxContainerExecutor.Commands.SIGNAL_CONTAINER.getValue());
ContainerExecutor.Signal signal = ContainerExecutor.Signal.QUIT;
String sigVal = String.valueOf(signal.getValue());
mockExec.signalContainer(appSubmitter, "1000", signal);
assertEquals(Arrays.asList(appSubmitter, cmd, "1000", sigVal),
readMockParams());
}
@Test
public void testDeleteAsUser() throws IOException {
String appSubmitter = "nobody";
String cmd = String.valueOf(
LinuxContainerExecutor.Commands.DELETE_AS_USER.getValue());
Path dir = new Path("/tmp/testdir");
mockExec.deleteAsUser(appSubmitter, dir);
assertEquals(Arrays.asList(appSubmitter, cmd, "/tmp/testdir"),
readMockParams());
}
}

View File

@ -0,0 +1,10 @@
#!/bin/sh
for PARAM in "$@"
do
echo $PARAM;
done > params.txt
if [[ "$2" == "1" ]];
then
cd $5;
exec $6;
fi;