HDDS-932. Add blockade Tests for Network partition. Contributed by Nilotpal Nandi.
This commit is contained in:
parent
e3e076dc78
commit
e72e27edd8
|
@ -21,6 +21,7 @@ from subprocess import call
|
|||
import subprocess
|
||||
import logging
|
||||
import random
|
||||
from clusterUtils.cluster_utils import ClusterUtils
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
@ -31,10 +32,14 @@ class Blockade(object):
|
|||
def blockade_destroy(cls):
|
||||
call(["blockade", "destroy"])
|
||||
|
||||
@classmethod
|
||||
def blockade_up(cls):
|
||||
call(["blockade", "up"])
|
||||
|
||||
@classmethod
|
||||
def blockade_status(cls):
|
||||
output = call(["blockade", "status"])
|
||||
return output
|
||||
exit_code, output = ClusterUtils.run_cmd("blockade status")
|
||||
return exit_code, output
|
||||
|
||||
@classmethod
|
||||
def make_flaky(cls, flaky_node, container_list):
|
||||
|
@ -57,3 +62,32 @@ class Blockade(object):
|
|||
def blockade_fast_all(cls):
|
||||
output = call(["blockade", "fast", "--all"])
|
||||
assert output == 0, "fast command failed with exit code=[%s]" % output
|
||||
|
||||
@classmethod
|
||||
def blockade_create_partition(cls, *args):
|
||||
nodes = ""
|
||||
for node_list in args:
|
||||
nodes = nodes + ','.join(node_list) + " "
|
||||
exit_code, output = ClusterUtils.run_cmd("blockade partition %s" % nodes)
|
||||
assert exit_code == 0, "blockade partition command failed with exit code=[%s]" % output
|
||||
|
||||
@classmethod
|
||||
def blockade_join(cls):
|
||||
output = call(["blockade", "join"])
|
||||
assert output == 0, "blockade join command failed with exit code=[%s]" % output
|
||||
|
||||
@classmethod
|
||||
def blockade_stop(cls, node, all_nodes=False):
|
||||
if all_nodes:
|
||||
output = call(["blockade", "stop", "--all"])
|
||||
else:
|
||||
output = call(["blockade", "stop", node])
|
||||
assert output == 0, "blockade stop command failed with exit code=[%s]" % output
|
||||
|
||||
@classmethod
|
||||
def blockade_start(cls, node, all_nodes=False):
|
||||
if all_nodes:
|
||||
output = call(["blockade", "start", "--all"])
|
||||
else:
|
||||
output = call(["blockade", "start", node])
|
||||
assert output == 0, "blockade start command failed with exit code=[%s]" % output
|
|
@ -21,6 +21,8 @@ from subprocess import call
|
|||
import subprocess
|
||||
import logging
|
||||
import time
|
||||
import re
|
||||
import yaml
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
@ -61,15 +63,101 @@ class ClusterUtils(object):
|
|||
def run_freon(cls, docker_compose_file, num_volumes, num_buckets, num_keys, key_size,
|
||||
replication_type, replication_factor):
|
||||
# run freon
|
||||
logger.info("Running freon ...")
|
||||
output = call(["docker-compose", "-f", docker_compose_file,
|
||||
"exec", "ozoneManager",
|
||||
"/opt/hadoop/bin/ozone",
|
||||
"freon", "rk",
|
||||
"--numOfVolumes", str(num_volumes),
|
||||
"--numOfBuckets", str(num_buckets),
|
||||
"--numOfKeys", str(num_keys),
|
||||
"--keySize", str(key_size),
|
||||
"--replicationType", replication_type,
|
||||
"--factor", replication_factor])
|
||||
assert output == 0, "freon run failed with exit code=[%s]" % output
|
||||
cmd = "docker-compose -f %s exec ozoneManager /opt/hadoop/bin/ozone freon rk " \
|
||||
"--numOfVolumes %s --numOfBuckets %s --numOfKeys %s --keySize %s " \
|
||||
"--replicationType %s --factor %s" % (docker_compose_file, num_volumes,
|
||||
num_buckets, num_keys, key_size, replication_type,
|
||||
replication_factor)
|
||||
exit_code, output = cls.run_cmd(cmd)
|
||||
return exit_code, output
|
||||
|
||||
@classmethod
|
||||
def run_cmd(cls, cmd):
|
||||
command = cmd
|
||||
if isinstance(cmd, list):
|
||||
command = ' '.join(cmd)
|
||||
logger.info(" RUNNING: " + command)
|
||||
all_output = ""
|
||||
myprocess = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=True)
|
||||
while myprocess.poll() is None:
|
||||
op = myprocess.stdout.readline()
|
||||
if op:
|
||||
all_output += op
|
||||
logger.info(op)
|
||||
other_output = myprocess.communicate()
|
||||
other_output = other_output[0].strip()
|
||||
if other_output != "":
|
||||
all_output += other_output
|
||||
for each_line in other_output.split("\n"):
|
||||
logger.info(" " + each_line.strip())
|
||||
reg = re.compile(r"(\r\n|\n)$")
|
||||
all_output = reg.sub("", all_output, 1)
|
||||
|
||||
return myprocess.returncode, all_output
|
||||
|
||||
@classmethod
|
||||
def get_ozone_confkey_value(cls, docker_compose_file, key_name):
|
||||
cmd = "docker-compose -f %s exec ozoneManager /opt/hadoop/bin/ozone getconf -confKey %s" \
|
||||
%(docker_compose_file, key_name)
|
||||
exit_code, output = cls.run_cmd(cmd)
|
||||
assert exit_code == 0, "getconf of key=[%s] failed with output=[%s]" %(key_name, output)
|
||||
return output
|
||||
|
||||
@classmethod
|
||||
def find_scm_uuid(cls, docker_compose_file):
|
||||
"""
|
||||
This function returns scm uuid.
|
||||
"""
|
||||
ozone_metadata_dir = cls.get_ozone_confkey_value(docker_compose_file, "ozone.metadata.dirs")
|
||||
cmd = "docker-compose -f %s exec scm cat %s/scm/current/VERSION" % (docker_compose_file, ozone_metadata_dir)
|
||||
exit_code, output = cls.run_cmd(cmd)
|
||||
assert exit_code == 0, "get scm UUID failed with output=[%s]" % output
|
||||
output_list = output.split("\n")
|
||||
output_list = list(filter(lambda x: re.search("\w+=\w+", x), output_list))
|
||||
output_dict = dict(map(lambda x: x.split("="), output_list))
|
||||
return str(output_dict['scmUuid']).strip()
|
||||
|
||||
@classmethod
|
||||
def find_datanode_container_status(cls, docker_compose_file, datanode_index):
|
||||
"""
|
||||
This function returns the datanode's container replica state.
|
||||
"""
|
||||
datanode_dir = cls.get_ozone_confkey_value(docker_compose_file, "hdds.datanode.dir")
|
||||
scm_uuid = cls.find_scm_uuid(docker_compose_file)
|
||||
container_parent_path = "%s/hdds/%s/current/containerDir0" %(datanode_dir, scm_uuid)
|
||||
cmd = "docker-compose -f %s exec --index=%s datanode find %s -type f -name '*.container'" \
|
||||
% (docker_compose_file, datanode_index, container_parent_path)
|
||||
exit_code, output = cls.run_cmd(cmd)
|
||||
assert exit_code == 0, "command=[%s] failed with output=[%s]" % (cmd, output)
|
||||
assert output, "No container info present"
|
||||
container_list = map(str.strip, output.split("\n"))
|
||||
container_state = None
|
||||
for container_path in container_list:
|
||||
cmd = "docker-compose -f %s exec --index=%s datanode cat %s" \
|
||||
% (docker_compose_file, datanode_index, container_path)
|
||||
exit_code, output = cls.run_cmd(cmd)
|
||||
assert exit_code == 0, "command=[%s] failed with output=[%s]" % (cmd, output)
|
||||
container_db_list = output.split("\n")
|
||||
container_db_list = list(filter(lambda x: re.search("\w+:\s\w+", x), container_db_list))
|
||||
container_db_info = "\n".join(container_db_list)
|
||||
container_db_dict = yaml.load(container_db_info)
|
||||
for key, value in container_db_dict.items():
|
||||
container_db_dict[key] = str(value).lstrip()
|
||||
if not container_state:
|
||||
container_state = container_db_dict['state']
|
||||
else:
|
||||
assert container_db_dict['state'] == container_state, "all containers are not in same state"
|
||||
|
||||
return container_state
|
||||
|
||||
@classmethod
|
||||
def find_all_datanodes_container_status(cls, docker_compose_file, scale):
|
||||
"""
|
||||
This function returns container replica states of all datanodes.
|
||||
"""
|
||||
all_datanode_container_status = []
|
||||
for index in range(scale):
|
||||
all_datanode_container_status.append(cls.find_datanode_container_status(docker_compose_file, index+1))
|
||||
logger.info("All datanodes container status: %s", ' '.join(all_datanode_container_status))
|
||||
|
||||
return all_datanode_container_status
|
|
@ -0,0 +1,91 @@
|
|||
#!/usr/bin/python
|
||||
|
||||
# Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
# contributor license agreements. See the NOTICE file distributed with
|
||||
# this work for additional information regarding copyright ownership.
|
||||
# The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
# (the "License"); you may not use this file except in compliance with
|
||||
# the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
"""This module has apis to create and remove a blockade cluster"""
|
||||
import os
|
||||
import time
|
||||
import logging
|
||||
from blockadeUtils.blockade import Blockade
|
||||
from clusterUtils.cluster_utils import ClusterUtils
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
parent_dir = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
|
||||
FILE = os.path.join(parent_dir, "compose", "ozone", "docker-compose.yaml")
|
||||
SCALE = 3
|
||||
CONTAINER_LIST = []
|
||||
OM = []
|
||||
SCM = []
|
||||
DATANODES = []
|
||||
|
||||
|
||||
def setup():
|
||||
global CONTAINER_LIST, OM, SCM, DATANODES
|
||||
Blockade.blockade_destroy()
|
||||
CONTAINER_LIST = ClusterUtils.cluster_setup(FILE, SCALE)
|
||||
exit_code, output = Blockade.blockade_status()
|
||||
assert exit_code == 0, "blockade status command failed with output=[%s]" % output
|
||||
OM = filter(lambda x: 'ozoneManager' in x, CONTAINER_LIST)
|
||||
SCM = filter(lambda x: 'scm' in x, CONTAINER_LIST)
|
||||
DATANODES = sorted(list(filter(lambda x: 'datanode' in x, CONTAINER_LIST)))
|
||||
|
||||
exit_code, output = ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS", "THREE")
|
||||
assert exit_code == 0, "freon run failed with output=[%s]" % output
|
||||
|
||||
|
||||
def teardown():
|
||||
logger.info("Inside teardown")
|
||||
Blockade.blockade_destroy()
|
||||
|
||||
|
||||
def teardown_module():
|
||||
ClusterUtils.cluster_destroy(FILE)
|
||||
|
||||
|
||||
def test_datanode_isolation_one_node():
|
||||
"""
|
||||
In this test, one of the datanodes cannot communicate with other two datanodes.
|
||||
All datanodes can communicate with SCM.
|
||||
"""
|
||||
first_set = [OM[0], SCM[0], DATANODES[0]]
|
||||
second_set = [OM[0], SCM[0], DATANODES[1], DATANODES[2]]
|
||||
Blockade.blockade_create_partition(first_set, second_set)
|
||||
Blockade.blockade_status()
|
||||
ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS", "THREE")
|
||||
logger.info("Waiting for 480 seconds before checking container status")
|
||||
time.sleep(480)
|
||||
all_datanodes_container_status = ClusterUtils.find_all_datanodes_container_status(FILE, SCALE)
|
||||
count_closed_container_datanodes = filter(lambda x: x == 'CLOSED', all_datanodes_container_status)
|
||||
assert len(count_closed_container_datanodes) == 3, "The container should have three closed replicas."
|
||||
|
||||
|
||||
def test_datanode_isolation_all():
|
||||
"""
|
||||
In this test, none of the datanodes can communicate with other two datanodes.
|
||||
All datanodes can communicate with SCM.
|
||||
"""
|
||||
first_set = [OM[0], SCM[0], DATANODES[0]]
|
||||
second_set = [OM[0], SCM[0], DATANODES[1]]
|
||||
third_set = [OM[0], SCM[0], DATANODES[2]]
|
||||
Blockade.blockade_create_partition(first_set, second_set, third_set)
|
||||
Blockade.blockade_status()
|
||||
ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS", "THREE")
|
||||
logger.info("Waiting for 480 seconds before checking container status")
|
||||
time.sleep(480)
|
||||
all_datanodes_container_status = ClusterUtils.find_all_datanodes_container_status(FILE, SCALE)
|
||||
count_closed_container_datanodes = filter(lambda x: x == 'CLOSED', all_datanodes_container_status)
|
||||
assert len(count_closed_container_datanodes) >= 2, "The container should have at least two closed replicas."
|
|
@ -17,6 +17,7 @@
|
|||
|
||||
"""This module has apis to create and remove a blockade cluster"""
|
||||
import os
|
||||
import time
|
||||
import logging
|
||||
import pytest
|
||||
from blockadeUtils.blockade import Blockade
|
||||
|
@ -32,9 +33,10 @@ CONTAINER_LIST = []
|
|||
|
||||
def setup_module():
|
||||
global CONTAINER_LIST
|
||||
Blockade.blockade_destroy()
|
||||
CONTAINER_LIST = ClusterUtils.cluster_setup(FILE, SCALE)
|
||||
output = Blockade.blockade_status()
|
||||
assert output == 0, "blockade status command failed with exit code=[%s]" % output
|
||||
exit_code, output = Blockade.blockade_status()
|
||||
assert exit_code == 0, "blockade status command failed with output=[%s]" % output
|
||||
|
||||
|
||||
def teardown_module():
|
||||
|
@ -45,10 +47,12 @@ def teardown_module():
|
|||
def teardown():
|
||||
logger.info("Inside teardown")
|
||||
Blockade.blockade_fast_all()
|
||||
time.sleep(5)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("flaky_nodes", ["datanode", "scm", "om", "all"])
|
||||
def test_flaky(flaky_nodes):
|
||||
Blockade.make_flaky(flaky_nodes, CONTAINER_LIST)
|
||||
Blockade.blockade_status()
|
||||
ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS", "THREE")
|
||||
exit_code, output = ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS", "THREE")
|
||||
assert exit_code == 0, "freon run failed with output=[%s]" % output
|
Loading…
Reference in New Issue