YARN-7443. Add native FPGA module support to do isolation with cgroups. (Zhankun Tang via wangda)

Change-Id: Ic4b7f9f3e032986b8f955139c9fe4d3a6c818a53
This commit is contained in:
Wangda Tan 2017-12-08 15:18:22 -08:00
parent adca1a72e4
commit 04b84da245
6 changed files with 504 additions and 0 deletions

View File

@ -15,3 +15,9 @@ feature.tc.enabled=0
# docker.allowed.rw-mounts=## comma seperate volumes that can be mounted as read-write, add the yarn local and log dirs to this list to run Hadoop jobs
# docker.privileged-containers.enabled=0
# docker.allowed.volume-drivers=## comma seperated list of allowed volume-drivers
# The configs below deal with settings for FPGA resource
#[fpga]
# module.enabled=## Enable/Disable the FPGA resource handler module. set to "true" to enable, disabled by default
# fpga.major-device-number=## Major device number of FPGA, by default is 246. Strongly recommend setting this
# fpga.allowed-device-minor-numbers=## Comma separated allowed minor device numbers, empty means all FPGA devices managed by YARN.

View File

@ -132,6 +132,7 @@ add_library(container
main/native/container-executor/impl/modules/cgroups/cgroups-operations.c
main/native/container-executor/impl/modules/common/module-configs.c
main/native/container-executor/impl/modules/gpu/gpu-module.c
main/native/container-executor/impl/modules/fpga/fpga-module.c
main/native/container-executor/impl/utils/docker-util.c
)
@ -165,6 +166,7 @@ add_executable(cetest
main/native/container-executor/test/utils/test-path-utils.cc
main/native/container-executor/test/modules/cgroups/test-cgroups-module.cc
main/native/container-executor/test/modules/gpu/test-gpu-module.cc
main/native/container-executor/test/modules/fpga/test-fpga-module.cc
main/native/container-executor/test/test_util.cc
main/native/container-executor/test/utils/test_docker_util.cc)
target_link_libraries(cetest gtest container)

View File

@ -22,6 +22,7 @@
#include "util.h"
#include "get_executable.h"
#include "modules/gpu/gpu-module.h"
#include "modules/fpga/fpga-module.h"
#include "modules/cgroups/cgroups-operations.h"
#include <errno.h>
@ -244,6 +245,11 @@ static int validate_arguments(int argc, char **argv , int *operation) {
&argv[1]);
}
if (strcmp("--module-fpga", argv[1]) == 0) {
return handle_fpga_request(&update_cgroups_parameters, "fpga", argc - 1,
&argv[1]);
}
if (strcmp("--checksetup", argv[1]) == 0) {
*operation = CHECK_SETUP;
return 0;

View File

@ -0,0 +1,229 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "configuration.h"
#include "container-executor.h"
#include "utils/string-utils.h"
#include "modules/fpga/fpga-module.h"
#include "modules/cgroups/cgroups-operations.h"
#include "modules/common/module-configs.h"
#include "modules/common/constants.h"
#include "util.h"
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <getopt.h>
#include <unistd.h>
#define EXCLUDED_FPGAS_OPTION "excluded_fpgas"
#define CONTAINER_ID_OPTION "container_id"
#define DEFAULT_INTEL_MAJOR_NUMBER 246
#define MAX_CONTAINER_ID_LEN 128
static const struct section* cfg_section;
static int internal_handle_fpga_request(
update_cgroups_parameters_function update_cgroups_parameters_func_p,
size_t n_minor_devices_to_block, int minor_devices[],
const char* container_id) {
char* allowed_minor_numbers_str = NULL;
int* allowed_minor_numbers = NULL;
size_t n_allowed_minor_numbers = 0;
int return_code = 0;
if (n_minor_devices_to_block == 0) {
// no device to block, just return;
return 0;
}
// Get major device number from cfg, if not set, major number of (Intel)
// will be the default value.
int major_device_number;
char* major_number_str = get_section_value(FPGA_MAJOR_NUMBER_CONFIG_KEY,
cfg_section);
if (!major_number_str || 0 == major_number_str[0]) {
// Default major number of Intel devices
major_device_number = DEFAULT_INTEL_MAJOR_NUMBER;
} else {
major_device_number = strtol(major_number_str, NULL, 0);
}
// Get allowed minor device numbers from cfg, if not set, means all minor
// devices can be used by YARN
allowed_minor_numbers_str = get_section_value(
FPGA_ALLOWED_DEVICES_MINOR_NUMBERS,
cfg_section);
if (!allowed_minor_numbers_str || 0 == allowed_minor_numbers_str[0]) {
allowed_minor_numbers = NULL;
} else {
int rc = get_numbers_split_by_comma(allowed_minor_numbers_str,
&allowed_minor_numbers,
&n_allowed_minor_numbers);
if (0 != rc) {
fprintf(ERRORFILE,
"Failed to get allowed minor device numbers from cfg, value=%s\n",
allowed_minor_numbers_str);
return_code = -1;
goto cleanup;
}
// Make sure we're trying to black devices allowed in config
for (int i = 0; i < n_minor_devices_to_block; i++) {
int found = 0;
for (int j = 0; j < n_allowed_minor_numbers; j++) {
if (minor_devices[i] == allowed_minor_numbers[j]) {
found = 1;
break;
}
}
if (!found) {
fprintf(ERRORFILE,
"Trying to blacklist device with minor-number=%d which is not on allowed list\n",
minor_devices[i]);
return_code = -1;
goto cleanup;
}
}
}
// Use cgroup helpers to blacklist devices
for (int i = 0; i < n_minor_devices_to_block; i++) {
char param_value[128];
memset(param_value, 0, sizeof(param_value));
snprintf(param_value, sizeof(param_value), "c %d:%d rwm",
major_device_number, minor_devices[i]);
int rc = update_cgroups_parameters_func_p("devices", "deny",
container_id, param_value);
if (0 != rc) {
fprintf(ERRORFILE, "CGroups: Failed to update cgroups\n");
return_code = -1;
goto cleanup;
}
}
cleanup:
if (major_number_str) {
free(major_number_str);
}
if (allowed_minor_numbers) {
free(allowed_minor_numbers);
}
if (allowed_minor_numbers_str) {
free(allowed_minor_numbers_str);
}
return return_code;
}
void reload_fpga_configuration() {
cfg_section = get_configuration_section(FPGA_MODULE_SECTION_NAME, get_cfg());
}
/*
* Format of FPGA request commandline:
*
* c-e fpga --excluded_fpgas 0,1,3 --container_id container_x_y
*/
int handle_fpga_request(update_cgroups_parameters_function func,
const char* module_name, int module_argc, char** module_argv) {
if (!cfg_section) {
reload_fpga_configuration();
}
if (!module_enabled(cfg_section, FPGA_MODULE_SECTION_NAME)) {
fprintf(ERRORFILE,
"Please make sure fpga module is enabled before using it.\n");
return -1;
}
static struct option long_options[] = {
{EXCLUDED_FPGAS_OPTION, required_argument, 0, 'e' },
{CONTAINER_ID_OPTION, required_argument, 0, 'c' },
{0, 0, 0, 0}
};
int rc = 0;
int c = 0;
int option_index = 0;
int* minor_devices = NULL;
char container_id[MAX_CONTAINER_ID_LEN];
memset(container_id, 0, sizeof(container_id));
size_t n_minor_devices_to_block = 0;
int failed = 0;
optind = 1;
while((c = getopt_long(module_argc, module_argv, "e:c:",
long_options, &option_index)) != -1) {
switch(c) {
case 'e':
rc = get_numbers_split_by_comma(optarg, &minor_devices,
&n_minor_devices_to_block);
if (0 != rc) {
fprintf(ERRORFILE,
"Failed to get minor devices number from command line, value=%s\n",
optarg);
failed = 1;
goto cleanup;
}
break;
case 'c':
if (!validate_container_id(optarg)) {
fprintf(ERRORFILE,
"Specified container_id=%s is invalid\n", optarg);
failed = 1;
goto cleanup;
}
strncpy(container_id, optarg, MAX_CONTAINER_ID_LEN);
break;
default:
fprintf(ERRORFILE,
"Unknown option in fpga command character %d %c, optionindex = %d\n",
c, c, optind);
failed = 1;
goto cleanup;
}
}
if (0 == container_id[0]) {
fprintf(ERRORFILE,
"[%s] --container_id must be specified.\n", __func__);
failed = 1;
goto cleanup;
}
if (!minor_devices) {
// Minor devices is null, skip following call.
fprintf(ERRORFILE, "is not specified, skip cgroups call.\n");
goto cleanup;
}
failed = internal_handle_fpga_request(func, n_minor_devices_to_block,
minor_devices,
container_id);
cleanup:
if (minor_devices) {
free(minor_devices);
}
return failed;
}

View File

@ -0,0 +1,45 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifdef __FreeBSD__
#define _WITH_GETLINE
#endif
#ifndef _MODULES_FPGA_FPGA_MUDULE_H_
#define _MODULES_FPGA_FPGA_MUDULE_H_
#define FPGA_MAJOR_NUMBER_CONFIG_KEY "fpga.major-device-number"
#define FPGA_ALLOWED_DEVICES_MINOR_NUMBERS "fpga.allowed-device-minor-numbers"
#define FPGA_MODULE_SECTION_NAME "fpga"
// For unit test stubbing
typedef int (*update_cgroups_parameters_function)(const char*, const char*,
const char*, const char*);
/**
* Handle fpga requests
*/
int handle_fpga_request(update_cgroups_parameters_function func,
const char* module_name, int module_argc, char** module_argv);
/**
* Reload config from filesystem, visible for testing.
*/
void reload_fpga_configuration();
#endif

View File

@ -0,0 +1,216 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <vector>
#include <errno.h>
#include <fcntl.h>
#include <inttypes.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/stat.h>
#include <sys/wait.h>
#include <unistd.h>
#include <gtest/gtest.h>
#include <sstream>
extern "C" {
#include "configuration.h"
#include "container-executor.h"
#include "modules/cgroups/cgroups-operations.h"
#include "modules/fpga/fpga-module.h"
#include "test/test-container-executor-common.h"
#include "util.h"
}
namespace ContainerExecutor {
class TestFpgaModule : public ::testing::Test {
protected:
virtual void SetUp() {
if (mkdirs(TEST_ROOT, 0755) != 0) {
fprintf(ERRORFILE, "Failed to mkdir TEST_ROOT: %s\n", TEST_ROOT);
exit(1);
}
LOGFILE = stdout;
ERRORFILE = stderr;
}
virtual void TearDown() {
}
};
static std::vector<const char*> cgroups_parameters_invoked;
static int mock_update_cgroups_parameters(
const char* controller_name,
const char* param_name,
const char* group_id,
const char* value) {
char* buf = (char*) malloc(128);
strcpy(buf, controller_name);
cgroups_parameters_invoked.push_back(buf);
buf = (char*) malloc(128);
strcpy(buf, param_name);
cgroups_parameters_invoked.push_back(buf);
buf = (char*) malloc(128);
strcpy(buf, group_id);
cgroups_parameters_invoked.push_back(buf);
buf = (char*) malloc(128);
strcpy(buf, value);
cgroups_parameters_invoked.push_back(buf);
return 0;
}
static void verify_param_updated_to_cgroups(
int argc, const char** argv) {
ASSERT_EQ(argc, cgroups_parameters_invoked.size());
int offset = 0;
while (offset < argc) {
ASSERT_STREQ(argv[offset], cgroups_parameters_invoked[offset]);
offset++;
}
}
static void write_and_load_fpga_module_to_cfg(const char* cfg_filepath, int enabled) {
FILE *file = fopen(cfg_filepath, "w");
if (file == NULL) {
printf("FAIL: Could not open configuration file: %s\n", cfg_filepath);
exit(1);
}
fprintf(file, "[fpga]\n");
if (enabled) {
fprintf(file, "module.enabled=true\n");
} else {
fprintf(file, "module.enabled=false\n");
}
fclose(file);
// Read config file
read_executor_config(cfg_filepath);
reload_fpga_configuration();
}
static void test_fpga_module_enabled_disabled(int enabled) {
// Write config file.
const char *filename = TEST_ROOT "/test_cgroups_module_enabled_disabled.cfg";
write_and_load_fpga_module_to_cfg(filename, enabled);
char* argv[] = { (char*) "--module-fpga", (char*) "--excluded_fpgas", (char*) "0,1",
(char*) "--container_id",
(char*) "container_1498064906505_0001_01_000001" };
int rc = handle_fpga_request(&mock_update_cgroups_parameters,
"fpga", 5, argv);
int EXPECTED_RC;
if (enabled) {
EXPECTED_RC = 0;
} else {
EXPECTED_RC = -1;
}
ASSERT_EQ(EXPECTED_RC, rc);
}
TEST_F(TestFpgaModule, test_verify_fpga_module_calls_cgroup_parameter) {
// Write config file.
const char *filename = TEST_ROOT "/test_verify_fpga_module_calls_cgroup_parameter.cfg";
write_and_load_fpga_module_to_cfg(filename, 1);
char* container_id = (char*) "container_1498064906505_0001_01_000001";
char* argv[] = { (char*) "--module-fpga", (char*) "--excluded_fpgas", (char*) "0,1",
(char*) "--container_id",
container_id };
/* Test case 1: block 2 devices */
cgroups_parameters_invoked.clear();
int rc = handle_fpga_request(&mock_update_cgroups_parameters,
"fpga", 5, argv);
ASSERT_EQ(0, rc) << "Should success.\n";
// Verify cgroups parameters
const char* expected_cgroups_argv[] = { "devices", "deny", container_id, "c 246:0 rwm",
"devices", "deny", container_id, "c 246:1 rwm"};
verify_param_updated_to_cgroups(8, expected_cgroups_argv);
/* Test case 2: block 0 devices */
cgroups_parameters_invoked.clear();
char* argv_1[] = { (char*) "--module-fpga", (char*) "--container_id", container_id };
rc = handle_fpga_request(&mock_update_cgroups_parameters,
"fpga", 3, argv_1);
ASSERT_EQ(0, rc) << "Should success.\n";
// Verify cgroups parameters
verify_param_updated_to_cgroups(0, NULL);
/* Test case 3: block 2 non-sequential devices */
cgroups_parameters_invoked.clear();
char* argv_2[] = { (char*) "--module-fpga", (char*) "--excluded_fpgas", (char*) "1,3",
(char*) "--container_id", container_id };
rc = handle_fpga_request(&mock_update_cgroups_parameters,
"fpga", 5, argv_2);
ASSERT_EQ(0, rc) << "Should success.\n";
// Verify cgroups parameters
const char* expected_cgroups_argv_2[] = { "devices", "deny", container_id, "c 246:1 rwm",
"devices", "deny", container_id, "c 246:3 rwm"};
verify_param_updated_to_cgroups(8, expected_cgroups_argv_2);
}
TEST_F(TestFpgaModule, test_illegal_cli_parameters) {
// Write config file.
const char *filename = TEST_ROOT "/test_illegal_cli_parameters.cfg";
write_and_load_fpga_module_to_cfg(filename, 1);
// Illegal container id - 1
char* argv[] = { (char*) "--module-fpga", (char*) "--excluded_fpgas", (char*) "0,1",
(char*) "--container_id", (char*) "xxxx" };
int rc = handle_fpga_request(&mock_update_cgroups_parameters,
"fpga", 5, argv);
ASSERT_NE(0, rc) << "Should fail.\n";
// Illegal container id - 2
char* argv_1[] = { (char*) "--module-fpga", (char*) "--excluded_fpgas", (char*) "0,1",
(char*) "--container_id", (char*) "container_1" };
rc = handle_fpga_request(&mock_update_cgroups_parameters,
"fpga", 5, argv_1);
ASSERT_NE(0, rc) << "Should fail.\n";
// Illegal container id - 3
char* argv_2[] = { (char*) "--module-fpga", (char*) "--excluded_fpgas", (char*) "0,1" };
rc = handle_fpga_request(&mock_update_cgroups_parameters,
"fpga", 3, argv_2);
ASSERT_NE(0, rc) << "Should fail.\n";
}
TEST_F(TestFpgaModule, test_fpga_module_disabled) {
test_fpga_module_enabled_disabled(0);
}
TEST_F(TestFpgaModule, test_fpga_module_enabled) {
test_fpga_module_enabled_disabled(1);
}
} // namespace ContainerExecutor