diff --git a/hadoop-ozone/dist/src/main/blockade/blockadeUtils/blockade.py b/hadoop-ozone/dist/src/main/blockade/blockadeUtils/blockade.py
index 37c275f5f7d..432562e6fa6 100644
--- a/hadoop-ozone/dist/src/main/blockade/blockadeUtils/blockade.py
+++ b/hadoop-ozone/dist/src/main/blockade/blockadeUtils/blockade.py
@@ -21,6 +21,7 @@ from subprocess import call
 import subprocess
 import logging
 import random
+from clusterUtils.cluster_utils import ClusterUtils
 
 logger = logging.getLogger(__name__)
 
@@ -31,10 +32,16 @@ class Blockade(object):
     def blockade_destroy(cls):
         call(["blockade", "destroy"])
 
+    @classmethod
+    def blockade_up(cls):
+        """Run `blockade up`; the exit code is not checked."""
+        call(["blockade", "up"])
+
     @classmethod
     def blockade_status(cls):
-        output = call(["blockade", "status"])
-        return output
+        """Run `blockade status` and return (exit_code, output)."""
+        exit_code, output = ClusterUtils.run_cmd("blockade status")
+        return exit_code, output
 
     @classmethod
     def make_flaky(cls, flaky_node, container_list):
@@ -57,3 +64,41 @@ class Blockade(object):
     def blockade_fast_all(cls):
         output = call(["blockade", "fast", "--all"])
         assert output == 0, "fast command failed with exit code=[%s]" % output
+
+    @classmethod
+    def blockade_create_partition(cls, *args):
+        """Run `blockade partition`; each positional argument is a list of
+        container names that forms one comma-separated partition."""
+        nodes = " ".join(",".join(node_list) for node_list in args)
+        exit_code, output = ClusterUtils.run_cmd("blockade partition %s" % nodes)
+        assert exit_code == 0, \
+            "blockade partition command failed with exit code=[%s], output=[%s]" \
+            % (exit_code, output)
+
+    @classmethod
+    def blockade_join(cls):
+        """Run `blockade join` and assert that it exits with 0."""
+        output = call(["blockade", "join"])
+        assert output == 0, "blockade join command failed with exit code=[%s]" % output
+
+    @classmethod
+    def blockade_stop(cls, node=None, all_nodes=False):
+        """Run `blockade stop` on `node`, or with `--all` when `all_nodes`
+        is true (`node` is then ignored)."""
+        if all_nodes:
+            output = call(["blockade", "stop", "--all"])
+        else:
+            assert node, "node is required unless all_nodes=True"
+            output = call(["blockade", "stop", node])
+        assert output == 0, "blockade stop command failed with exit code=[%s]" % output
+
+    @classmethod
+    def blockade_start(cls, node=None, all_nodes=False):
+        """Run `blockade start` on `node`, or with `--all` when `all_nodes`
+        is true (`node` is then ignored)."""
+        if all_nodes:
+            output = call(["blockade", "start", "--all"])
+        else:
+            assert node, "node is required unless all_nodes=True"
+            output = call(["blockade", "start", node])
+        assert output == 0, "blockade start command failed with exit code=[%s]" % output
diff --git a/hadoop-ozone/dist/src/main/blockade/clusterUtils/cluster_utils.py b/hadoop-ozone/dist/src/main/blockade/clusterUtils/cluster_utils.py
index a45035b484c..26342c7e6ab 100644
--- a/hadoop-ozone/dist/src/main/blockade/clusterUtils/cluster_utils.py
+++ b/hadoop-ozone/dist/src/main/blockade/clusterUtils/cluster_utils.py
@@ -21,6 +21,8 @@ from subprocess import call
 import subprocess
 import logging
 import time
+import re
+import yaml
 
 logger = logging.getLogger(__name__)
 
@@ -61,15 +63,115 @@ class ClusterUtils(object):
     def run_freon(cls, docker_compose_file, num_volumes, num_buckets, num_keys,
                   key_size, replication_type, replication_factor):
         # run freon
-        logger.info("Running freon ...")
-        output = call(["docker-compose", "-f", docker_compose_file,
-                       "exec", "ozoneManager",
-                       "/opt/hadoop/bin/ozone",
-                       "freon", "rk",
-                       "--numOfVolumes", str(num_volumes),
-                       "--numOfBuckets", str(num_buckets),
-                       "--numOfKeys", str(num_keys),
-                       "--keySize", str(key_size),
-                       "--replicationType", replication_type,
-                       "--factor", replication_factor])
-        assert output == 0, "freon run failed with exit code=[%s]" % output
+        cmd = "docker-compose -f %s exec ozoneManager /opt/hadoop/bin/ozone freon rk " \
+              "--numOfVolumes %s --numOfBuckets %s --numOfKeys %s --keySize %s " \
+              "--replicationType %s --factor %s" % (docker_compose_file, num_volumes,
+                                                    num_buckets, num_keys, key_size, replication_type,
+                                                    replication_factor)
+        exit_code, output = cls.run_cmd(cmd)
+        return exit_code, output
+
+    @classmethod
+    def run_cmd(cls, cmd):
+        """
+        This function runs the given command through the shell and returns
+        a tuple (exit_code, output), where output is the combined
+        stdout/stderr with one trailing line break removed.
+        cmd may be a string or a list; a list is joined with spaces first.
+        """
+        command = cmd
+        if isinstance(cmd, list):
+            command = ' '.join(cmd)
+        logger.info(" RUNNING: " + command)
+        all_output = ""
+        # Pass the joined string: with shell=True and a list, only the first
+        # element would be executed as the command.
+        myprocess = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=True)
+        while myprocess.poll() is None:
+            op = myprocess.stdout.readline()
+            if op:
+                all_output += op
+                logger.info(op)
+        other_output = myprocess.communicate()
+        other_output = other_output[0].strip()
+        if other_output != "":
+            all_output += other_output
+            for each_line in other_output.split("\n"):
+                logger.info(" " + each_line.strip())
+        reg = re.compile(r"(\r\n|\n)$")
+        all_output = reg.sub("", all_output, 1)
+
+        return myprocess.returncode, all_output
+
+    @classmethod
+    def get_ozone_confkey_value(cls, docker_compose_file, key_name):
+        """
+        This function returns the output of `ozone getconf -confKey <key_name>`
+        executed in the ozoneManager container.
+        """
+        cmd = "docker-compose -f %s exec ozoneManager /opt/hadoop/bin/ozone getconf -confKey %s" \
+              %(docker_compose_file, key_name)
+        exit_code, output = cls.run_cmd(cmd)
+        assert exit_code == 0, "getconf of key=[%s] failed with output=[%s]" %(key_name, output)
+        return output
+
+    @classmethod
+    def find_scm_uuid(cls, docker_compose_file):
+        """
+        This function returns scm uuid.
+        """
+        ozone_metadata_dir = cls.get_ozone_confkey_value(docker_compose_file, "ozone.metadata.dirs")
+        cmd = "docker-compose -f %s exec scm cat %s/scm/current/VERSION" % (docker_compose_file, ozone_metadata_dir)
+        exit_code, output = cls.run_cmd(cmd)
+        assert exit_code == 0, "get scm UUID failed with output=[%s]" % output
+        output_list = output.split("\n")
+        output_list = list(filter(lambda x: re.search(r"\w+=\w+", x), output_list))
+        # Split on the first "=" only so a value containing "=" still yields a pair.
+        output_dict = dict(map(lambda x: x.split("=", 1), output_list))
+        return str(output_dict['scmUuid']).strip()
+
+    @classmethod
+    def find_datanode_container_status(cls, docker_compose_file, datanode_index):
+        """
+        This function returns the datanode's container replica state.
+        """
+        datanode_dir = cls.get_ozone_confkey_value(docker_compose_file, "hdds.datanode.dir")
+        scm_uuid = cls.find_scm_uuid(docker_compose_file)
+        container_parent_path = "%s/hdds/%s/current/containerDir0" %(datanode_dir, scm_uuid)
+        cmd = "docker-compose -f %s exec --index=%s datanode find %s -type f -name '*.container'" \
+              % (docker_compose_file, datanode_index, container_parent_path)
+        exit_code, output = cls.run_cmd(cmd)
+        assert exit_code == 0, "command=[%s] failed with output=[%s]" % (cmd, output)
+        assert output, "No container info present"
+        container_list = map(str.strip, output.split("\n"))
+        container_state = None
+        for container_path in container_list:
+            cmd = "docker-compose -f %s exec --index=%s datanode cat %s" \
+                  % (docker_compose_file, datanode_index, container_path)
+            exit_code, output = cls.run_cmd(cmd)
+            assert exit_code == 0, "command=[%s] failed with output=[%s]" % (cmd, output)
+            container_db_list = output.split("\n")
+            container_db_list = list(filter(lambda x: re.search(r"\w+:\s\w+", x), container_db_list))
+            container_db_info = "\n".join(container_db_list)
+            # safe_load: the filtered lines are plain "key: value" pairs, so no custom YAML tags are needed.
+            container_db_dict = yaml.safe_load(container_db_info)
+            for key, value in container_db_dict.items():
+                container_db_dict[key] = str(value).lstrip()
+            if not container_state:
+                container_state = container_db_dict['state']
+            else:
+                assert container_db_dict['state'] == container_state, "all containers are not in same state"
+
+        return container_state
+
+    @classmethod
+    def find_all_datanodes_container_status(cls, docker_compose_file, scale):
+        """
+        This function returns container replica states of all datanodes.
+        """
+        all_datanode_container_status = []
+        for index in range(scale):
+            all_datanode_container_status.append(cls.find_datanode_container_status(docker_compose_file, index+1))
+        logger.info("All datanodes container status: %s", ' '.join(all_datanode_container_status))
+
+        return all_datanode_container_status
diff --git a/hadoop-ozone/dist/src/main/blockade/test_blockade_datanode_isolation.py b/hadoop-ozone/dist/src/main/blockade/test_blockade_datanode_isolation.py
new file mode 100644
index 00000000000..cc2bddc5ff6
--- /dev/null
+++ b/hadoop-ozone/dist/src/main/blockade/test_blockade_datanode_isolation.py
@@ -0,0 +1,91 @@
+#!/usr/bin/python
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Blockade tests that isolate datanodes from each other with network partitions"""
+import os
+import time
+import logging
+from blockadeUtils.blockade import Blockade
+from clusterUtils.cluster_utils import ClusterUtils
+
+
+logger = logging.getLogger(__name__)
+parent_dir = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
+FILE = os.path.join(parent_dir, "compose", "ozone", "docker-compose.yaml")
+SCALE = 3
+CONTAINER_LIST = []
+OM = []
+SCM = []
+DATANODES = []
+
+
+def setup():  # recreate the cluster, group containers by role, verify with a freon run
+    global CONTAINER_LIST, OM, SCM, DATANODES
+    Blockade.blockade_destroy()
+    CONTAINER_LIST = ClusterUtils.cluster_setup(FILE, SCALE)
+    exit_code, output = Blockade.blockade_status()
+    assert exit_code == 0, "blockade status command failed with output=[%s]" % output
+    OM = list(filter(lambda x: 'ozoneManager' in x, CONTAINER_LIST))  # list() so OM[0] also works on Python 3
+    SCM = list(filter(lambda x: 'scm' in x, CONTAINER_LIST))
+    DATANODES = sorted(list(filter(lambda x: 'datanode' in x, CONTAINER_LIST)))
+
+    exit_code, output = ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS", "THREE")
+    assert exit_code == 0, "freon run failed with output=[%s]" % output
+
+
+def teardown():
+    logger.info("Inside teardown")
+    Blockade.blockade_destroy()
+
+
+def teardown_module():
+    ClusterUtils.cluster_destroy(FILE)
+
+
+def test_datanode_isolation_one_node():
+    """
+    In this test, one of the datanodes cannot communicate with other two datanodes.
+    All datanodes can communicate with SCM.
+    """
+    first_set = [OM[0], SCM[0], DATANODES[0]]
+    second_set = [OM[0], SCM[0], DATANODES[1], DATANODES[2]]
+    Blockade.blockade_create_partition(first_set, second_set)
+    Blockade.blockade_status()
+    ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS", "THREE")
+    logger.info("Waiting for 480 seconds before checking container status")
+    time.sleep(480)
+    all_datanodes_container_status = ClusterUtils.find_all_datanodes_container_status(FILE, SCALE)
+    count_closed_container_datanodes = [x for x in all_datanodes_container_status if x == 'CLOSED']
+    assert len(count_closed_container_datanodes) == 3, "The container should have three closed replicas."
+
+
+def test_datanode_isolation_all():
+    """
+    In this test, none of the datanodes can communicate with other two datanodes.
+    All datanodes can communicate with SCM.
+    """
+    first_set = [OM[0], SCM[0], DATANODES[0]]
+    second_set = [OM[0], SCM[0], DATANODES[1]]
+    third_set = [OM[0], SCM[0], DATANODES[2]]
+    Blockade.blockade_create_partition(first_set, second_set, third_set)
+    Blockade.blockade_status()
+    ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS", "THREE")
+    logger.info("Waiting for 480 seconds before checking container status")
+    time.sleep(480)
+    all_datanodes_container_status = ClusterUtils.find_all_datanodes_container_status(FILE, SCALE)
+    count_closed_container_datanodes = [x for x in all_datanodes_container_status if x == 'CLOSED']
+    assert len(count_closed_container_datanodes) >= 2, "The container should have at least two closed replicas."
\ No newline at end of file
diff --git a/hadoop-ozone/dist/src/main/blockade/test_blockade.py b/hadoop-ozone/dist/src/main/blockade/test_blockade_flaky.py
similarity index 82%
rename from hadoop-ozone/dist/src/main/blockade/test_blockade.py
rename to hadoop-ozone/dist/src/main/blockade/test_blockade_flaky.py
index 6f0c1e63fb0..01aa4497ab8 100644
--- a/hadoop-ozone/dist/src/main/blockade/test_blockade.py
+++ b/hadoop-ozone/dist/src/main/blockade/test_blockade_flaky.py
@@ -17,6 +17,7 @@
 
 """This module has apis to create and remove a blockade cluster"""
 import os
+import time
 import logging
 import pytest
 from blockadeUtils.blockade import Blockade
@@ -32,9 +33,10 @@ CONTAINER_LIST = []
 
 def setup_module():
     global CONTAINER_LIST
+    Blockade.blockade_destroy()  # runs `blockade destroy` before the cluster is set up
     CONTAINER_LIST = ClusterUtils.cluster_setup(FILE, SCALE)
-    output = Blockade.blockade_status()
-    assert output == 0, "blockade status command failed with exit code=[%s]" % output
+    exit_code, output = Blockade.blockade_status()  # blockade_status now returns (exit_code, output)
+    assert exit_code == 0, "blockade status command failed with output=[%s]" % output
 
 
 def teardown_module():
@@ -45,10 +47,12 @@ def teardown_module():
 def teardown():
     logger.info("Inside teardown")
     Blockade.blockade_fast_all()
+    time.sleep(5)  # NOTE(review): presumably lets the network settle after `blockade fast --all` - confirm
 
 
 @pytest.mark.parametrize("flaky_nodes", ["datanode", "scm", "om", "all"])
 def test_flaky(flaky_nodes):
     Blockade.make_flaky(flaky_nodes, CONTAINER_LIST)
     Blockade.blockade_status()
-    ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS", "THREE")
+    exit_code, output = ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS", "THREE")  # run_freon no longer asserts itself
+    assert exit_code == 0, "freon run failed with output=[%s]" % output
\ No newline at end of file