YARN-9667. Use setbuf with line buffer to reduce fflush complexity in container-executor.

Contributed by Peter Bacsko
This commit is contained in:
Eric Yang 2019-08-05 13:59:12 -04:00
parent c589983e9c
commit d6697da5e8
7 changed files with 57 additions and 213 deletions

View File

@ -130,10 +130,20 @@ static void display_usage(FILE *stream) {
static void open_log_files() { static void open_log_files() {
if (LOGFILE == NULL) { if (LOGFILE == NULL) {
LOGFILE = stdout; LOGFILE = stdout;
if (setvbuf(LOGFILE, NULL, _IOLBF, BUFSIZ)) {
fprintf(LOGFILE, "Failed to invoke setvbuf() for LOGFILE: %s\n", strerror(errno));
fflush(LOGFILE);
exit(ERROR_CALLING_SETVBUF);
}
} }
if (ERRORFILE == NULL) { if (ERRORFILE == NULL) {
ERRORFILE = stderr; ERRORFILE = stderr;
if (setvbuf(ERRORFILE, NULL, _IOLBF, BUFSIZ)) {
fprintf(ERRORFILE, "Failed to invoke setvbuf() for ERRORFILE: %s\n", strerror(errno));
fflush(ERRORFILE);
exit(ERROR_CALLING_SETVBUF);
}
} }
// There may be a process reading from stdout/stderr, and if it // There may be a process reading from stdout/stderr, and if it
@ -232,7 +242,6 @@ static void assert_valid_setup(char *argv0) {
static void display_feature_disabled_message(const char* name) { static void display_feature_disabled_message(const char* name) {
fprintf(ERRORFILE, "Feature disabled: %s\n", name); fprintf(ERRORFILE, "Feature disabled: %s\n", name);
fflush(ERRORFILE);
} }
/* Use to store parsed input parmeters for various operations */ /* Use to store parsed input parmeters for various operations */
@ -450,7 +459,6 @@ static int validate_run_as_user_commands(int argc, char **argv, int *operation)
fprintf(LOGFILE, "main : command provided %d\n", command); fprintf(LOGFILE, "main : command provided %d\n", command);
fprintf(LOGFILE, "main : run as user is %s\n", cmd_input.run_as_user_name); fprintf(LOGFILE, "main : run as user is %s\n", cmd_input.run_as_user_name);
fprintf(LOGFILE, "main : requested yarn user is %s\n", cmd_input.yarn_user_name); fprintf(LOGFILE, "main : requested yarn user is %s\n", cmd_input.yarn_user_name);
fflush(LOGFILE);
char * resources = NULL;// key,value pair describing resources char * resources = NULL;// key,value pair describing resources
char * resources_key = NULL; char * resources_key = NULL;
char * resources_value = NULL; char * resources_value = NULL;
@ -459,14 +467,12 @@ static int validate_run_as_user_commands(int argc, char **argv, int *operation)
if (argc < 10) { if (argc < 10) {
fprintf(ERRORFILE, "Too few arguments (%d vs 10) for initialize container\n", fprintf(ERRORFILE, "Too few arguments (%d vs 10) for initialize container\n",
argc); argc);
fflush(ERRORFILE);
return INVALID_ARGUMENT_NUMBER; return INVALID_ARGUMENT_NUMBER;
} }
cmd_input.app_id = argv[optind++]; cmd_input.app_id = argv[optind++];
cmd_input.container_id = argv[optind++]; cmd_input.container_id = argv[optind++];
if (!validate_container_id(cmd_input.container_id)) { if (!validate_container_id(cmd_input.container_id)) {
fprintf(ERRORFILE, "Invalid container id %s\n", cmd_input.container_id); fprintf(ERRORFILE, "Invalid container id %s\n", cmd_input.container_id);
fflush(ERRORFILE);
return INVALID_CONTAINER_ID; return INVALID_CONTAINER_ID;
} }
cmd_input.cred_file = argv[optind++]; cmd_input.cred_file = argv[optind++];
@ -481,7 +487,6 @@ static int validate_run_as_user_commands(int argc, char **argv, int *operation)
if (!(argc >= 14 && argc <= 17)) { if (!(argc >= 14 && argc <= 17)) {
fprintf(ERRORFILE, "Wrong number of arguments (%d vs 14 - 17) for" fprintf(ERRORFILE, "Wrong number of arguments (%d vs 14 - 17) for"
" launch docker container\n", argc); " launch docker container\n", argc);
fflush(ERRORFILE);
return INVALID_ARGUMENT_NUMBER; return INVALID_ARGUMENT_NUMBER;
} }
@ -525,7 +530,6 @@ static int validate_run_as_user_commands(int argc, char **argv, int *operation)
if (!(argc >= 14 && argc <= 17)) { if (!(argc >= 14 && argc <= 17)) {
fprintf(ERRORFILE, "Wrong number of arguments (%d vs 14 - 17)" fprintf(ERRORFILE, "Wrong number of arguments (%d vs 14 - 17)"
" for launch container\n", argc); " for launch container\n", argc);
fflush(ERRORFILE);
return INVALID_ARGUMENT_NUMBER; return INVALID_ARGUMENT_NUMBER;
} }
@ -550,9 +554,8 @@ static int validate_run_as_user_commands(int argc, char **argv, int *operation)
if (get_kv_key(resources, resources_key, strlen(resources)) < 0 || if (get_kv_key(resources, resources_key, strlen(resources)) < 0 ||
get_kv_value(resources, resources_value, strlen(resources)) < 0) { get_kv_value(resources, resources_value, strlen(resources)) < 0) {
fprintf(ERRORFILE, "Invalid arguments for cgroups resources: %s", fprintf(ERRORFILE, "Invalid arguments for cgroups resources: %s\n",
resources); resources);
fflush(ERRORFILE);
free(resources_key); free(resources_key);
free(resources_value); free(resources_value);
return INVALID_ARGUMENT_NUMBER; return INVALID_ARGUMENT_NUMBER;
@ -578,7 +581,6 @@ static int validate_run_as_user_commands(int argc, char **argv, int *operation)
if (argc != 6) { if (argc != 6) {
fprintf(ERRORFILE, "Wrong number of arguments (%d vs 6) for " \ fprintf(ERRORFILE, "Wrong number of arguments (%d vs 6) for " \
"signal container\n", argc); "signal container\n", argc);
fflush(ERRORFILE);
return INVALID_ARGUMENT_NUMBER; return INVALID_ARGUMENT_NUMBER;
} }
@ -587,14 +589,12 @@ static int validate_run_as_user_commands(int argc, char **argv, int *operation)
cmd_input.container_pid = strtol(option, &end_ptr, 10); cmd_input.container_pid = strtol(option, &end_ptr, 10);
if (option == end_ptr || *end_ptr != '\0') { if (option == end_ptr || *end_ptr != '\0') {
fprintf(ERRORFILE, "Illegal argument for container pid %s\n", option); fprintf(ERRORFILE, "Illegal argument for container pid %s\n", option);
fflush(ERRORFILE);
return INVALID_ARGUMENT_NUMBER; return INVALID_ARGUMENT_NUMBER;
} }
option = argv[optind++]; option = argv[optind++];
cmd_input.signal = strtol(option, &end_ptr, 10); cmd_input.signal = strtol(option, &end_ptr, 10);
if (option == end_ptr || *end_ptr != '\0') { if (option == end_ptr || *end_ptr != '\0') {
fprintf(ERRORFILE, "Illegal argument for signal %s\n", option); fprintf(ERRORFILE, "Illegal argument for signal %s\n", option);
fflush(ERRORFILE);
return INVALID_ARGUMENT_NUMBER; return INVALID_ARGUMENT_NUMBER;
} }
@ -615,8 +615,7 @@ static int validate_run_as_user_commands(int argc, char **argv, int *operation)
*operation = RUN_AS_USER_SYNC_YARN_SYSFS; *operation = RUN_AS_USER_SYNC_YARN_SYSFS;
return 0; return 0;
default: default:
fprintf(ERRORFILE, "Invalid command %d not supported.",command); fprintf(ERRORFILE, "Invalid command %d not supported.\n",command);
fflush(ERRORFILE);
return INVALID_COMMAND_PROVIDED; return INVALID_COMMAND_PROVIDED;
} }
} }

View File

@ -61,7 +61,6 @@ static int is_block_device(const char* value) {
char* block_path = malloc(max_path_size); char* block_path = malloc(max_path_size);
if (block_path == NULL) { if (block_path == NULL) {
fprintf(ERRORFILE, "Failed to allocate memory for sys device path string.\n"); fprintf(ERRORFILE, "Failed to allocate memory for sys device path string.\n");
fflush(ERRORFILE);
goto cleanup; goto cleanup;
} }
if (snprintf(block_path, max_path_size, "/sys/dev/block/%s", if (snprintf(block_path, max_path_size, "/sys/dev/block/%s",

View File

@ -141,8 +141,7 @@ int execute_regex_match(const char *regex_str, const char *input) {
regex_t regex; regex_t regex;
int regex_match; int regex_match;
if (0 != regcomp(&regex, regex_str, REG_EXTENDED|REG_NOSUB)) { if (0 != regcomp(&regex, regex_str, REG_EXTENDED|REG_NOSUB)) {
fprintf(LOGFILE, "Unable to compile regex."); fprintf(LOGFILE, "Unable to compile regex.\n");
fflush(LOGFILE);
exit(ERROR_COMPILING_REGEX); exit(ERROR_COMPILING_REGEX);
} }
regex_match = regexec(&regex, input, (size_t) 0, NULL, 0); regex_match = regexec(&regex, input, (size_t) 0, NULL, 0);

View File

@ -71,7 +71,8 @@ enum errorcodes {
INVALID_CONTAINER_ID = 43, INVALID_CONTAINER_ID = 43,
DOCKER_EXEC_FAILED = 44, DOCKER_EXEC_FAILED = 44,
COULD_NOT_CREATE_KEYSTORE_COPY = 45, COULD_NOT_CREATE_KEYSTORE_COPY = 45,
COULD_NOT_CREATE_TRUSTSTORE_COPY = 46 COULD_NOT_CREATE_TRUSTSTORE_COPY = 46,
ERROR_CALLING_SETVBUF = 47
}; };
/* Macros for min/max. */ /* Macros for min/max. */

View File

@ -318,7 +318,6 @@ static int validate_container_name(const char *container_name) {
} }
} }
fprintf(ERRORFILE, "Specified container_id=%s is invalid\n", container_name); fprintf(ERRORFILE, "Specified container_id=%s is invalid\n", container_name);
fflush(ERRORFILE);
return INVALID_DOCKER_CONTAINER_NAME; return INVALID_DOCKER_CONTAINER_NAME;
} }

View File

@ -1539,6 +1539,18 @@ int main(int argc, char **argv) {
LOGFILE = stdout; LOGFILE = stdout;
ERRORFILE = stderr; ERRORFILE = stderr;
if (setvbuf(LOGFILE, NULL, _IOLBF, BUFSIZ)) {
fprintf(LOGFILE, "Failed to invoke setvbuf() for LOGFILE: %s\n", strerror(errno));
fflush(LOGFILE);
exit(ERROR_CALLING_SETVBUF);
}
if (setvbuf(ERRORFILE, NULL, _IOLBF, BUFSIZ)) {
fprintf(ERRORFILE, "Failed to invoke setvbuf() for ERRORFILE: %s\n", strerror(errno));
fflush(ERRORFILE);
exit(ERROR_CALLING_SETVBUF);
}
nm_uid = getuid(); nm_uid = getuid();
printf("Attempting to clean up from any previous runs\n"); printf("Attempting to clean up from any previous runs\n");