HDDS-1088. Add blockade Tests to test Replica Manager. Contributed by Nilotpal Nandi.
This commit is contained in:
parent 155ab6d5d8
commit a7f5e742a6
@@ -46,6 +46,9 @@ RUN mkdir -p /opt && \
#Install docker-compose
RUN pip install docker-compose

#Install pytest==2.8.7
RUN pip install pytest==2.8.7

ENV PATH=$PATH:/opt/findbugs/bin

RUN addgroup -g 1000 default && \
@@ -41,4 +41,14 @@ cd $DIRECTORY_OF_OZONE
python -m pytest -s blockade/ --containerStatusSleep=<SECONDS>

e.g: python -m pytest -s blockade/ --containerStatusSleep=720
```

By default, the second phase of the tests is not run.
To run the second phase of the tests, use the following commands:

```
cd $DIRECTORY_OF_OZONE
python -m pytest -s blockade/ --runSecondPhase=true
```
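Both options can also be combined in a single run. A minimal sketch that
drives the same invocation from Python's subprocess module (the flag values
here are illustrative):

```
# Illustrative: run the blockade suite with both options set.
import subprocess

subprocess.check_call([
    "python", "-m", "pytest", "-s", "blockade/",
    "--containerStatusSleep=720",  # shorter wait before status checks
    "--runSecondPhase=true",       # also run the scale-up phase
])
```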
@@ -21,280 +21,289 @@ import subprocess
import logging
import time
import re
import os
import yaml


logger = logging.getLogger(__name__)


class ClusterUtils(object):
    """
    This class contains all cluster related operations.
    """

    @classmethod
    def cluster_setup(cls, docker_compose_file, datanode_count,
                      destroy_existing_cluster=True):
        """start a blockade cluster"""
        logger.info("compose file :%s", docker_compose_file)
        logger.info("number of DNs :%d", datanode_count)
        if destroy_existing_cluster:
            call(["docker-compose", "-f", docker_compose_file, "down"])
        call(["docker-compose", "-f", docker_compose_file, "up", "-d",
              "--scale", "datanode=" + str(datanode_count)])

        logger.info("Waiting 30s for cluster start up...")
        time.sleep(30)
        output = subprocess.check_output(["docker-compose", "-f",
                                          docker_compose_file, "ps"])
        output_array = output.split("\n")[2:-1]

        container_list = []
        for out in output_array:
            container = out.split(" ")[0]
            container_list.append(container)
            call(["blockade", "add", container])
            time.sleep(2)

        assert container_list, "no container found!"
        logger.info("blockade created with containers %s",
                    ' '.join(container_list))

        return container_list

    @classmethod
    def cluster_destroy(cls, docker_compose_file):
        logger.info("Running docker-compose -f %s down", docker_compose_file)
        call(["docker-compose", "-f", docker_compose_file, "down"])

    @classmethod
    def run_freon(cls, docker_compose_file, num_volumes, num_buckets,
                  num_keys, key_size, replication_type, replication_factor,
                  freon_client='ozoneManager'):
        # run freon
        cmd = "docker-compose -f %s " \
              "exec %s /opt/hadoop/bin/ozone " \
              "freon rk " \
              "--numOfVolumes %s " \
              "--numOfBuckets %s " \
              "--numOfKeys %s " \
              "--keySize %s " \
              "--replicationType %s " \
              "--factor %s" % (docker_compose_file, freon_client, num_volumes,
                               num_buckets, num_keys, key_size,
                               replication_type, replication_factor)
        exit_code, output = cls.run_cmd(cmd)
        return exit_code, output

    @classmethod
    def run_cmd(cls, cmd):
        command = cmd
        if isinstance(cmd, list):
            command = ' '.join(cmd)
        logger.info(" RUNNING: %s", command)
        all_output = ""
        myprocess = subprocess.Popen(cmd, stdout=subprocess.PIPE,
                                     stderr=subprocess.STDOUT, shell=True)
        while myprocess.poll() is None:
            op = myprocess.stdout.readline()
            if op:
                all_output += op
                logger.info(op)
        other_output = myprocess.communicate()
        other_output = other_output[0].strip()
        if other_output != "":
            all_output += other_output
            for each_line in other_output.split("\n"):
                logger.info(" %s", each_line.strip())
        reg = re.compile(r"(\r\n|\n)$")
        all_output = reg.sub("", all_output, 1)

        return myprocess.returncode, all_output

    @classmethod
    def get_ozone_confkey_value(cls, docker_compose_file, key_name):
        cmd = "docker-compose -f %s " \
              "exec ozoneManager /opt/hadoop/bin/ozone " \
              "getconf -confKey %s" \
              % (docker_compose_file, key_name)
        exit_code, output = cls.run_cmd(cmd)
        assert exit_code == 0, "getconf of key=[%s] failed with output=[%s]" \
            % (key_name, output)
        return str(output).strip()

    @classmethod
    def find_scm_uuid(cls, docker_compose_file):
        """
        This function returns the SCM uuid.
        """
        ozone_metadata_dir = cls.get_ozone_confkey_value(docker_compose_file,
                                                         "ozone.metadata.dirs")
        cmd = "docker-compose -f %s exec scm cat %s/scm/current/VERSION" % \
              (docker_compose_file, ozone_metadata_dir)
        exit_code, output = cls.run_cmd(cmd)
        assert exit_code == 0, "get scm UUID failed with output=[%s]" % output
        output_list = output.split("\n")
        output_list = [x for x in output_list if re.search(r"\w+=\w+", x)]
        output_dict = dict(x.split("=") for x in output_list)
        return str(output_dict['scmUuid']).strip()
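run_cmd is the workhorse for every helper here: it accepts either a string
or a list, streams output into the log while the process runs, and returns
the exit code together with the captured output. A minimal usage sketch:

```
# Sketch: callers check both return values, as the helpers above do.
from clusterUtils.cluster_utils import ClusterUtils

exit_code, output = ClusterUtils.run_cmd("docker-compose version")
assert exit_code == 0, "command failed with output=[%s]" % output
```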
    @classmethod
    def find_datanode_container_status(cls, docker_compose_file,
                                       datanode_index):
        """
        This function returns the datanode's container replica state.
        It finds all <containerID>.container files in the datanode and
        checks the state recorded in each of them.
        A sample <containerID>.container file records the state as follows:
            state: <STATE OF THE CONTAINER REPLICA>
        """
        datanode_dir = cls.get_ozone_confkey_value(docker_compose_file,
                                                   "hdds.datanode.dir")
        scm_uuid = cls.find_scm_uuid(docker_compose_file)
        container_parent_path = "%s/hdds/%s/current/containerDir0" % \
                                (datanode_dir, scm_uuid)
        cmd = "docker-compose -f %s exec --index=%s datanode find %s -type f " \
              "-name '*.container'" \
              % (docker_compose_file, datanode_index, container_parent_path)
        exit_code, output = cls.run_cmd(cmd)
        assert exit_code == 0, "command=[%s] failed with output=[%s]" % \
            (cmd, output)
        assert output, "No container info present"
        container_list = map(str.strip, output.split("\n"))
        container_state = None
        for container_path in container_list:
            cmd = "docker-compose -f %s exec --index=%s datanode cat %s" \
                  % (docker_compose_file, datanode_index, container_path)
            exit_code, output = cls.run_cmd(cmd)
            assert exit_code == 0, "command=[%s] failed with output=[%s]" % \
                (cmd, output)
            container_db_list = output.split("\n")
            container_db_list = [x for x in container_db_list
                                 if re.search(r"\w+:\s\w+", x)]
            # container_db_list now contains only the lines that have a
            # YAML representation, i.e. key: value
            container_db_info = "\n".join(container_db_list)
            container_db_dict = yaml.load(container_db_info)
            for key, value in container_db_dict.items():
                container_db_dict[key] = str(value).lstrip()
            if not container_state:
                container_state = container_db_dict['state']
            else:
                assert container_db_dict['state'] == container_state, \
                    "all containers are not in same state"

        return container_state

    @classmethod
    def find_all_datanodes_container_status(cls, docker_compose_file, scale):
        """
        This function returns the container replica states of all datanodes.
        """
        all_datanode_container_status = []
        for index in range(scale):
            all_datanode_container_status.append(
                cls.find_datanode_container_status(docker_compose_file,
                                                   index + 1))
        logger.info("All datanodes container status: %s",
                    ' '.join(all_datanode_container_status))

        return all_datanode_container_status
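The .container descriptor is a plain key/value (YAML) document, which is why
the state can be recovered with yaml.load once the output is filtered down
to the key: value lines. A small sketch over made-up file content:

```
# Illustrative: parsing a sample <containerID>.container descriptor.
# yaml.load without an explicit Loader mirrors the module's own usage.
import yaml

sample = "containerID: 1\nstate: QUASI_CLOSED"
container_db_dict = yaml.load(sample)
assert str(container_db_dict['state']) == 'QUASI_CLOSED'
```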
    @classmethod
    def create_volume(cls, docker_compose_file, volume_name):
        command = "docker-compose -f %s " \
                  "exec ozone_client /opt/hadoop/bin/ozone " \
                  "sh volume create /%s --user root" % \
                  (docker_compose_file, volume_name)
        logger.info("Creating Volume %s", volume_name)
        exit_code, output = cls.run_cmd(command)
        assert exit_code == 0, "Ozone volume create failed with output=[%s]" \
            % output

    @classmethod
    def delete_volume(cls, docker_compose_file, volume_name):
        command = "docker-compose -f %s " \
                  "exec ozone_client /opt/hadoop/bin/ozone " \
                  "sh volume delete /%s" % (docker_compose_file, volume_name)
        logger.info("Deleting Volume %s", volume_name)
        exit_code, output = cls.run_cmd(command)
        return exit_code, output

    @classmethod
    def create_bucket(cls, docker_compose_file, bucket_name, volume_name):
        command = "docker-compose -f %s " \
                  "exec ozone_client /opt/hadoop/bin/ozone " \
                  "sh bucket create /%s/%s" % (docker_compose_file,
                                               volume_name, bucket_name)
        logger.info("Creating Bucket %s in volume %s",
                    bucket_name, volume_name)
        exit_code, output = cls.run_cmd(command)
        assert exit_code == 0, "Ozone bucket create failed with output=[%s]" \
            % output

    @classmethod
    def delete_bucket(cls, docker_compose_file, bucket_name, volume_name):
        command = "docker-compose -f %s " \
                  "exec ozone_client /opt/hadoop/bin/ozone " \
                  "sh bucket delete /%s/%s" % (docker_compose_file,
                                               volume_name, bucket_name)
        logger.info("Running delete bucket of %s/%s", volume_name, bucket_name)
        exit_code, output = cls.run_cmd(command)
        return exit_code, output

    @classmethod
    def put_key(cls, docker_compose_file, bucket_name, volume_name,
                filepath, key_name=None, replication_factor=None):
        command = "docker-compose -f %s " \
                  "exec ozone_client ls %s" % (docker_compose_file, filepath)
        exit_code, output = cls.run_cmd(command)
        assert exit_code == 0, "%s does not exist" % filepath
        if key_name is None:
            key_name = os.path.basename(filepath)
        command = "docker-compose -f %s " \
                  "exec ozone_client /opt/hadoop/bin/ozone " \
                  "sh key put /%s/%s/%s %s" % (docker_compose_file,
                                               volume_name, bucket_name,
                                               key_name, filepath)
        if replication_factor:
            command = "%s --replication=%s" % (command, replication_factor)
        logger.info("Creating key %s in %s/%s", key_name,
                    volume_name, bucket_name)
        exit_code, output = cls.run_cmd(command)
        assert exit_code == 0, "Ozone put Key failed with output=[%s]" % output

    @classmethod
    def delete_key(cls, docker_compose_file, bucket_name, volume_name,
                   key_name):
        command = "docker-compose -f %s " \
                  "exec ozone_client /opt/hadoop/bin/ozone " \
                  "sh key delete /%s/%s/%s" \
                  % (docker_compose_file, volume_name, bucket_name, key_name)
        logger.info("Running delete key %s in %s/%s",
                    key_name, volume_name, bucket_name)
        exit_code, output = cls.run_cmd(command)
        return exit_code, output

    @classmethod
    def get_key(cls, docker_compose_file, bucket_name, volume_name,
                key_name, filepath=None):
        if filepath is None:
            filepath = '.'
        command = "docker-compose -f %s " \
                  "exec ozone_client /opt/hadoop/bin/ozone " \
                  "sh key get /%s/%s/%s %s" % (docker_compose_file,
                                               volume_name, bucket_name,
                                               key_name, filepath)
        logger.info("Running get key %s in %s/%s", key_name,
                    volume_name, bucket_name)
        exit_code, output = cls.run_cmd(command)
        assert exit_code == 0, "Ozone get Key failed with output=[%s]" % output

    @classmethod
    def find_checksum(cls, docker_compose_file, filepath):
        command = "docker-compose -f %s " \
                  "exec ozone_client md5sum %s" % (docker_compose_file, filepath)
        exit_code, output = cls.run_cmd(command)
        assert exit_code == 0, "Cannot find checksum"
        myoutput = output.split("\n")
        finaloutput = ""
        for line in myoutput:
            if line.find("Warning") >= 0 or line.find("is not a tty") >= 0:
                logger.info("skip this line: %s", line)
            else:
                finaloutput = finaloutput + line
        checksum = finaloutput.split(" ")
        logger.info("Checksum of %s is : %s", filepath, checksum[0])
        return checksum[0]
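Taken together, these helpers compose into a write-and-verify round trip.
A minimal sketch (the compose-file path, volume/bucket/key names and local
paths are illustrative only):

```
# Illustrative end-to-end flow over the ClusterUtils helpers above.
from clusterUtils.cluster_utils import ClusterUtils

compose_file = "compose/ozoneblockade/docker-compose.yaml"  # assumed path
ClusterUtils.cluster_setup(compose_file, 3)
ClusterUtils.create_volume(compose_file, "vol1")
ClusterUtils.create_bucket(compose_file, "bucket1", "vol1")
ClusterUtils.put_key(compose_file, "bucket1", "vol1", "/etc/passwd",
                     key_name="key1")
ClusterUtils.get_key(compose_file, "bucket1", "vol1", "key1", "/tmp/key1")
# The two checksums should match if the key round-tripped intact.
assert ClusterUtils.find_checksum(compose_file, "/etc/passwd") == \
    ClusterUtils.find_checksum(compose_file, "/tmp/key1")
ClusterUtils.cluster_destroy(compose_file)
```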
@@ -17,80 +17,97 @@ import logging
import os
import time
import subprocess
import pytest

EPOCH_TIME = int(time.time())


def pytest_addoption(parser):
    parser.addoption("--output-dir",
                     action="store",
                     default="/tmp/BlockadeTests",
                     help="location of output directory where output log "
                          "and plot files will be created")
    parser.addoption("--log-format",
                     action="store",
                     default="%(asctime)s|%(levelname)s|%(threadName)s|"
                             "%(filename)s:%(lineno)s -"
                             " %(funcName)s()|%(message)s",
                     help="specify log format")
    parser.addoption("--log-level",
                     action="store",
                     default="info",
                     help="specify log level")
    parser.addoption("--containerStatusSleep",
                     action="store",
                     default="900",
                     help="sleep time before checking container status")
    parser.addoption("--runSecondPhase",
                     action="store",
                     default="false",
                     help="run second phase of the tests")


@pytest.fixture
def run_second_phase(request):
    """
    :param request:
    This fixture returns whether the user has opted to run the second
    phase of the tests.
    """
    return request.config.getoption("--runSecondPhase")


def pytest_configure(config):
    global OUTPUT_DIR
    os.environ["CONTAINER_STATUS_SLEEP"] = config.option.containerStatusSleep
    OUTPUT_DIR = "%s/%s" % (config.option.output_dir, EPOCH_TIME)
    try:
        os.makedirs(OUTPUT_DIR)
    except OSError as e:
        raise Exception(e.strerror + ": " + e.filename)
    log_file = os.path.join(OUTPUT_DIR, "output.log")

    if config.option.log_level == "trace":
        loglevel = logging.DEBUG
    else:
        loglevel = getattr(logging, config.option.log_level.upper())
    logformatter = logging.Formatter(config.option.log_format)
    logging.basicConfig(filename=log_file,
                        filemode='w',
                        level=loglevel,
                        format=config.option.log_format)
    console = logging.StreamHandler()
    console.setLevel(loglevel)
    console.setFormatter(logformatter)
    logging.getLogger('').addHandler(console)


def pytest_report_teststatus(report):
    logger = logging.getLogger('main')
    loc, line, name = report.location
    if report.outcome == 'skipped':
        pass
    elif report.when == 'setup':
        logger.info("RUNNING TEST \"%s\" at location \"%s\" at line number"
                    " \"%s\"" % (name, loc, str(line)))
    elif report.when == 'call':
        logger.info("TEST \"%s\" %s in %3.2f seconds" %
                    (name, report.outcome.upper(), report.duration))
        log_file_path = "%s/%s_all_docker.log" % \
                        (OUTPUT_DIR, name)
        gather_docker_logs(log_file_path)


def pytest_sessionfinish(session):
    logger = logging.getLogger('main')
    logger.info("ALL TESTS FINISHED")
    logger.info("ALL logs present in following directory: %s", OUTPUT_DIR)


def gather_docker_logs(log_file_path):
    docker_compose_file = os.environ["DOCKER_COMPOSE_FILE"]
    output = subprocess.check_output(["docker-compose", "-f",
                                      docker_compose_file, "logs"])
    with open(log_file_path, "w") as text_file:
        text_file.write(output)
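A test opts into the second phase simply by declaring the fixture as a
parameter; pytest injects the option's value. A minimal sketch of the
pattern the isolation tests below follow:

```
# Sketch: pytest resolves run_second_phase from the fixture in conftest.py.
def test_example(run_second_phase):
    # phase-one assertions would go here
    if str(run_second_phase).lower() == "true":
        pass  # scale the cluster up and re-check, as the tests below do
```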
@@ -17,17 +17,18 @@

import os
import time
import re
import logging
from blockadeUtils.blockade import Blockade
from clusterUtils.cluster_utils import ClusterUtils


logger = logging.getLogger(__name__)
parent_dir = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
FILE = os.path.join(parent_dir, "compose", "ozoneblockade",
                    "docker-compose.yaml")
os.environ["DOCKER_COMPOSE_FILE"] = FILE
SCALE = 3
INCREASED_SCALE = 5
CONTAINER_LIST = []
OM = []
SCM = []

@@ -35,77 +36,113 @@ DATANODES = []


def setup():
    global CONTAINER_LIST, OM, SCM, DATANODES
    Blockade.blockade_destroy()
    CONTAINER_LIST = ClusterUtils.cluster_setup(FILE, SCALE)
    exit_code, output = Blockade.blockade_status()
    assert exit_code == 0, "blockade status command failed with output=[%s]" % \
        output
    OM = [x for x in CONTAINER_LIST if 'ozoneManager' in x]
    SCM = [x for x in CONTAINER_LIST if 'scm' in x]
    DATANODES = sorted(x for x in CONTAINER_LIST if 'datanode' in x)

    exit_code, output = ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS",
                                               "THREE")
    assert exit_code == 0, "freon run failed with output=[%s]" % output


def teardown():
    logger.info("Inside teardown")
    Blockade.blockade_destroy()


def teardown_module():
    ClusterUtils.cluster_destroy(FILE)


def test_isolatedatanode_singlenode(run_second_phase):
    """
    In this test, one of the datanodes (the first datanode) cannot
    communicate with the other two datanodes.
    All datanodes can communicate with SCM.
    Expectation :
    The container replica state in the first datanode should be quasi-closed.
    The container replica state in the other datanodes should be closed.
    """
    first_set = [OM[0], SCM[0], DATANODES[0]]
    second_set = [OM[0], SCM[0], DATANODES[1], DATANODES[2]]
    Blockade.blockade_create_partition(first_set, second_set)
    Blockade.blockade_status()
    ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS", "THREE")
    logger.info("Waiting for %s seconds before checking container status",
                os.environ["CONTAINER_STATUS_SLEEP"])
    time.sleep(int(os.environ["CONTAINER_STATUS_SLEEP"]))
    all_datanodes_container_status = \
        ClusterUtils.find_all_datanodes_container_status(FILE, SCALE)
    first_datanode_status = all_datanodes_container_status[0]
    closed_container_datanodes = [x for x in all_datanodes_container_status
                                  if x == 'CLOSED']
    assert first_datanode_status == 'QUASI_CLOSED'
    assert len(closed_container_datanodes) == 2, \
        "The container should have two closed replicas."

    if str(run_second_phase).lower() == "true":
        ClusterUtils.cluster_setup(FILE, INCREASED_SCALE, False)
        Blockade.blockade_status()
        ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS", "THREE")
        logger.info("Waiting for %s seconds before checking container status",
                    os.environ["CONTAINER_STATUS_SLEEP"])
        time.sleep(int(os.environ["CONTAINER_STATUS_SLEEP"]))
        all_datanodes_container_status = \
            ClusterUtils.find_all_datanodes_container_status(FILE,
                                                             INCREASED_SCALE)
        closed_container_datanodes = [x for x in all_datanodes_container_status
                                      if x == 'CLOSED']
        assert len(closed_container_datanodes) >= 3, \
            "The container should have at least three closed replicas."
        Blockade.blockade_join()
        Blockade.blockade_status()
        _, output = \
            ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS", "THREE")
        assert re.search("Status: Success", output) is not None


def test_datanode_isolation_all(run_second_phase):
    """
    In this test, none of the datanodes can communicate with the other two
    datanodes.
    All datanodes can communicate with SCM.
    Expectation : The container should eventually have at least two closed
    replicas.
    """
    first_set = [OM[0], SCM[0], DATANODES[0]]
    second_set = [OM[0], SCM[0], DATANODES[1]]
    third_set = [OM[0], SCM[0], DATANODES[2]]
    Blockade.blockade_create_partition(first_set, second_set, third_set)
    Blockade.blockade_status()
    ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS", "THREE")
    logger.info("Waiting for %s seconds before checking container status",
                os.environ["CONTAINER_STATUS_SLEEP"])
    time.sleep(int(os.environ["CONTAINER_STATUS_SLEEP"]))
    all_datanodes_container_status = \
        ClusterUtils.find_all_datanodes_container_status(FILE, SCALE)
    closed_container_datanodes = [x for x in all_datanodes_container_status
                                  if x == 'CLOSED']
    assert len(closed_container_datanodes) >= 2, \
        "The container should have at least two closed replicas."

    if str(run_second_phase).lower() == "true":
        ClusterUtils.cluster_setup(FILE, INCREASED_SCALE, False)
        Blockade.blockade_status()
        ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS", "THREE")
        logger.info("Waiting for %s seconds before checking container status",
                    os.environ["CONTAINER_STATUS_SLEEP"])
        time.sleep(int(os.environ["CONTAINER_STATUS_SLEEP"]))
        all_datanodes_container_status = \
            ClusterUtils.find_all_datanodes_container_status(FILE,
                                                             INCREASED_SCALE)
        closed_container_datanodes = [x for x in all_datanodes_container_status
                                      if x == 'CLOSED']
        assert len(closed_container_datanodes) >= 3, \
            "The container should have at least three closed replicas."
        Blockade.blockade_join()
        Blockade.blockade_status()
        _, output = \
            ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS", "THREE")
        assert re.search("Status: Success", output) is not None
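blockade_create_partition takes one connectivity set per argument:
containers in the same set can reach each other, and a container may appear
in several sets, which is how OM and SCM stay reachable from every datanode.
A small sketch of the three-way split used above (container names are
illustrative; real names come from cluster_setup):

```
# Illustrative: each datanode keeps OM and SCM but loses its two peers.
om, scm = "ozoneblockade_ozoneManager_1", "ozoneblockade_scm_1"
datanodes = ["ozoneblockade_datanode_1", "ozoneblockade_datanode_2",
             "ozoneblockade_datanode_3"]

partitions = [[om, scm, dn] for dn in datanodes]
# Blockade.blockade_create_partition(*partitions) would apply the split.
```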
@@ -17,17 +17,18 @@

import os
import time
import re
import logging
from blockadeUtils.blockade import Blockade
from clusterUtils.cluster_utils import ClusterUtils


logger = logging.getLogger(__name__)
parent_dir = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
FILE = os.path.join(parent_dir, "compose", "ozoneblockade",
                    "docker-compose.yaml")
os.environ["DOCKER_COMPOSE_FILE"] = FILE
SCALE = 3
INCREASED_SCALE = 5
CONTAINER_LIST = []
OM = []
SCM = []

@@ -35,77 +36,123 @@ DATANODES = []


def setup():
    global CONTAINER_LIST, OM, SCM, DATANODES
    Blockade.blockade_destroy()
    CONTAINER_LIST = ClusterUtils.cluster_setup(FILE, SCALE)
    exit_code, output = Blockade.blockade_status()
    assert exit_code == 0, "blockade status command failed with output=[%s]" % \
        output
    OM = [x for x in CONTAINER_LIST if 'ozoneManager' in x]
    SCM = [x for x in CONTAINER_LIST if 'scm' in x]
    DATANODES = sorted(x for x in CONTAINER_LIST if 'datanode' in x)

    exit_code, output = ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS",
                                               "THREE")
    assert exit_code == 0, "freon run failed with output=[%s]" % output


def teardown():
    logger.info("Inside teardown")
    Blockade.blockade_destroy()


def teardown_module():
    ClusterUtils.cluster_destroy(FILE)


def test_scm_isolation_one_node(run_second_phase):
    """
    In this test, one of the datanodes cannot communicate with SCM.
    The other datanodes can communicate with SCM.
    Expectation : The container should eventually have at least two closed
    replicas.
    """
    first_set = [OM[0], DATANODES[0], DATANODES[1], DATANODES[2]]
    second_set = [OM[0], SCM[0], DATANODES[1], DATANODES[2]]
    Blockade.blockade_create_partition(first_set, second_set)
    Blockade.blockade_status()
    ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS", "THREE")
    logger.info("Waiting for %s seconds before checking container status",
                os.environ["CONTAINER_STATUS_SLEEP"])
    time.sleep(int(os.environ["CONTAINER_STATUS_SLEEP"]))
    all_datanodes_container_status = \
        ClusterUtils.find_all_datanodes_container_status(FILE, SCALE)
    closed_container_datanodes = [x for x in all_datanodes_container_status
                                  if x == 'CLOSED']
    assert len(closed_container_datanodes) >= 2, \
        "The container should have at least two closed replicas."

    if str(run_second_phase).lower() == "true":
        ClusterUtils.cluster_setup(FILE, INCREASED_SCALE, False)
        Blockade.blockade_status()
        ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS", "THREE")
        logger.info("Waiting for %s seconds before checking container status",
                    os.environ["CONTAINER_STATUS_SLEEP"])
        time.sleep(int(os.environ["CONTAINER_STATUS_SLEEP"]))
        all_datanodes_container_status = \
            ClusterUtils.find_all_datanodes_container_status(FILE,
                                                             INCREASED_SCALE)
        closed_container_datanodes = [x for x in all_datanodes_container_status
                                      if x == 'CLOSED']
        assert len(closed_container_datanodes) >= 3, \
            "The container should have at least three closed replicas."
        Blockade.blockade_join()
        Blockade.blockade_status()
        _, output = \
            ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS", "THREE")
        assert re.search("Status: Success", output) is not None


def test_scm_isolation_two_node(run_second_phase):
    """
    In this test, two datanodes cannot communicate with SCM.
    Expectation : The container should eventually have three closed replicas
    or two open replicas and one quasi-closed replica.
    """
    first_set = [OM[0], DATANODES[0], DATANODES[1], DATANODES[2]]
    second_set = [OM[0], SCM[0], DATANODES[1]]
    Blockade.blockade_create_partition(first_set, second_set)
    Blockade.blockade_status()
    ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS", "THREE")
    logger.info("Waiting for %s seconds before checking container status",
                os.environ["CONTAINER_STATUS_SLEEP"])
    time.sleep(int(os.environ["CONTAINER_STATUS_SLEEP"]))
    all_datanodes_container_status = \
        ClusterUtils.find_all_datanodes_container_status(FILE, SCALE)
    closed_container_datanodes = [x for x in all_datanodes_container_status
                                  if x == 'CLOSED']
    quasi_closed_container_datanodes = \
        [x for x in all_datanodes_container_status if x == 'QUASI_CLOSED']
    open_container_datanodes = [x for x in all_datanodes_container_status
                                if x == 'OPEN']
    assert len(closed_container_datanodes) == 3 or \
        (len(open_container_datanodes) == 2 and
         len(quasi_closed_container_datanodes) == 1), \
        "The container should have three closed replicas or two open " \
        "replicas and one quasi_closed replica."

    if str(run_second_phase).lower() == "true":
        ClusterUtils.cluster_setup(FILE, INCREASED_SCALE, False)
        Blockade.blockade_status()
        ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS", "THREE")
        logger.info("Waiting for %s seconds before checking container status",
                    os.environ["CONTAINER_STATUS_SLEEP"])
        time.sleep(int(os.environ["CONTAINER_STATUS_SLEEP"]))
        all_datanodes_container_status = \
            ClusterUtils.find_all_datanodes_container_status(FILE,
                                                             INCREASED_SCALE)
        closed_container_datanodes = [x for x in all_datanodes_container_status
                                      if x == 'CLOSED']
        quasi_closed_container_datanodes = \
            [x for x in all_datanodes_container_status if x == 'QUASI_CLOSED']
        assert len(closed_container_datanodes) >= 3 or \
            len(quasi_closed_container_datanodes) >= 3
        Blockade.blockade_join()
        Blockade.blockade_status()
        if len(closed_container_datanodes) < 3:
            time.sleep(int(os.environ["CONTAINER_STATUS_SLEEP"]))
            all_datanodes_container_status = \
                ClusterUtils.find_all_datanodes_container_status(
                    FILE, INCREASED_SCALE)
            closed_container_datanodes = \
                [x for x in all_datanodes_container_status if x == 'CLOSED']

        assert len(closed_container_datanodes) >= 3
        _, output = \
            ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS", "THREE")
        assert re.search("Status: Success", output) is not None
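All four isolation tests share one skeleton: partition the network, write a
key with freon, wait out CONTAINER_STATUS_SLEEP, then assert on the replica
states; the second phase repeats the check at INCREASED_SCALE after
rejoining the network. A condensed sketch of that skeleton, assuming the
same globals and imports as the tests above (not a committed helper):

```
# Condensed sketch of the shared test skeleton (illustrative only).
def run_isolation_check(partition_sets, expected_min_closed):
    Blockade.blockade_create_partition(*partition_sets)
    Blockade.blockade_status()
    ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS", "THREE")
    time.sleep(int(os.environ["CONTAINER_STATUS_SLEEP"]))
    states = ClusterUtils.find_all_datanodes_container_status(FILE, SCALE)
    closed = [x for x in states if x == 'CLOSED']
    assert len(closed) >= expected_min_closed, \
        "expected at least %d closed replicas" % expected_min_closed
```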