From a7f5e742a6e2ce5e12d127fa71c6280f05ce54d3 Mon Sep 17 00:00:00 2001 From: Shashikant Banerjee Date: Fri, 15 Mar 2019 20:54:41 +0530 Subject: [PATCH] HDDS-1088. Add blockade Tests to test Replica Manager. Contributed by Nilotpal Nandi. --- hadoop-ozone/dev-support/docker/Dockerfile | 3 + hadoop-ozone/dist/src/main/blockade/README.md | 10 + .../blockade/clusterUtils/cluster_utils.py | 489 +++++++++--------- .../dist/src/main/blockade/conftest.py | 139 ++--- .../test_blockade_datanode_isolation.py | 149 ++++-- .../blockade/test_blockade_scm_isolation.py | 159 ++++-- 6 files changed, 536 insertions(+), 413 deletions(-) diff --git a/hadoop-ozone/dev-support/docker/Dockerfile b/hadoop-ozone/dev-support/docker/Dockerfile index a84367ee00f..045e1f6c058 100644 --- a/hadoop-ozone/dev-support/docker/Dockerfile +++ b/hadoop-ozone/dev-support/docker/Dockerfile @@ -46,6 +46,9 @@ RUN mkdir -p /opt && \ #Install docker-compose RUN pip install docker-compose +#Install pytest==2.8.7 +RUN pip install pytest==2.8.7 + ENV PATH=$PATH:/opt/findbugs/bin RUN addgroup -g 1000 default && \ diff --git a/hadoop-ozone/dist/src/main/blockade/README.md b/hadoop-ozone/dist/src/main/blockade/README.md index 9ece997d658..fb582054998 100644 --- a/hadoop-ozone/dist/src/main/blockade/README.md +++ b/hadoop-ozone/dist/src/main/blockade/README.md @@ -41,4 +41,14 @@ cd $DIRECTORY_OF_OZONE python -m pytest -s blockade/ --containerStatusSleep= e.g: python -m pytest -s blockade/ --containerStatusSleep=720 +``` + +By default, second phase of the tests will not be run. +In order to run the second phase of the tests, you can run following +command-lines: + +``` +cd $DIRECTORY_OF_OZONE +python -m pytest -s blockade/ --runSecondPhase=true + ``` \ No newline at end of file diff --git a/hadoop-ozone/dist/src/main/blockade/clusterUtils/cluster_utils.py b/hadoop-ozone/dist/src/main/blockade/clusterUtils/cluster_utils.py index bf0b28fd8c4..baa396093f4 100644 --- a/hadoop-ozone/dist/src/main/blockade/clusterUtils/cluster_utils.py +++ b/hadoop-ozone/dist/src/main/blockade/clusterUtils/cluster_utils.py @@ -21,280 +21,289 @@ import subprocess import logging import time import re -import yaml import os +import yaml logger = logging.getLogger(__name__) class ClusterUtils(object): + """ + This class contains all cluster related operations. + """ - @classmethod - def cluster_setup(cls, docker_compose_file, datanode_count): - """start a blockade cluster""" - logger.info("compose file :%s", docker_compose_file) - logger.info("number of DNs :%d", datanode_count) - call(["docker-compose", "-f", docker_compose_file, "down"]) - call(["docker-compose", "-f", docker_compose_file, "up", "-d", - "--scale", "datanode=" + str(datanode_count)]) + @classmethod + def cluster_setup(cls, docker_compose_file, datanode_count, + destroy_existing_cluster=True): + """start a blockade cluster""" + logger.info("compose file :%s", docker_compose_file) + logger.info("number of DNs :%d", datanode_count) + if destroy_existing_cluster: + call(["docker-compose", "-f", docker_compose_file, "down"]) + call(["docker-compose", "-f", docker_compose_file, "up", "-d", + "--scale", "datanode=" + str(datanode_count)]) - logger.info("Waiting 30s for cluster start up...") - time.sleep(30) - output = subprocess.check_output(["docker-compose", "-f", - docker_compose_file, "ps"]) - output_array = output.split("\n")[2:-1] + logger.info("Waiting 30s for cluster start up...") + time.sleep(30) + output = subprocess.check_output(["docker-compose", "-f", + docker_compose_file, "ps"]) + output_array = output.split("\n")[2:-1] - container_list = [] - for out in output_array: - container = out.split(" ")[0] - container_list.append(container) - call(["blockade", "add", container]) - time.sleep(2) + container_list = [] + for out in output_array: + container = out.split(" ")[0] + container_list.append(container) + call(["blockade", "add", container]) + time.sleep(2) - assert container_list, "no container found!" - logger.info("blockade created with containers %s", - ' '.join(container_list)) + assert container_list, "no container found!" + logger.info("blockade created with containers %s", + ' '.join(container_list)) - return container_list + return container_list - @classmethod - def cluster_destroy(cls, docker_compose_file): - logger.info("Running docker-compose -f %s down", docker_compose_file) - call(["docker-compose", "-f", docker_compose_file, "down"]) + @classmethod + def cluster_destroy(cls, docker_compose_file): + logger.info("Running docker-compose -f %s down", docker_compose_file) + call(["docker-compose", "-f", docker_compose_file, "down"]) - @classmethod - def run_freon(cls, docker_compose_file, num_volumes, num_buckets, - num_keys, key_size, replication_type, replication_factor, - freon_client='ozoneManager'): - # run freon - cmd = "docker-compose -f %s " \ - "exec %s /opt/hadoop/bin/ozone " \ - "freon rk " \ - "--numOfVolumes %s " \ - "--numOfBuckets %s " \ - "--numOfKeys %s " \ - "--keySize %s " \ - "--replicationType %s " \ - "--factor %s" % (docker_compose_file, freon_client, num_volumes, - num_buckets, num_keys, key_size, - replication_type, replication_factor) + @classmethod + def run_freon(cls, docker_compose_file, num_volumes, num_buckets, + num_keys, key_size, replication_type, replication_factor, + freon_client='ozoneManager'): + # run freon + cmd = "docker-compose -f %s " \ + "exec %s /opt/hadoop/bin/ozone " \ + "freon rk " \ + "--numOfVolumes %s " \ + "--numOfBuckets %s " \ + "--numOfKeys %s " \ + "--keySize %s " \ + "--replicationType %s " \ + "--factor %s" % (docker_compose_file, freon_client, num_volumes, + num_buckets, num_keys, key_size, + replication_type, replication_factor) + exit_code, output = cls.run_cmd(cmd) + return exit_code, output + + @classmethod + def run_cmd(cls, cmd): + command = cmd + if isinstance(cmd, list): + command = ' '.join(cmd) + logger.info(" RUNNING: %s", command) + all_output = "" + myprocess = subprocess.Popen(cmd, stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, shell=True) + while myprocess.poll() is None: + op = myprocess.stdout.readline() + if op: + all_output += op + logger.info(op) + other_output = myprocess.communicate() + other_output = other_output[0].strip() + if other_output != "": + all_output += other_output + for each_line in other_output.split("\n"): + logger.info(" %s", each_line.strip()) + reg = re.compile(r"(\r\n|\n)$") + all_output = reg.sub("", all_output, 1) + + return myprocess.returncode, all_output + + @classmethod + def get_ozone_confkey_value(cls, docker_compose_file, key_name): + cmd = "docker-compose -f %s " \ + "exec ozoneManager /opt/hadoop/bin/ozone " \ + "getconf -confKey %s" \ + % (docker_compose_file, key_name) + exit_code, output = cls.run_cmd(cmd) + assert exit_code == 0, "getconf of key=[%s] failed with output=[%s]" \ + % (key_name, output) + return str(output).strip() + + @classmethod + def find_scm_uuid(cls, docker_compose_file): + """ + This function returns scm uuid. + """ + ozone_metadata_dir = cls.get_ozone_confkey_value(docker_compose_file, + "ozone.metadata.dirs") + cmd = "docker-compose -f %s exec scm cat %s/scm/current/VERSION" % \ + (docker_compose_file, ozone_metadata_dir) + exit_code, output = cls.run_cmd(cmd) + assert exit_code == 0, "get scm UUID failed with output=[%s]" % output + output_list = output.split("\n") + output_list = [x for x in output_list if re.search(r"\w+=\w+", x)] + output_dict = dict(x.split("=") for x in output_list) + return str(output_dict['scmUuid']).strip() + + @classmethod + def find_container_status(cls, docker_compose_file, datanode_index): + """ + This function returns the datanode's container replica state. + In this function, it finds all .container files. + Then, it opens each file and checks the state of the containers + in the datanode. + It returns 'None' as container state if it cannot find any + .container file in the datanode. + Sample .container contains state as following: + state: + """ + + datanode_dir = cls.get_ozone_confkey_value(docker_compose_file, + "hdds.datanode.dir") + scm_uuid = cls.find_scm_uuid(docker_compose_file) + container_parent_path = "%s/hdds/%s/current/containerDir0" % \ + (datanode_dir, scm_uuid) + cmd = "docker-compose -f %s exec --index=%s datanode find %s -type f " \ + "-name '*.container'" \ + % (docker_compose_file, datanode_index, container_parent_path) + exit_code, output = cls.run_cmd(cmd) + container_state = "None" + if exit_code == 0 and output: + container_list = map(str.strip, output.split("\n")) + for container_path in container_list: + cmd = "docker-compose -f %s exec --index=%s datanode cat %s" \ + % (docker_compose_file, datanode_index, container_path) exit_code, output = cls.run_cmd(cmd) - return exit_code, output + assert exit_code == 0, \ + "command=[%s] failed with output=[%s]" % (cmd, output) + container_db_list = output.split("\n") + container_db_list = [x for x in container_db_list + if re.search(r"\w+:\s\w+", x)] + # container_db_list will now contain the lines which has got + # yaml representation , i.e , key: value + container_db_info = "\n".join(container_db_list) + container_db_dict = yaml.load(container_db_info) + for key, value in container_db_dict.items(): + container_db_dict[key] = str(value).lstrip() + if container_state == "None": + container_state = container_db_dict['state'] + else: + assert container_db_dict['state'] == container_state, \ + "all containers are not in same state" - @classmethod - def run_cmd(cls, cmd): - command = cmd - if isinstance(cmd, list): - command = ' '.join(cmd) - logger.info(" RUNNING: " + command) - all_output = "" - myprocess = subprocess.Popen(cmd, stdout=subprocess.PIPE, - stderr=subprocess.STDOUT, shell=True) - while myprocess.poll() is None: - op = myprocess.stdout.readline() - if op: - all_output += op - logger.info(op) - other_output = myprocess.communicate() - other_output = other_output[0].strip() - if other_output != "": - all_output += other_output - for each_line in other_output.split("\n"): - logger.info(" " + each_line.strip()) - reg = re.compile(r"(\r\n|\n)$") - all_output = reg.sub("", all_output, 1) + return container_state - return myprocess.returncode, all_output + @classmethod + def findall_container_status(cls, docker_compose_file, scale): + """ + This function returns container replica states of all datanodes. + """ + all_datanode_container_status = [] + for index in range(scale): + all_datanode_container_status.append( + cls.find_container_status(docker_compose_file, index + 1)) + logger.info("All datanodes container status: %s", + ' '.join(all_datanode_container_status)) - @classmethod - def get_ozone_confkey_value(cls, docker_compose_file, key_name): - cmd = "docker-compose -f %s " \ - "exec ozoneManager /opt/hadoop/bin/ozone " \ - "getconf -confKey %s" \ - % (docker_compose_file, key_name) - exit_code, output = cls.run_cmd(cmd) - assert exit_code == 0, "getconf of key=[%s] failed with output=[%s]" \ - %(key_name, output) - return str(output).strip() + return all_datanode_container_status - @classmethod - def find_scm_uuid(cls, docker_compose_file): - """ - This function returns scm uuid. - """ - ozone_metadata_dir = cls.get_ozone_confkey_value(docker_compose_file, - "ozone.metadata.dirs") - cmd = "docker-compose -f %s exec scm cat %s/scm/current/VERSION" % \ - (docker_compose_file, ozone_metadata_dir) - exit_code, output = cls.run_cmd(cmd) - assert exit_code == 0, "get scm UUID failed with output=[%s]" % output - output_list = output.split("\n") - output_list = list(filter(lambda x: re.search("\w+=\w+", x), - output_list)) - output_dict = dict(map(lambda x: x.split("="), output_list)) - return str(output_dict['scmUuid']).strip() - - @classmethod - def find_datanode_container_status(cls, docker_compose_file, - datanode_index): - """ - This function returns the datanode's container replica state. - """ - datanode_dir = cls.get_ozone_confkey_value(docker_compose_file, - "hdds.datanode.dir") - scm_uuid = cls.find_scm_uuid(docker_compose_file) - container_parent_path = "%s/hdds/%s/current/containerDir0" % \ - (datanode_dir, scm_uuid) - cmd = "docker-compose -f %s exec --index=%s datanode find %s -type f " \ - "-name '*.container'" \ - % (docker_compose_file, datanode_index, container_parent_path) - exit_code, output = cls.run_cmd(cmd) - assert exit_code == 0, "command=[%s] failed with output=[%s]" % \ - (cmd, output) - assert output, "No container info present" - container_list = map(str.strip, output.split("\n")) - container_state = None - for container_path in container_list: - cmd = "docker-compose -f %s exec --index=%s datanode cat %s" \ - % (docker_compose_file, datanode_index, container_path) - exit_code, output = cls.run_cmd(cmd) - assert exit_code == 0, "command=[%s] failed with output=[%s]" % \ - (cmd, output) - container_db_list = output.split("\n") - container_db_list = \ - list(filter(lambda x: re.search("\w+:\s\w+", x), - container_db_list)) - container_db_info = "\n".join(container_db_list) - container_db_dict = yaml.load(container_db_info) - for key, value in container_db_dict.items(): - container_db_dict[key] = str(value).lstrip() - if not container_state: - container_state = container_db_dict['state'] - else: - assert container_db_dict['state'] == container_state, \ - "all containers are not in same state" - - return container_state - - @classmethod - def find_all_datanodes_container_status(cls, docker_compose_file, scale): - """ - This function returns container replica states of all datanodes. - """ - all_datanode_container_status = [] - for index in range(scale): - all_datanode_container_status.append( - cls.find_datanode_container_status(docker_compose_file, - index+1)) - logger.info("All datanodes container status: %s", - ' '.join(all_datanode_container_status)) - - return all_datanode_container_status - - @classmethod - def create_volume(cls, docker_compose_file, volume_name): - command = "docker-compose -f %s " \ + @classmethod + def create_volume(cls, docker_compose_file, volume_name): + command = "docker-compose -f %s " \ "exec ozone_client /opt/hadoop/bin/ozone " \ - "sh volume create /%s --user root" % \ - (docker_compose_file, volume_name) - logger.info("Creating Volume %s", volume_name) - exit_code, output = cls.run_cmd(command) - assert exit_code == 0, "Ozone volume create failed with output=[%s]" \ - % output + "sh volume create /%s --user root" % \ + (docker_compose_file, volume_name) + logger.info("Creating Volume %s", volume_name) + exit_code, output = cls.run_cmd(command) + assert exit_code == 0, "Ozone volume create failed with output=[%s]" \ + % output - @classmethod - def delete_volume(cls, docker_compose_file, volume_name): - command = "docker-compose -f %s " \ + @classmethod + def delete_volume(cls, docker_compose_file, volume_name): + command = "docker-compose -f %s " \ "exec ozone_client /opt/hadoop/bin/ozone " \ - "sh volume delete /%s" % (docker_compose_file, volume_name) - logger.info("Deleting Volume %s", volume_name) - exit_code, output = cls.run_cmd(command) - return exit_code, output + "sh volume delete /%s" % (docker_compose_file, volume_name) + logger.info("Deleting Volume %s", volume_name) + exit_code, output = cls.run_cmd(command) + return exit_code, output - @classmethod - def create_bucket(cls, docker_compose_file, bucket_name, volume_name): - command = "docker-compose -f %s " \ + @classmethod + def create_bucket(cls, docker_compose_file, bucket_name, volume_name): + command = "docker-compose -f %s " \ "exec ozone_client /opt/hadoop/bin/ozone " \ "sh bucket create /%s/%s" % (docker_compose_file, volume_name, bucket_name) - logger.info("Creating Bucket %s in volume %s", - bucket_name, volume_name) - exit_code, output = cls.run_cmd(command) - assert exit_code == 0, "Ozone bucket create failed with output=[%s]" \ - % output + logger.info("Creating Bucket %s in volume %s", + bucket_name, volume_name) + exit_code, output = cls.run_cmd(command) + assert exit_code == 0, "Ozone bucket create failed with output=[%s]" \ + % output - @classmethod - def delete_bucket(cls, docker_compose_file, bucket_name, volume_name): - command = "docker-compose -f %s " \ + @classmethod + def delete_bucket(cls, docker_compose_file, bucket_name, volume_name): + command = "docker-compose -f %s " \ "exec ozone_client /opt/hadoop/bin/ozone " \ - "sh bucket delete /%s/%s" % (docker_compose_file, - volume_name, bucket_name) - logger.info("Running delete bucket of %s/%s", volume_name, bucket_name) - exit_code, output = cls.run_cmd(command) - return exit_code, output + "sh bucket delete /%s/%s" % (docker_compose_file, + volume_name, bucket_name) + logger.info("Running delete bucket of %s/%s", volume_name, bucket_name) + exit_code, output = cls.run_cmd(command) + return exit_code, output - @classmethod - def put_key(cls, docker_compose_file, bucket_name, volume_name, - filepath, key_name=None, replication_factor=None): - command = "docker-compose -f %s " \ + @classmethod + def put_key(cls, docker_compose_file, bucket_name, volume_name, + filepath, key_name=None, replication_factor=None): + command = "docker-compose -f %s " \ "exec ozone_client ls %s" % (docker_compose_file, filepath) - exit_code, output = cls.run_cmd(command) - assert exit_code == 0, "%s does not exist" % filepath - if key_name is None: - key_name = os.path.basename(filepath) - command = "docker-compose -f %s " \ + exit_code, output = cls.run_cmd(command) + assert exit_code == 0, "%s does not exist" % filepath + if key_name is None: + key_name = os.path.basename(filepath) + command = "docker-compose -f %s " \ "exec ozone_client /opt/hadoop/bin/ozone " \ - "sh key put /%s/%s/%s %s" % (docker_compose_file, - volume_name, bucket_name, - key_name, filepath) - if replication_factor: - command = "%s --replication=%s" % (command, replication_factor) - logger.info("Creating key %s in %s/%s", key_name, - volume_name, bucket_name) - exit_code, output = cls.run_cmd(command) - assert exit_code == 0, "Ozone put Key failed with output=[%s]" % output + "sh key put /%s/%s/%s %s" % (docker_compose_file, + volume_name, bucket_name, + key_name, filepath) + if replication_factor: + command = "%s --replication=%s" % (command, replication_factor) + logger.info("Creating key %s in %s/%s", key_name, + volume_name, bucket_name) + exit_code, output = cls.run_cmd(command) + assert exit_code == 0, "Ozone put Key failed with output=[%s]" % output - @classmethod - def delete_key(cls, docker_compose_file, bucket_name, volume_name, - key_name): - command = "docker-compose -f %s " \ + @classmethod + def delete_key(cls, docker_compose_file, bucket_name, volume_name, + key_name): + command = "docker-compose -f %s " \ "exec ozone_client /opt/hadoop/bin/ozone " \ - "sh key delete /%s/%s/%s" \ - % (docker_compose_file, volume_name, bucket_name, key_name) - logger.info("Running delete key %s in %s/%s", - key_name, volume_name, bucket_name) - exit_code, output = cls.run_cmd(command) - return exit_code, output + "sh key delete /%s/%s/%s" \ + % (docker_compose_file, volume_name, bucket_name, key_name) + logger.info("Running delete key %s in %s/%s", + key_name, volume_name, bucket_name) + exit_code, output = cls.run_cmd(command) + return exit_code, output - @classmethod - def get_key(cls, docker_compose_file, bucket_name, volume_name, - key_name, filepath=None): - if filepath is None: - filepath = '.' - command = "docker-compose -f %s " \ + @classmethod + def get_key(cls, docker_compose_file, bucket_name, volume_name, + key_name, filepath=None): + if filepath is None: + filepath = '.' + command = "docker-compose -f %s " \ "exec ozone_client /opt/hadoop/bin/ozone " \ - "sh key get /%s/%s/%s %s" % (docker_compose_file, - volume_name, bucket_name, - key_name, filepath) - logger.info("Running get key %s in %s/%s", key_name, - volume_name, bucket_name) - exit_code, output = cls.run_cmd(command) - assert exit_code == 0, "Ozone get Key failed with output=[%s]" % output + "sh key get /%s/%s/%s %s" % (docker_compose_file, + volume_name, bucket_name, + key_name, filepath) + logger.info("Running get key %s in %s/%s", key_name, + volume_name, bucket_name) + exit_code, output = cls.run_cmd(command) + assert exit_code == 0, "Ozone get Key failed with output=[%s]" % output - @classmethod - def find_checksum(cls, docker_compose_file, filepath): - command = "docker-compose -f %s " \ + @classmethod + def find_checksum(cls, docker_compose_file, filepath): + command = "docker-compose -f %s " \ "exec ozone_client md5sum %s" % (docker_compose_file, filepath) - exit_code, output = cls.run_cmd(command) - assert exit_code == 0, "Cant find checksum" - myoutput = output.split("\n") - finaloutput = "" - for line in myoutput: - if line.find("Warning") >= 0 or line.find("is not a tty") >= 0: - logger.info("skip this line: %s", line) - else: - finaloutput = finaloutput + line - checksum = finaloutput.split(" ") - logger.info("Checksum of %s is : %s", filepath, checksum[0]) - return checksum[0] \ No newline at end of file + exit_code, output = cls.run_cmd(command) + assert exit_code == 0, "Cant find checksum" + myoutput = output.split("\n") + finaloutput = "" + for line in myoutput: + if line.find("Warning") >= 0 or line.find("is not a tty") >= 0: + logger.info("skip this line: %s", line) + else: + finaloutput = finaloutput + line + checksum = finaloutput.split(" ") + logger.info("Checksum of %s is : %s", filepath, checksum[0]) + return checksum[0] diff --git a/hadoop-ozone/dist/src/main/blockade/conftest.py b/hadoop-ozone/dist/src/main/blockade/conftest.py index ff5bfc7fc0c..582c4cc9405 100644 --- a/hadoop-ozone/dist/src/main/blockade/conftest.py +++ b/hadoop-ozone/dist/src/main/blockade/conftest.py @@ -17,80 +17,97 @@ import logging import os import time import subprocess +import pytest EPOCH_TIME = int(time.time()) + + def pytest_addoption(parser): - parser.addoption("--output-dir", - action="store", - default="/tmp/BlockadeTests", - help="location of output directory where output log " - "and plot files will be created") - parser.addoption("--log-format", - action="store", - default="%(asctime)s|%(levelname)s|%(threadName)s|" - "%(filename)s:%(lineno)s -" - " %(funcName)s()|%(message)s", - help="specify log format") - parser.addoption("--log-level", - action="store", - default="info", - help="specify log level") - parser.addoption("--containerStatusSleep", - action="store", - default="900", - help="sleep time before checking container status") + parser.addoption("--output-dir", + action="store", + default="/tmp/BlockadeTests", + help="location of output directory where output log " + "and plot files will be created") + parser.addoption("--log-format", + action="store", + default="%(asctime)s|%(levelname)s|%(threadName)s|" + "%(filename)s:%(lineno)s -" + " %(funcName)s()|%(message)s", + help="specify log format") + parser.addoption("--log-level", + action="store", + default="info", + help="specify log level") + parser.addoption("--containerStatusSleep", + action="store", + default="900", + help="sleep time before checking container status") + parser.addoption("--runSecondPhase", + action="store", + default="false", + help="run second phase of the tests") + + +@pytest.fixture +def run_second_phase(request): + """ + :param request: + This function returns if the user has opted for running second phase + of the tests. + """ + return request.config.getoption("--runSecondPhase") def pytest_configure(config): - global OUTPUT_DIR - os.environ["CONTAINER_STATUS_SLEEP"] = config.option.containerStatusSleep - OUTPUT_DIR = "%s/%s" % (config.option.output_dir, EPOCH_TIME) - try: - os.makedirs(OUTPUT_DIR) - except OSError, e: - raise Exception(e.strerror + ": " + e.filename) - log_file = os.path.join(OUTPUT_DIR, "output.log") + global OUTPUT_DIR + os.environ["CONTAINER_STATUS_SLEEP"] = config.option.containerStatusSleep + OUTPUT_DIR = "%s/%s" % (config.option.output_dir, EPOCH_TIME) + try: + os.makedirs(OUTPUT_DIR) + except OSError, e: + raise Exception(e.strerror + ": " + e.filename) + log_file = os.path.join(OUTPUT_DIR, "output.log") - if config.option.log_level == "trace": - loglevel = eval("logging.DEBUG") - else: - loglevel = eval("logging." + config.option.log_level.upper()) - logformatter = logging.Formatter(config.option.log_format) - logging.basicConfig(filename=log_file, - filemode='w', - level=loglevel, - format=config.option.log_format) - console = logging.StreamHandler() - console.setLevel(loglevel) - console.setFormatter(logformatter) - logging.getLogger('').addHandler(console) + if config.option.log_level == "trace": + loglevel = eval("logging.DEBUG") + else: + loglevel = eval("logging." + config.option.log_level.upper()) + logformatter = logging.Formatter(config.option.log_format) + logging.basicConfig(filename=log_file, + filemode='w', + level=loglevel, + format=config.option.log_format) + console = logging.StreamHandler() + console.setLevel(loglevel) + console.setFormatter(logformatter) + logging.getLogger('').addHandler(console) def pytest_report_teststatus(report): - logger = logging.getLogger('main') - loc, line, name = report.location - if report.outcome == 'skipped': - pass - elif report.when == 'setup': - logger.info("RUNNING TEST \"%s\" at location \"%s\" at line number" - " \"%s\"" % (name, loc, str(line))) - elif report.when == 'call': - logger.info("TEST \"%s\" %s in %3.2f seconds" % - (name, report.outcome.upper(), report.duration)) - log_file_path = "%s/%s_all_docker.log" % \ - (OUTPUT_DIR, name) - gather_docker_logs(log_file_path) + logger = logging.getLogger('main') + loc, line, name = report.location + if report.outcome == 'skipped': + pass + elif report.when == 'setup': + logger.info("RUNNING TEST \"%s\" at location \"%s\" at line number" + " \"%s\"" % (name, loc, str(line))) + elif report.when == 'call': + logger.info("TEST \"%s\" %s in %3.2f seconds" % + (name, report.outcome.upper(), report.duration)) + log_file_path = "%s/%s_all_docker.log" % \ + (OUTPUT_DIR, name) + gather_docker_logs(log_file_path) def pytest_sessionfinish(session): - logger = logging.getLogger('main') - logger.info("ALL TESTS FINISHED") - logger.info("ALL logs present in following directory: %s", OUTPUT_DIR) + logger = logging.getLogger('main') + logger.info("ALL TESTS FINISHED") + logger.info("ALL logs present in following directory: %s", OUTPUT_DIR) def gather_docker_logs(log_file_path): - docker_compose_file = os.environ["DOCKER_COMPOSE_FILE"] - output = subprocess.check_output(["docker-compose", "-f", - docker_compose_file, "logs"]) - with open(log_file_path, "w") as text_file: - text_file.write(output) + docker_compose_file = os.environ["DOCKER_COMPOSE_FILE"] + output = subprocess.check_output(["docker-compose", "-f", + docker_compose_file, "logs"]) + with open(log_file_path, "w") as text_file: + text_file.write(output) diff --git a/hadoop-ozone/dist/src/main/blockade/test_blockade_datanode_isolation.py b/hadoop-ozone/dist/src/main/blockade/test_blockade_datanode_isolation.py index becc635494f..fecd9d1009e 100644 --- a/hadoop-ozone/dist/src/main/blockade/test_blockade_datanode_isolation.py +++ b/hadoop-ozone/dist/src/main/blockade/test_blockade_datanode_isolation.py @@ -17,17 +17,18 @@ import os import time +import re import logging from blockadeUtils.blockade import Blockade from clusterUtils.cluster_utils import ClusterUtils - logger = logging.getLogger(__name__) parent_dir = os.path.dirname(os.path.dirname(os.path.realpath(__file__))) FILE = os.path.join(parent_dir, "compose", "ozoneblockade", "docker-compose.yaml") os.environ["DOCKER_COMPOSE_FILE"] = FILE SCALE = 3 +INCREASED_SCALE = 5 CONTAINER_LIST = [] OM = [] SCM = [] @@ -35,77 +36,113 @@ DATANODES = [] def setup(): - global CONTAINER_LIST, OM, SCM, DATANODES - Blockade.blockade_destroy() - CONTAINER_LIST = ClusterUtils.cluster_setup(FILE, SCALE) - exit_code, output = Blockade.blockade_status() - assert exit_code == 0, "blockade status command failed with output=[%s]" % \ - output - OM = filter(lambda x: 'ozoneManager' in x, CONTAINER_LIST) - SCM = filter(lambda x: 'scm' in x, CONTAINER_LIST) - DATANODES = sorted(list(filter(lambda x: 'datanode' in x, CONTAINER_LIST))) + global CONTAINER_LIST, OM, SCM, DATANODES + Blockade.blockade_destroy() + CONTAINER_LIST = ClusterUtils.cluster_setup(FILE, SCALE) + exit_code, output = Blockade.blockade_status() + assert exit_code == 0, "blockade status command failed with output=[%s]" % \ + output + OM = [x for x in CONTAINER_LIST if 'ozoneManager' in x] + SCM = [x for x in CONTAINER_LIST if 'scm' in x] + DATANODES = sorted(x for x in CONTAINER_LIST if 'datanode' in x) - exit_code, output = ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS", - "THREE") - assert exit_code == 0, "freon run failed with output=[%s]" % output + exit_code, output = ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS", + "THREE") + assert exit_code == 0, "freon run failed with output=[%s]" % output def teardown(): - logger.info("Inside teardown") - Blockade.blockade_destroy() + logger.info("Inside teardown") + Blockade.blockade_destroy() def teardown_module(): - ClusterUtils.cluster_destroy(FILE) + ClusterUtils.cluster_destroy(FILE) -def test_datanode_isolation_one_node(): - """ - In this test, one of the datanodes (first datanode) cannot communicate - with other two datanodes. - All datanodes can communicate with SCM. - Expectation : - The container replica state in first datanode should be quasi-closed. - The container replica state in other datanodes should be closed. - """ - first_set = [OM[0], SCM[0], DATANODES[0]] - second_set = [OM[0], SCM[0], DATANODES[1], DATANODES[2]] - Blockade.blockade_create_partition(first_set, second_set) +def test_isolatedatanode_singlenode(run_second_phase): + """ + In this test, one of the datanodes (first datanode) cannot communicate + with other two datanodes. + All datanodes can communicate with SCM. + Expectation : + The container replica state in first datanode should be quasi-closed. + The container replica state in other datanodes should be closed. + """ + first_set = [OM[0], SCM[0], DATANODES[0]] + second_set = [OM[0], SCM[0], DATANODES[1], DATANODES[2]] + Blockade.blockade_create_partition(first_set, second_set) + Blockade.blockade_status() + ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS", "THREE") + logger.info("Waiting for %s seconds before checking container status", + os.environ["CONTAINER_STATUS_SLEEP"]) + time.sleep(int(os.environ["CONTAINER_STATUS_SLEEP"])) + all_datanodes_container_status = \ + ClusterUtils.findall_container_status(FILE, SCALE) + first_datanode_status = all_datanodes_container_status[0] + closed_container_datanodes = [x for x in all_datanodes_container_status + if x == 'CLOSED'] + assert first_datanode_status == 'QUASI_CLOSED' + assert len(closed_container_datanodes) == 2, \ + "The container should have two closed replicas." + + if str(run_second_phase).lower() == "true": + ClusterUtils.cluster_setup(FILE, INCREASED_SCALE, False) Blockade.blockade_status() - ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS", "THREE") logger.info("Waiting for %s seconds before checking container status", os.environ["CONTAINER_STATUS_SLEEP"]) time.sleep(int(os.environ["CONTAINER_STATUS_SLEEP"])) all_datanodes_container_status = \ - ClusterUtils.find_all_datanodes_container_status(FILE, SCALE) - first_datanode_status = all_datanodes_container_status[0] - count_closed_container_datanodes = filter(lambda x: x == 'CLOSED', - all_datanodes_container_status) - assert first_datanode_status == 'QUASI_CLOSED' - assert len(count_closed_container_datanodes) == 2, \ - "The container should have three closed replicas." - - -def test_datanode_isolation_all(): - """ - In this test, none of the datanodes can communicate with other two - datanodes. - All datanodes can communicate with SCM. - Expectation : The container should eventually have at least two closed - replicas. - """ - first_set = [OM[0], SCM[0], DATANODES[0]] - second_set = [OM[0], SCM[0], DATANODES[1]] - third_set = [OM[0], SCM[0], DATANODES[2]] - Blockade.blockade_create_partition(first_set, second_set, third_set) + ClusterUtils.findall_container_status(FILE, INCREASED_SCALE) + closed_container_datanodes = [x for x in all_datanodes_container_status + if x == 'CLOSED'] + assert len(closed_container_datanodes) >= 3, \ + "The container should have at least three closed replicas." + Blockade.blockade_join() + Blockade.blockade_status() + _, output = \ + ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS", "THREE") + assert re.search("Status: Success", output) is not None + + +def test_datanode_isolation_all(run_second_phase): + """ + In this test, none of the datanodes can communicate with other two + datanodes. + All datanodes can communicate with SCM. + Expectation : The container should eventually have at least two closed + replicas. + """ + first_set = [OM[0], SCM[0], DATANODES[0]] + second_set = [OM[0], SCM[0], DATANODES[1]] + third_set = [OM[0], SCM[0], DATANODES[2]] + Blockade.blockade_create_partition(first_set, second_set, third_set) + Blockade.blockade_status() + ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS", "THREE") + logger.info("Waiting for %s seconds before checking container status", + os.environ["CONTAINER_STATUS_SLEEP"]) + time.sleep(int(os.environ["CONTAINER_STATUS_SLEEP"])) + all_datanodes_container_status = \ + ClusterUtils.findall_container_status(FILE, SCALE) + closed_container_datanodes = [x for x in all_datanodes_container_status + if x == 'CLOSED'] + assert len(closed_container_datanodes) >= 2, \ + "The container should have at least two closed replicas." + + if str(run_second_phase).lower() == "true": + ClusterUtils.cluster_setup(FILE, INCREASED_SCALE, False) Blockade.blockade_status() - ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS", "THREE") logger.info("Waiting for %s seconds before checking container status", os.environ["CONTAINER_STATUS_SLEEP"]) time.sleep(int(os.environ["CONTAINER_STATUS_SLEEP"])) all_datanodes_container_status = \ - ClusterUtils.find_all_datanodes_container_status(FILE, SCALE) - count_closed_container_datanodes = filter(lambda x: x == 'CLOSED', - all_datanodes_container_status) - assert len(count_closed_container_datanodes) >= 2, \ - "The container should have at least two closed replicas." \ No newline at end of file + ClusterUtils.findall_container_status(FILE, INCREASED_SCALE) + closed_container_datanodes = [x for x in all_datanodes_container_status + if x == 'CLOSED'] + assert len(closed_container_datanodes) >= 3, \ + "The container should have at least three closed replicas." + Blockade.blockade_join() + Blockade.blockade_status() + _, output = \ + ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS", "THREE") + assert re.search("Status: Success", output) is not None \ No newline at end of file diff --git a/hadoop-ozone/dist/src/main/blockade/test_blockade_scm_isolation.py b/hadoop-ozone/dist/src/main/blockade/test_blockade_scm_isolation.py index d2dd29a5e01..54c31e8dd5f 100644 --- a/hadoop-ozone/dist/src/main/blockade/test_blockade_scm_isolation.py +++ b/hadoop-ozone/dist/src/main/blockade/test_blockade_scm_isolation.py @@ -17,17 +17,18 @@ import os import time +import re import logging from blockadeUtils.blockade import Blockade from clusterUtils.cluster_utils import ClusterUtils - logger = logging.getLogger(__name__) parent_dir = os.path.dirname(os.path.dirname(os.path.realpath(__file__))) FILE = os.path.join(parent_dir, "compose", "ozoneblockade", "docker-compose.yaml") os.environ["DOCKER_COMPOSE_FILE"] = FILE SCALE = 3 +INCREASED_SCALE = 5 CONTAINER_LIST = [] OM = [] SCM = [] @@ -35,77 +36,123 @@ DATANODES = [] def setup(): - global CONTAINER_LIST, OM, SCM, DATANODES - Blockade.blockade_destroy() - CONTAINER_LIST = ClusterUtils.cluster_setup(FILE, SCALE) - exit_code, output = Blockade.blockade_status() - assert exit_code == 0, "blockade status command failed with output=[%s]" % \ - output - OM = filter(lambda x: 'ozoneManager' in x, CONTAINER_LIST) - SCM = filter(lambda x: 'scm' in x, CONTAINER_LIST) - DATANODES = sorted(list(filter(lambda x: 'datanode' in x, CONTAINER_LIST))) + global CONTAINER_LIST, OM, SCM, DATANODES + Blockade.blockade_destroy() + CONTAINER_LIST = ClusterUtils.cluster_setup(FILE, SCALE) + exit_code, output = Blockade.blockade_status() + assert exit_code == 0, "blockade status command failed with output=[%s]" % \ + output + OM = [x for x in CONTAINER_LIST if 'ozoneManager' in x] + SCM = [x for x in CONTAINER_LIST if 'scm' in x] + DATANODES = sorted(x for x in CONTAINER_LIST if 'datanode' in x) - exit_code, output = ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS", - "THREE") - assert exit_code == 0, "freon run failed with output=[%s]" % output + exit_code, output = ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS", + "THREE") + assert exit_code == 0, "freon run failed with output=[%s]" % output def teardown(): - logger.info("Inside teardown") - Blockade.blockade_destroy() + logger.info("Inside teardown") + Blockade.blockade_destroy() def teardown_module(): - ClusterUtils.cluster_destroy(FILE) + ClusterUtils.cluster_destroy(FILE) -def test_scm_isolation_one_node(): - """ - In this test, one of the datanodes cannot communicate with SCM. - Other datanodes can communicate with SCM. - Expectation : The container should eventually have at least two closed - replicas. - """ - first_set = [OM[0], DATANODES[0], DATANODES[1], DATANODES[2]] - second_set = [OM[0], SCM[0], DATANODES[1], DATANODES[2]] - Blockade.blockade_create_partition(first_set, second_set) +def test_scm_isolation_one_node(run_second_phase): + """ + In this test, one of the datanodes cannot communicate with SCM. + Other datanodes can communicate with SCM. + Expectation : The container should eventually have at least two closed + replicas. + """ + first_set = [OM[0], DATANODES[0], DATANODES[1], DATANODES[2]] + second_set = [OM[0], SCM[0], DATANODES[1], DATANODES[2]] + Blockade.blockade_create_partition(first_set, second_set) + Blockade.blockade_status() + ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS", "THREE") + logger.info("Waiting for %s seconds before checking container status", + os.environ["CONTAINER_STATUS_SLEEP"]) + time.sleep(int(os.environ["CONTAINER_STATUS_SLEEP"])) + all_datanodes_container_status = \ + ClusterUtils.findall_container_status(FILE, SCALE) + closed_container_datanodes = [x for x in all_datanodes_container_status + if x == 'CLOSED'] + assert len(closed_container_datanodes) >= 2, \ + "The container should have at least two closed replicas." + + if str(run_second_phase).lower() == "true": + ClusterUtils.cluster_setup(FILE, INCREASED_SCALE, False) Blockade.blockade_status() - ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS", "THREE") logger.info("Waiting for %s seconds before checking container status", os.environ["CONTAINER_STATUS_SLEEP"]) time.sleep(int(os.environ["CONTAINER_STATUS_SLEEP"])) all_datanodes_container_status = \ - ClusterUtils.find_all_datanodes_container_status(FILE, SCALE) - count_closed_container_datanodes = filter(lambda x: x == 'CLOSED', - all_datanodes_container_status) - assert len(count_closed_container_datanodes) >= 2, \ - "The container should have at least two closed replicas." - - -def test_scm_isolation_two_node(): - """ - In this test, two datanodes cannot communicate with SCM. - Expectation : The container should eventually have at three closed replicas - or, two open replicas and one quasi-closed replica. - """ - first_set = [OM[0], DATANODES[0], DATANODES[1], DATANODES[2]] - second_set = [OM[0], SCM[0], DATANODES[1]] - Blockade.blockade_create_partition(first_set, second_set) + ClusterUtils.findall_container_status(FILE, INCREASED_SCALE) + closed_container_datanodes = [x for x in all_datanodes_container_status + if x == 'CLOSED'] + assert len(closed_container_datanodes) >= 3, \ + "The container should have at least three closed replicas." + Blockade.blockade_join() + Blockade.blockade_status() + _, output = \ + ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS", "THREE") + assert re.search("Status: Success", output) is not None + + +def test_scm_isolation_two_node(run_second_phase): + """ + In this test, two datanodes cannot communicate with SCM. + Expectation : The container should eventually have at three closed replicas + or, two open replicas and one quasi-closed replica. + """ + first_set = [OM[0], DATANODES[0], DATANODES[1], DATANODES[2]] + second_set = [OM[0], SCM[0], DATANODES[1]] + Blockade.blockade_create_partition(first_set, second_set) + Blockade.blockade_status() + ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS", "THREE") + logger.info("Waiting for %s seconds before checking container status", + os.environ["CONTAINER_STATUS_SLEEP"]) + time.sleep(int(os.environ["CONTAINER_STATUS_SLEEP"])) + all_datanodes_container_status = \ + ClusterUtils.findall_container_status(FILE, SCALE) + closed_container_datanodes = [x for x in all_datanodes_container_status + if x == 'CLOSED'] + qausiclosed_container_datanodes = [x for x in all_datanodes_container_status + if x == 'QUASI_CLOSED'] + count_open_container_datanodes = [x for x in all_datanodes_container_status + if x == 'OPEN'] + assert len(closed_container_datanodes) == 3 or \ + (len(count_open_container_datanodes) == 2 and + len(qausiclosed_container_datanodes) == 1), \ + "The container should have three closed replicas or two open " \ + "replicas and one quasi_closed replica." + + if str(run_second_phase).lower() == "true": + ClusterUtils.cluster_setup(FILE, INCREASED_SCALE, False) Blockade.blockade_status() - ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS", "THREE") logger.info("Waiting for %s seconds before checking container status", os.environ["CONTAINER_STATUS_SLEEP"]) time.sleep(int(os.environ["CONTAINER_STATUS_SLEEP"])) all_datanodes_container_status = \ - ClusterUtils.find_all_datanodes_container_status(FILE, SCALE) - count_closed_container_datanodes = filter(lambda x: x == 'CLOSED', - all_datanodes_container_status) - count_qausi_closed_container_datanodes = \ - filter(lambda x: x == 'QUASI_CLOSED', all_datanodes_container_status) - count_open_container_datanodes = filter(lambda x: x == 'OPEN', - all_datanodes_container_status) - assert len(count_closed_container_datanodes) == 3 or \ - (len(count_open_container_datanodes) == 2 and - len(count_qausi_closed_container_datanodes) == 1), \ - "The container should have three closed replicas or two open " \ - "replicas and one quasi_closed replica." \ No newline at end of file + ClusterUtils.findall_container_status(FILE, INCREASED_SCALE) + closed_container_datanodes = [x for x in all_datanodes_container_status + if x == 'CLOSED'] + qausiclosed_container_datanodes = \ + [x for x in all_datanodes_container_status if x == 'QUASI_CLOSED'] + assert len(closed_container_datanodes) >= 3 or \ + len(qausiclosed_container_datanodes) >= 3 + Blockade.blockade_join() + Blockade.blockade_status() + if len(closed_container_datanodes) < 3: + time.sleep(int(os.environ["CONTAINER_STATUS_SLEEP"])) + all_datanodes_container_status = \ + ClusterUtils.findall_container_status(FILE, INCREASED_SCALE) + closed_container_datanodes = [x for x in all_datanodes_container_status + if x == 'CLOSED'] + + assert len(closed_container_datanodes) >= 3 + _, output = \ + ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS", "THREE") + assert re.search("Status: Success", output) is not None \ No newline at end of file