HDDS-1778. Fix existing blockade tests. (#1068)

This commit is contained in:
Nanda kumar 2019-07-10 22:13:59 +05:30 committed by Arpit Agarwal
parent bbf5844968
commit efb916457f
16 changed files with 1104 additions and 1435 deletions

View File

@ -23,12 +23,15 @@ OZONE-SITE.XML_ozone.scm.block.client.address=scm
OZONE-SITE.XML_ozone.metadata.dirs=/data/metadata
OZONE-SITE.XML_ozone.handler.type=distributed
OZONE-SITE.XML_ozone.scm.client.address=scm
OZONE-SITE.XML_ozone.client.max.retries=10
OZONE-SITE.XML_ozone.scm.stale.node.interval=2m
OZONE-SITE.XML_ozone.scm.dead.node.interval=5m
OZONE-SITE.XML_ozone.replication=1
OZONE-SITE.XML_hdds.datanode.dir=/data/hdds
OZONE-SITE.XML_ozone.scm.pipeline.owner.container.count=1
OZONE-SITE.XML_ozone.scm.pipeline.destroy.timeout=15s
OZONE-SITE.XML_hdds.heartbeat.interval=2s
OZONE-SITE.XML_hdds.scm.wait.time.after.safemode.exit=30s
OZONE-SITE.XML_hdds.scm.replication.thread.interval=5s
OZONE-SITE.XML_hdds.scm.replication.event.timeout=7s
OZONE-SITE.XML_dfs.ratis.server.failure.duration=25s
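The OZONE-SITE.XML_ prefix follows the convention used by the Ozone
docker images: at container start, every variable carrying the prefix
becomes a property in ozone-site.xml. A minimal sketch of that mapping
(the real converter ships inside the image, not in this patch):

def to_ozone_site(env):
# Keep only the OZONE-SITE.XML_-prefixed keys and strip the prefix.
prefix = "OZONE-SITE.XML_"
return {k[len(prefix):]: v for k, v in env.items() if k.startswith(prefix)}

# to_ozone_site({"OZONE-SITE.XML_ozone.replication": "1"})
# -> {"ozone.replication": "1"}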

View File

@ -1,335 +0,0 @@
#!/usr/bin/python
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from subprocess import call
import subprocess
import logging
import time
import re
import os
import yaml
logger = logging.getLogger(__name__)
class ClusterUtils(object):
"""
This class contains all cluster related operations.
"""
@classmethod
def cluster_setup(cls, docker_compose_file, datanode_count,
destroy_existing_cluster=True):
"""start a blockade cluster"""
logger.info("compose file :%s", docker_compose_file)
logger.info("number of DNs :%d", datanode_count)
if destroy_existing_cluster:
call(["docker-compose", "-f", docker_compose_file, "down"])
call(["docker-compose", "-f", docker_compose_file, "up", "-d",
"--scale", "datanode=" + str(datanode_count)])
logger.info("Waiting 30s for cluster start up...")
time.sleep(30)
output = subprocess.check_output(["docker-compose", "-f",
docker_compose_file, "ps"])
output_array = output.split("\n")[2:-1]
container_list = []
for out in output_array:
container = out.split(" ")[0]
container_list.append(container)
call(["blockade", "add", container])
time.sleep(2)
assert container_list, "no container found!"
logger.info("blockade created with containers %s",
' '.join(container_list))
return container_list
@classmethod
def cluster_destroy(cls, docker_compose_file):
logger.info("Running docker-compose -f %s down", docker_compose_file)
call(["docker-compose", "-f", docker_compose_file, "down"])
@classmethod
def run_freon(cls, docker_compose_file, num_volumes, num_buckets,
num_keys, key_size, replication_type, replication_factor,
freon_client='om'):
# run freon
cmd = "docker-compose -f %s " \
"exec %s /opt/hadoop/bin/ozone " \
"freon rk " \
"--numOfVolumes %s " \
"--numOfBuckets %s " \
"--numOfKeys %s " \
"--keySize %s " \
"--replicationType %s " \
"--factor %s" % (docker_compose_file, freon_client, num_volumes,
num_buckets, num_keys, key_size,
replication_type, replication_factor)
exit_code, output = cls.run_cmd(cmd)
return exit_code, output
@classmethod
def run_cmd(cls, cmd):
command = cmd
if isinstance(cmd, list):
command = ' '.join(cmd)
logger.info(" RUNNING: %s", command)
all_output = ""
myprocess = subprocess.Popen(cmd, stdout=subprocess.PIPE,
stderr=subprocess.STDOUT, shell=True)
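# Stream the child's stdout line by line while it is still running;
# anything left in the pipe is drained by communicate() below.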
while myprocess.poll() is None:
op = myprocess.stdout.readline()
if op:
all_output += op
logger.info(op)
other_output = myprocess.communicate()
other_output = other_output[0].strip()
if other_output != "":
all_output += other_output
for each_line in other_output.split("\n"):
logger.info(" %s", each_line.strip())
reg = re.compile(r"(\r\n|\n)$")
all_output = reg.sub("", all_output, 1)
return myprocess.returncode, all_output
@classmethod
def get_ozone_confkey_value(cls, docker_compose_file, key_name):
cmd = "docker-compose -f %s " \
"exec om /opt/hadoop/bin/ozone " \
"getconf -confKey %s" \
% (docker_compose_file, key_name)
exit_code, output = cls.run_cmd(cmd)
assert exit_code == 0, "getconf of key=[%s] failed with output=[%s]" \
% (key_name, output)
return str(output).strip()
@classmethod
def find_scm_uuid(cls, docker_compose_file):
"""
This function returns scm uuid.
"""
ozone_metadata_dir = cls.get_ozone_confkey_value(docker_compose_file,
"ozone.metadata.dirs")
cmd = "docker-compose -f %s exec scm cat %s/scm/current/VERSION" % \
(docker_compose_file, ozone_metadata_dir)
exit_code, output = cls.run_cmd(cmd)
assert exit_code == 0, "get scm UUID failed with output=[%s]" % output
output_list = output.split("\n")
output_list = [x for x in output_list if re.search(r"\w+=\w+", x)]
output_dict = dict(x.split("=") for x in output_list)
return str(output_dict['scmUuid']).strip()
@classmethod
def find_container_status(cls, docker_compose_file, datanode_index):
"""
This function returns the datanode's container replica state.
In this function, it finds all <containerID>.container files.
Then, it opens each file and checks the state of the containers
in the datanode.
It returns 'None' as container state if it cannot find any
<containerID>.container file in the datanode.
Sample <containerID>.container contains state as following:
state: <STATE OF THE CONTAINER REPLICA>
"""
datanode_dir = cls.get_ozone_confkey_value(docker_compose_file,
"hdds.datanode.dir")
scm_uuid = cls.find_scm_uuid(docker_compose_file)
container_parent_path = "%s/hdds/%s/current/containerDir0" % \
(datanode_dir, scm_uuid)
cmd = "docker-compose -f %s exec --index=%s datanode find %s -type f " \
"-name '*.container'" \
% (docker_compose_file, datanode_index, container_parent_path)
exit_code, output = cls.run_cmd(cmd)
container_state = "None"
if exit_code == 0 and output:
container_list = map(str.strip, output.split("\n"))
for container_path in container_list:
cmd = "docker-compose -f %s exec --index=%s datanode cat %s" \
% (docker_compose_file, datanode_index, container_path)
exit_code, output = cls.run_cmd(cmd)
assert exit_code == 0, \
"command=[%s] failed with output=[%s]" % (cmd, output)
container_db_list = output.split("\n")
container_db_list = [x for x in container_db_list
if re.search(r"\w+:\s\w+", x)]
# container_db_list now contains only the lines that have a YAML
# key: value representation
container_db_info = "\n".join(container_db_list)
container_db_dict = yaml.load(container_db_info)
for key, value in container_db_dict.items():
container_db_dict[key] = str(value).lstrip()
if container_state == "None":
container_state = container_db_dict['state']
else:
assert container_db_dict['state'] == container_state, \
"containers are not all in the same state"
return container_state
@classmethod
def findall_container_status(cls, docker_compose_file, scale):
"""
This function returns container replica states of all datanodes.
"""
all_datanode_container_status = []
for index in range(scale):
all_datanode_container_status.append(
cls.find_container_status(docker_compose_file, index + 1))
logger.info("All datanodes container status: %s",
' '.join(all_datanode_container_status))
return all_datanode_container_status
@classmethod
def create_volume(cls, docker_compose_file, volume_name):
command = "docker-compose -f %s " \
"exec ozone_client /opt/hadoop/bin/ozone " \
"sh volume create /%s --user root" % \
(docker_compose_file, volume_name)
logger.info("Creating Volume %s", volume_name)
exit_code, output = cls.run_cmd(command)
assert exit_code == 0, "Ozone volume create failed with output=[%s]" \
% output
@classmethod
def delete_volume(cls, docker_compose_file, volume_name):
command = "docker-compose -f %s " \
"exec ozone_client /opt/hadoop/bin/ozone " \
"sh volume delete /%s" % (docker_compose_file, volume_name)
logger.info("Deleting Volume %s", volume_name)
exit_code, output = cls.run_cmd(command)
return exit_code, output
@classmethod
def create_bucket(cls, docker_compose_file, bucket_name, volume_name):
command = "docker-compose -f %s " \
"exec ozone_client /opt/hadoop/bin/ozone " \
"sh bucket create /%s/%s" % (docker_compose_file,
volume_name, bucket_name)
logger.info("Creating Bucket %s in volume %s",
bucket_name, volume_name)
exit_code, output = cls.run_cmd(command)
assert exit_code == 0, "Ozone bucket create failed with output=[%s]" \
% output
@classmethod
def delete_bucket(cls, docker_compose_file, bucket_name, volume_name):
command = "docker-compose -f %s " \
"exec ozone_client /opt/hadoop/bin/ozone " \
"sh bucket delete /%s/%s" % (docker_compose_file,
volume_name, bucket_name)
logger.info("Running delete bucket of %s/%s", volume_name, bucket_name)
exit_code, output = cls.run_cmd(command)
return exit_code, output
@classmethod
def put_key(cls, docker_compose_file, bucket_name, volume_name,
filepath, key_name=None, replication_factor=None):
command = "docker-compose -f %s " \
"exec ozone_client ls %s" % (docker_compose_file, filepath)
exit_code, output = cls.run_cmd(command)
assert exit_code == 0, "%s does not exist" % filepath
if key_name is None:
key_name = os.path.basename(filepath)
command = "docker-compose -f %s " \
"exec ozone_client /opt/hadoop/bin/ozone " \
"sh key put /%s/%s/%s %s" % (docker_compose_file,
volume_name, bucket_name,
key_name, filepath)
if replication_factor:
command = "%s --replication=%s" % (command, replication_factor)
logger.info("Creating key %s in %s/%s", key_name,
volume_name, bucket_name)
exit_code, output = cls.run_cmd(command)
assert exit_code == 0, "Ozone put Key failed with output=[%s]" % output
@classmethod
def delete_key(cls, docker_compose_file, bucket_name, volume_name,
key_name):
command = "docker-compose -f %s " \
"exec ozone_client /opt/hadoop/bin/ozone " \
"sh key delete /%s/%s/%s" \
% (docker_compose_file, volume_name, bucket_name, key_name)
logger.info("Running delete key %s in %s/%s",
key_name, volume_name, bucket_name)
exit_code, output = cls.run_cmd(command)
return exit_code, output
@classmethod
def get_key(cls, docker_compose_file, bucket_name, volume_name,
key_name, filepath=None):
if filepath is None:
filepath = '.'
command = "docker-compose -f %s " \
"exec ozone_client /opt/hadoop/bin/ozone " \
"sh key get /%s/%s/%s %s" % (docker_compose_file,
volume_name, bucket_name,
key_name, filepath)
logger.info("Running get key %s in %s/%s", key_name,
volume_name, bucket_name)
exit_code, output = cls.run_cmd(command)
assert exit_code == 0, "Ozone get Key failed with output=[%s]" % output
@classmethod
def find_checksum(cls, docker_compose_file, filepath, client="ozone_client"):
"""
This function finds the checksum of a file present in a docker container.
Before running any 'putKey' operation, this function is called to store
the original checksum of the file. The file is then uploaded as a key.
"""
command = "docker-compose -f %s " \
"exec %s md5sum %s" % \
(docker_compose_file, client, filepath)
exit_code, output = cls.run_cmd(command)
assert exit_code == 0, "Cant find checksum"
myoutput = output.split("\n")
finaloutput = ""
for line in myoutput:
if line.find("Warning") >= 0 or line.find("is not a tty") >= 0:
logger.info("skip this line: %s", line)
else:
finaloutput = finaloutput + line
checksum = finaloutput.split(" ")
logger.info("Checksum of %s is : %s", filepath, checksum[0])
return checksum[0]
@classmethod
def get_pipelines(cls, docker_compose_file):
command = "docker-compose -f %s " \
+ "exec ozone_client /opt/hadoop/bin/ozone scmcli " \
+ "listPipelines" % (docker_compose_file)
exit_code, output = cls.run_cmd(command)
assert exit_code == 0, "list pipeline command failed"
return output
@classmethod
def find_om_scm_client_datanodes(cls, container_list):
om = filter(lambda x: 'om_1' in x, container_list)
scm = filter(lambda x: 'scm' in x, container_list)
datanodes = sorted(
list(filter(lambda x: 'datanode' in x, container_list)))
client = filter(lambda x: 'ozone_client' in x, container_list)
return om, scm, client, datanodes

View File

@ -20,7 +20,6 @@
from subprocess import call
import logging
import util
from clusterUtils.cluster_utils import ClusterUtils
logger = logging.getLogger(__name__)
@ -34,12 +33,13 @@ class Blockade(object):
@classmethod
def blockade_up(cls):
logger.info("Running blockade up")
call(["blockade", "up"])
@classmethod
def blockade_status(cls):
exit_code, output = util.run_cmd("blockade status")
return exit_code, output
logger.info("Running blockade status")
return call(["blockade", "status"])
@classmethod
def make_flaky(cls, flaky_node):
@ -58,15 +58,15 @@ class Blockade(object):
for node_list in args:
nodes = nodes + ','.join(node_list) + " "
exit_code, output = \
util.run_cmd("blockade partition %s" % nodes)
util.run_command("blockade partition %s" % nodes)
assert exit_code == 0, \
"blockade partition command failed with exit code=[%s]" % output
@classmethod
def blockade_join(cls):
output = call(["blockade", "join"])
assert output == 0, "blockade join command failed with exit code=[%s]" \
% output
exit_code = call(["blockade", "join"])
assert exit_code == 0, "blockade join command failed with exit code=[%s]" \
% exit_code
@classmethod
def blockade_stop(cls, node, all_nodes=False):
@ -89,4 +89,4 @@ class Blockade(object):
@classmethod
def blockade_add(cls, node):
output = call(["blockade", "add", node])
assert output == 0, "blockade add command failed"
assert output == 0, "blockade add command failed"

View File

@ -0,0 +1,75 @@
#!/usr/bin/python
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from ozone import util
from ozone.cluster import Command
class OzoneClient:
__logger__ = logging.getLogger(__name__)
def __init__(self, cluster):
self.cluster = cluster
def create_volume(self, volume_name):
OzoneClient.__logger__.info("Creating Volume %s" % volume_name)
command = [Command.ozone, "sh volume create /%s --user root" % volume_name]
util.run_docker_command(command, self.cluster.client)
def create_bucket(self, volume_name, bucket_name):
OzoneClient.__logger__.info("Creating Bucket %s in Volume %s" % (bucket_name, volume_name))
command = [Command.ozone, "sh bucket create /%s/%s" % (volume_name, bucket_name)]
util.run_docker_command(command, self.cluster.client)
def put_key(self, source_file, volume_name, bucket_name, key_name, replication_factor=None):
OzoneClient.__logger__.info("Creating Key %s in %s/%s" % (key_name, volume_name, bucket_name))
exit_code, output = util.run_docker_command(
"ls %s" % source_file, self.cluster.client)
assert exit_code == 0, "%s does not exist" % source_file
command = [Command.ozone, "sh key put /%s/%s/%s %s" %
(volume_name, bucket_name, key_name, source_file)]
if replication_factor:
command.append("--replication=%s" % replication_factor)
exit_code, output = util.run_docker_command(command, self.cluster.client)
assert exit_code == 0, "Ozone put Key failed with output=[%s]" % output
def get_key(self, volume_name, bucket_name, key_name, file_path='.'):
OzoneClient.__logger__.info("Reading key %s from %s/%s" % (key_name, volume_name, bucket_name))
command = [Command.ozone, "sh key get /%s/%s/%s %s" %
(volume_name, bucket_name, key_name, file_path)]
exit_code, output = util.run_docker_command(command, self.cluster.client)
assert exit_code == 0, "Ozone get Key failed with output=[%s]" % output
def run_freon(self, num_volumes, num_buckets, num_keys, key_size,
replication_type="RATIS", replication_factor="THREE"):
"""
Runs freon on the cluster.
"""
command = [Command.freon,
" rk",
" --numOfVolumes " + str(num_volumes),
" --numOfBuckets " + str(num_buckets),
" --numOfKeys " + str(num_keys),
" --keySize " + str(key_size),
" --replicationType " + replication_type,
" --factor " + replication_factor]
return util.run_docker_command(command, self.cluster.client)
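A minimal usage sketch of this client, mirroring the tests later in
this patch (volume, bucket and key names are illustrative):

cluster = OzoneCluster.create()
cluster.start()
oz_client = cluster.get_client()
oz_client.create_volume("vol-1")
oz_client.create_bucket("vol-1", "bucket-1")
oz_client.put_key("/etc/passwd", "vol-1", "bucket-1", "key-1", "THREE")
oz_client.get_key("vol-1", "bucket-1", "key-1", "/tmp/")
cluster.stop()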

View File

@ -20,284 +20,272 @@ import os
import re
import subprocess
import yaml
import util
from os import environ
from subprocess import call
from blockadeUtils.blockade import Blockade
class Command(object):
docker = "docker"
blockade = "blockade"
docker_compose = "docker-compose"
ozone = "/opt/hadoop/bin/ozone"
freon = "/opt/hadoop/bin/ozone freon"
from ozone import util
from ozone.constants import Command
from ozone.blockade import Blockade
from ozone.client import OzoneClient
from ozone.container import Container
from ozone.exceptions import ContainerNotFoundError
class Configuration:
"""
Configurations to be used while starting Ozone Cluster.
Here the @property decorator is used to provide getter, setter and
deleter behaviour for the 'datanode_count' attribute.
@datanode_count.setter sets the value of the 'datanode_count' attribute.
@datanode_count.deleter deletes the current value of the
'datanode_count' attribute.
"""
def __init__(self):
if "MAVEN_TEST" in os.environ:
compose_dir = environ.get("MAVEN_TEST")
self.docker_compose_file = os.path.join(compose_dir, "docker-compose.yaml")
elif "OZONE_HOME" in os.environ:
compose_dir = os.path.join(environ.get("OZONE_HOME"), "compose", "ozoneblockade")
self.docker_compose_file = os.path.join(compose_dir, "docker-compose.yaml")
else:
__parent_dir__ = os.path.dirname(os.path.dirname(os.path.dirname(
os.path.dirname(os.path.realpath(__file__)))))
self.docker_compose_file = os.path.join(__parent_dir__,
"compose", "ozoneblockade",
"docker-compose.yaml")
self._datanode_count = 3
os.environ["DOCKER_COMPOSE_FILE"] = self.docker_compose_file
@property
def datanode_count(self):
return self._datanode_count
@datanode_count.setter
def datanode_count(self, datanode_count):
self._datanode_count = datanode_count
@datanode_count.deleter
def datanode_count(self):
del self._datanode_count
class Cluster(object):
"""
This represents Ozone Cluster.
Here the @property decorator is used to provide getter, setter and
deleter behaviour for the 'om', 'scm', 'datanodes' and 'clients'
attributes.
"""
__logger__ = logging.getLogger(__name__)
def __init__(self, conf):
self.conf = conf
self.docker_compose_file = conf.docker_compose_file
self._om = None
self._scm = None
self._datanodes = None
self._clients = None
self.scm_uuid = None
self.datanode_dir = None
@property
def om(self):
return self._om
@om.setter
def om(self, om):
self._om = om
@om.deleter
def om(self):
del self._om
@property
def scm(self):
return self._scm
@scm.setter
def scm(self, scm):
self._scm = scm
@scm.deleter
def scm(self):
del self._scm
@property
def datanodes(self):
return self._datanodes
@datanodes.setter
def datanodes(self, datanodes):
self._datanodes = datanodes
@datanodes.deleter
def datanodes(self):
del self._datanodes
@property
def clients(self):
return self._clients
@clients.setter
def clients(self, clients):
self._clients = clients
@clients.deleter
def clients(self):
del self._clients
@classmethod
def create(cls, config=Configuration()):
return Cluster(config)
def start(self):
"""
Start Ozone Cluster in docker containers.
Configurations to be used while starting Ozone Cluster.
Here the @property decorator is used to provide getter, setter and
deleter behaviour for the 'datanode_count' attribute.
@datanode_count.setter sets the value of the 'datanode_count' attribute.
@datanode_count.deleter deletes the current value of the
'datanode_count' attribute.
"""
Cluster.__logger__.info("Starting Ozone Cluster")
Blockade.blockade_destroy()
call([Command.docker_compose, "-f", self.docker_compose_file,
"up", "-d", "--scale",
"datanode=" + str(self.conf.datanode_count)])
Cluster.__logger__.info("Waiting 10s for cluster start up...")
# Remove the sleep and wait only till the cluster is out of safemode
# time.sleep(10)
output = subprocess.check_output([Command.docker_compose, "-f",
self.docker_compose_file, "ps"])
node_list = []
for out in output.split("\n")[2:-1]:
node = out.split(" ")[0]
node_list.append(node)
Blockade.blockade_add(node)
Blockade.blockade_status()
self.om = filter(lambda x: 'om' in x, node_list)[0]
self.scm = filter(lambda x: 'scm' in x, node_list)[0]
self.datanodes = sorted(list(filter(lambda x: 'datanode' in x, node_list)))
self.clients = filter(lambda x: 'ozone_client' in x, node_list)
self.scm_uuid = self.__get_scm_uuid__()
self.datanode_dir = self.get_conf_value("hdds.datanode.dir")
def __init__(self):
if "MAVEN_TEST" in os.environ:
compose_dir = environ.get("MAVEN_TEST")
self.docker_compose_file = os.path.join(compose_dir, "docker-compose.yaml")
elif "OZONE_HOME" in os.environ:
compose_dir = os.path.join(environ.get("OZONE_HOME"), "compose", "ozoneblockade")
self.docker_compose_file = os.path.join(compose_dir, "docker-compose.yaml")
else:
__parent_dir__ = os.path.dirname(os.path.dirname(os.path.dirname(
os.path.dirname(os.path.realpath(__file__)))))
self.docker_compose_file = os.path.join(__parent_dir__,
"compose", "ozoneblockade",
"docker-compose.yaml")
self._datanode_count = 3
os.environ["DOCKER_COMPOSE_FILE"] = self.docker_compose_file
assert node_list, "no node found in the cluster!"
Cluster.__logger__.info("blockade created with nodes %s",
' '.join(node_list))
@property
def datanode_count(self):
return self._datanode_count
def get_conf_value(self, key):
"""
Returns the value of given configuration key.
"""
command = [Command.ozone, "getconf -confKey " + key]
exit_code, output = self.__run_docker_command__(command, self.om)
return str(output).strip()
@datanode_count.setter
def datanode_count(self, datanode_count):
self._datanode_count = datanode_count
def scale_datanode(self, datanode_count):
"""
Commission new datanodes to the running cluster.
"""
call([Command.docker_compose, "-f", self.docker_compose_file,
"up", "-d", "--scale", "datanode=" + datanode_count])
def partition_network(self, *args):
"""
Partition the network which is used by the cluster.
"""
Blockade.blockade_create_partition(*args)
@datanode_count.deleter
def datanode_count(self):
del self._datanode_count
def restore_network(self):
class OzoneCluster(object):
"""
Restores the network partition.
This represents Ozone Cluster.
Here the @property decorator is used to provide getter, setter and
deleter behaviour for the 'om', 'scm', 'datanodes' and 'client'
attributes.
"""
Blockade.blockade_join()
__logger__ = logging.getLogger(__name__)
def __get_scm_uuid__(self):
"""
Returns SCM's UUID.
"""
ozone_metadata_dir = self.get_conf_value("ozone.metadata.dirs")
command = "cat %s/scm/current/VERSION" % ozone_metadata_dir
exit_code, output = self.__run_docker_command__(command, self.scm)
output_list = output.split("\n")
key_value = [x for x in output_list if re.search(r"\w+=\w+", x)]
uuid = [token for token in key_value if 'scmUuid' in token]
return uuid.pop().split("=")[1].strip()
def __init__(self, conf):
self.conf = conf
self.docker_compose_file = conf.docker_compose_file
self._om = None
self._scm = None
self._datanodes = None
self._client = None
self.scm_uuid = None
self.datanode_dir = None
def get_container_states(self, datanode):
"""
Returns the state of all the containers in the given datanode.
"""
container_parent_path = "%s/hdds/%s/current/containerDir0" % \
(self.datanode_dir, self.scm_uuid)
command = "find %s -type f -name '*.container'" % container_parent_path
exit_code, output = self.__run_docker_command__(command, datanode)
container_state = {}
@property
def om(self):
return self._om
container_list = map(str.strip, output.split("\n"))
for container_path in container_list:
# Reading the container file.
exit_code, output = self.__run_docker_command__(
"cat " + container_path, datanode)
if exit_code != 0:
continue
data = output.split("\n")
# Reading key value pairs from container file.
key_value = [x for x in data if re.search(r"\w+:\s\w+", x)]
content = "\n".join(key_value)
content_yaml = yaml.load(content)
if content_yaml is None:
continue
for key, value in content_yaml.items():
content_yaml[key] = str(value).lstrip()
# Stores the container state in a dictionary.
container_state[content_yaml['containerID']] = content_yaml['state']
return container_state
@om.setter
def om(self, om):
self._om = om
def run_freon(self, num_volumes, num_buckets, num_keys, key_size,
replication_type="RATIS", replication_factor="THREE",
run_on=None):
"""
Runs freon on the cluster.
"""
if run_on is None:
run_on = self.om
command = [Command.freon,
" rk",
" --numOfVolumes " + str(num_volumes),
" --numOfBuckets " + str(num_buckets),
" --numOfKeys " + str(num_keys),
" --keySize " + str(key_size),
" --replicationType " + replication_type,
" --factor " + replication_factor]
return self.__run_docker_command__(command, run_on)
@om.deleter
def om(self):
del self._om
def __run_docker_command__(self, command, run_on):
if isinstance(command, list):
command = ' '.join(command)
command = [Command.docker,
"exec " + run_on,
command]
return util.run_cmd(command)
@property
def scm(self):
return self._scm
def stop(self):
"""
Stops the Ozone Cluster.
"""
Cluster.__logger__.info("Stopping Ozone Cluster")
call([Command.docker_compose, "-f", self.docker_compose_file, "down"])
Blockade.blockade_destroy()
@scm.setter
def scm(self, scm):
self._scm = scm
def container_state_predicate_all_closed(self, datanodes):
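# Note: get_container_states returns a dict and popitem() inspects a
# single arbitrary entry, so this predicate effectively assumes one
# container replica per datanode (the compose file pins
# ozone.scm.pipeline.owner.container.count to 1).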
for datanode in datanodes:
container_states_dn = self.get_container_states(datanode)
if not container_states_dn \
or container_states_dn.popitem()[1] != 'CLOSED':
return False
return True
@scm.deleter
def scm(self):
del self._scm
def container_state_predicate_one_closed(self, datanodes):
for datanode in datanodes:
container_states_dn = self.get_container_states(datanode)
if container_states_dn and container_states_dn.popitem()[1] == 'CLOSED':
return True
return False
@property
def datanodes(self):
return self._datanodes
@datanodes.setter
def datanodes(self, datanodes):
self._datanodes = datanodes
@datanodes.deleter
def datanodes(self):
del self._datanodes
@property
def client(self):
return self._client
@client.setter
def client(self, client):
self._client = client
@client.deleter
def client(self):
del self._client
@classmethod
def create(cls, config=Configuration()):
return OzoneCluster(config)
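# For illustration, a test can override the default datanode count
# before starting the cluster (a sketch; the value is illustrative):
#   conf = Configuration()
#   conf.datanode_count = 5
#   cluster = OzoneCluster.create(conf)
#   cluster.start()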
def start(self):
"""
Start Ozone Cluster in docker containers.
"""
self.__logger__.info("Starting Ozone Cluster")
if Blockade.blockade_status() == 0:
Blockade.blockade_destroy()
Blockade.blockade_up()
call([Command.docker_compose, "-f", self.docker_compose_file,
"up", "-d", "--scale",
"datanode=" + str(self.conf.datanode_count)])
self.__logger__.info("Waiting 10s for cluster start up...")
# Remove the sleep and wait only till the cluster is out of safemode
# time.sleep(10)
output = subprocess.check_output([Command.docker_compose, "-f",
self.docker_compose_file, "ps"])
node_list = []
for out in output.split("\n")[2:-1]:
node = out.split(" ")[0]
node_list.append(node)
Blockade.blockade_add(node)
self.om = filter(lambda x: 'om' in x, node_list)[0]
self.scm = filter(lambda x: 'scm' in x, node_list)[0]
self.datanodes = sorted(list(filter(lambda x: 'datanode' in x, node_list)))
self.client = filter(lambda x: 'ozone_client' in x, node_list)[0]
self.scm_uuid = self.__get_scm_uuid__()
self.datanode_dir = self.get_conf_value("hdds.datanode.dir")
assert node_list, "no node found in the cluster!"
self.__logger__.info("blockade created with nodes %s", ' '.join(node_list))
def get_conf_value(self, key):
"""
Returns the value of given configuration key.
"""
command = [Command.ozone, "getconf -confKey " + key]
exit_code, output = util.run_docker_command(command, self.om)
return str(output).strip()
def scale_datanode(self, datanode_count):
"""
Commission new datanodes to the running cluster.
"""
call([Command.docker_compose, "-f", self.docker_compose_file,
"up", "-d", "--scale", "datanode=" + datanode_count])
def partition_network(self, *args):
"""
Partition the network which is used by the cluster.
"""
Blockade.blockade_create_partition(*args)
def restore_network(self):
"""
Restores the network partition.
"""
Blockade.blockade_join()
def __get_scm_uuid__(self):
"""
Returns SCM's UUID.
"""
ozone_metadata_dir = self.get_conf_value("ozone.metadata.dirs")
command = "cat %s/scm/current/VERSION" % ozone_metadata_dir
exit_code, output = util.run_docker_command(command, self.scm)
output_list = output.split("\n")
key_value = [x for x in output_list if re.search(r"\w+=\w+", x)]
uuid = [token for token in key_value if 'scmUuid' in token]
return uuid.pop().split("=")[1].strip()
def get_client(self):
return OzoneClient(self)
def get_container(self, container_id):
command = [Command.ozone, "scmcli list -c=1 -s=%s | grep containerID", container_id - 1]
exit_code, output = util.run_docker_command(command, self.om)
if exit_code != 0:
raise ContainerNotFoundError(container_id)
return Container(container_id, self)
def get_containers_on_datanode(self, datanode):
"""
Returns all the containers on the given datanode.
"""
container_parent_path = "%s/hdds/%s/current/containerDir0" % \
(self.datanode_dir, self.scm_uuid)
command = "find %s -type f -name '*.container'" % container_parent_path
exit_code, output = util.run_docker_command(command, datanode)
containers = []
container_list = map(str.strip, output.split("\n"))
for container_path in container_list:
# Reading the container file.
exit_code, output = util.run_docker_command(
"cat " + container_path, datanode)
if exit_code != 0:
continue
data = output.split("\n")
# Reading key value pairs from container file.
key_value = [x for x in data if re.search(r"\w+:\s\w+", x)]
content = "\n".join(key_value)
content_yaml = yaml.load(content)
if content_yaml is None:
continue
containers.append(Container(content_yaml.get('containerID'), self))
return containers
def get_container_state(self, container_id, datanode):
container_parent_path = "%s/hdds/%s/current/containerDir0" % \
(self.datanode_dir, self.scm_uuid)
command = "find %s -type f -name '%s.container'" % (container_parent_path, container_id)
exit_code, output = util.run_docker_command(command, datanode)
container_path = output.strip()
if not container_path:
raise ContainerNotFoundError("Container not found!")
# Reading the container file.
exit_code, output = util.run_docker_command("cat " + container_path, datanode)
data = output.split("\n")
# Reading key value pairs from container file.
key_value = [x for x in data if re.search(r"\w+:\s\w+", x)]
content = "\n".join(key_value)
content_yaml = yaml.load(content)
return str(content_yaml.get('state')).lstrip()
def get_container_datanodes(self, container_id):
result = []
for datanode in self.datanodes:
container_parent_path = "%s/hdds/%s/current/containerDir0" % \
(self.datanode_dir, self.scm_uuid)
command = "find %s -type f -name '%s.container'" % (container_parent_path, container_id)
exit_code, output = util.run_docker_command(command, datanode)
if exit_code == 0:
result.append(datanode)
return result
def stop(self):
"""
Stops the Ozone Cluster.
"""
self.__logger__.info("Stopping Ozone Cluster")
call([Command.docker_compose, "-f", self.docker_compose_file, "down"])
Blockade.blockade_destroy()
def container_state_predicate(self, datanode, state):
container_states_dn = self.get_container_states(datanode)
if container_states_dn and container_states_dn.popitem()[1] == state:
return True
return False

View File

@ -1,3 +1,5 @@
#!/usr/bin/python
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
@ -11,4 +13,11 @@
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
class Command(object):
docker = "docker"
docker_compose = "docker-compose"
ozone = "/opt/hadoop/bin/ozone"
freon = "/opt/hadoop/bin/ozone freon"

View File

@ -0,0 +1,117 @@
#!/usr/bin/python
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import util
from ozone.exceptions import ContainerNotFoundError
class Container:
def __init__(self, container_id, cluster):
self.container_id = container_id
self.cluster = cluster
def get_datanode_states(self):
dns = self.cluster.get_container_datanodes(self.container_id)
states = []
for dn in dns:
states.append(self.get_state(dn))
return states
def get_state(self, datanode):
return self.cluster.get_container_state(self.container_id, datanode)
def wait_until_replica_is_quasi_closed(self, datanode):
def predicate():
try:
if self.cluster.get_container_state(self.container_id, datanode) == 'QUASI_CLOSED':
return True
else:
return False
except ContainerNotFoundError:
return False
util.wait_until(predicate, int(os.environ["CONTAINER_STATUS_SLEEP"]), 10)
if not predicate():
raise Exception("Replica is not quasi closed!")
def wait_until_one_replica_is_quasi_closed(self):
def predicate():
# Return True as soon as any replica reports QUASI_CLOSED; with the
# else attached inside the loop only the first datanode was checked.
dns = self.cluster.get_container_datanodes(self.container_id)
for dn in dns:
if self.cluster.get_container_state(self.container_id, dn) == 'QUASI_CLOSED':
return True
return False
util.wait_until(predicate, int(os.environ["CONTAINER_STATUS_SLEEP"]), 10)
if not predicate():
raise Exception("None of the container replica is quasi closed!")
def wait_until_replica_is_closed(self, datanode):
def predicate():
try:
if self.cluster.get_container_state(self.container_id, datanode) == 'CLOSED':
return True
else:
return False
except ContainerNotFoundError:
return False
util.wait_until(predicate, int(os.environ["CONTAINER_STATUS_SLEEP"]), 10)
if not predicate():
raise Exception("Replica is not closed!")
def wait_until_one_replica_is_closed(self):
def predicate():
# Return True as soon as any replica reports CLOSED.
dns = self.cluster.get_container_datanodes(self.container_id)
for dn in dns:
if self.cluster.get_container_state(self.container_id, dn) == 'CLOSED':
return True
return False
util.wait_until(predicate, int(os.environ["CONTAINER_STATUS_SLEEP"]), 10)
if not predicate():
raise Exception("None of the container replica is closed!")
def wait_until_all_replicas_are_closed(self):
def predicate():
dns = self.cluster.get_container_datanodes(self.container_id)
for dn in dns:
if self.cluster.get_container_state(self.container_id, dn) != 'CLOSED':
return False
return True
util.wait_until(predicate, int(os.environ["CONTAINER_STATUS_SLEEP"]), 10)
if not predicate():
raise Exception("Not all the replicas are closed!")
def wait_until_replica_is_not_open_anymore(self, datanode):
def predicate():
try:
if self.cluster.get_container_state(self.container_id, datanode) != 'OPEN':
return True
else:
return False
except ContainerNotFoundError:
return False
util.wait_until(predicate, int(os.environ["CONTAINER_STATUS_SLEEP"]), 10)
if not predicate():
raise Exception("Replica is not closed!")

View File

@ -1,3 +1,5 @@
#!/usr/bin/python
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
@ -11,4 +13,10 @@
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
class ContainerNotFoundError(RuntimeError):
""" Raised when a container replica cannot be found. """

View File

@ -15,38 +15,66 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import time
import re
import logging
import subprocess
from ozone.constants import Command
logger = logging.getLogger(__name__)
def wait_until(predicate, timeout, check_frequency=1):
deadline = time.time() + timeout
while time.time() < deadline:
if predicate():
return
time.sleep(check_frequency)
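# Usage sketch: poll every 10s until the predicate holds or the timeout
# (taken from CONTAINER_STATUS_SLEEP, as the tests below do) expires:
#   wait_until(lambda: cluster.get_containers_on_datanode(dn) != [],
#              int(os.environ["CONTAINER_STATUS_SLEEP"]), 10)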
def run_cmd(cmd):
def run_docker_command(command, run_on):
if isinstance(command, list):
command = ' '.join(command)
command = [Command.docker,
"exec " + run_on,
command]
return run_command(command)
def run_command(cmd):
command = cmd
if isinstance(cmd, list):
command = ' '.join(cmd)
logger.info("RUNNING: %s", command)
all_output = ""
my_process = subprocess.Popen(command, stdout=subprocess.PIPE,
stderr=subprocess.STDOUT, shell=True)
while my_process.poll() is None:
op = my_process.stdout.readline()
if op:
all_output += op
logger.info(op)
other_output = my_process.communicate()
other_output = other_output[0].strip()
if other_output != "":
all_output += other_output
reg = re.compile(r"(\r\n|\n)$")
logger.debug("Output: %s", all_output)
all_output = reg.sub("", all_output, 1)
return my_process.returncode, all_output
def get_checksum(file_path, run_on):
command = "md5sum %s" % file_path
exit_code, output = run_docker_command(command, run_on)
assert exit_code == 0, "Cant find checksum"
output_split = output.split("\n")
result = ""
for line in output_split:
if line.find("Warning") >= 0 or line.find("is not a tty") >= 0:
logger.info("skip this line: %s", line)
else:
result = result + line
checksum = result.split(" ")
return checksum[0]
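A brief sketch of how these helpers combine in the client-failure
tests; "ozone_client_1" and the file paths are illustrative:

exit_code, output = run_docker_command("ls /etc/passwd", "ozone_client_1")
assert exit_code == 0
original = get_checksum("/etc/passwd", "ozone_client_1")
copy = get_checksum("/tmp/key-1", "ozone_client_1")
assert original == copy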

View File

@ -15,117 +15,103 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import re
import time
import logging
from os import environ
from blockadeUtils.blockade import Blockade
from clusterUtils.cluster_utils import ClusterUtils
import ozone.util
from ozone.cluster import OzoneCluster
logger = logging.getLogger(__name__)
if "MAVEN_TEST" in os.environ:
compose_dir = environ.get("MAVEN_TEST")
FILE = os.path.join(compose_dir, "docker-compose.yaml")
elif "OZONE_HOME" in os.environ:
compose_dir = environ.get("OZONE_HOME")
FILE = os.path.join(compose_dir, "compose", "ozoneblockade", \
"docker-compose.yaml")
else:
parent_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.realpath(__file__))))
FILE = os.path.join(parent_dir, "compose", "ozoneblockade", \
"docker-compose.yaml")
os.environ["DOCKER_COMPOSE_FILE"] = FILE
SCALE = 3
CONTAINER_LIST = []
OM = []
SCM = []
DATANODES = []
CLIENT = []
def setup():
global CONTAINER_LIST, OM, SCM, DATANODES, CLIENT, ORIG_CHECKSUM, \
TEST_VOLUME_NAME, TEST_BUCKET_NAME
epoch_time = int(time.time())
TEST_VOLUME_NAME = "%s%s" % ("volume", epoch_time)
TEST_BUCKET_NAME = "%s%s" % ("bucket", epoch_time)
Blockade.blockade_destroy()
CONTAINER_LIST = ClusterUtils.cluster_setup(FILE, SCALE)
exit_code, output = Blockade.blockade_status()
assert exit_code == 0, "blockade status command failed with output=[%s]" % \
output
OM, SCM, CLIENT, DATANODES = \
ClusterUtils.find_om_scm_client_datanodes(CONTAINER_LIST)
exit_code, output = ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS",
"THREE", "ozone_client")
assert exit_code == 0, "freon run failed with output=[%s]" % output
ClusterUtils.create_volume(FILE, TEST_VOLUME_NAME)
ClusterUtils.create_bucket(FILE, TEST_BUCKET_NAME, TEST_VOLUME_NAME)
ORIG_CHECKSUM = ClusterUtils.find_checksum(FILE, "/etc/passwd")
def setup_function():
global cluster
cluster = OzoneCluster.create()
cluster.start()
def teardown():
logger.info("Inside teardown")
Blockade.blockade_destroy()
def teardown_module():
ClusterUtils.cluster_destroy(FILE)
def teardown_function():
cluster.stop()
def test_client_failure_isolate_two_datanodes():
"""
In this test, all datanodes are isolated from each other.
Two of the datanodes cannot communicate with any other node in the cluster.
In this test, all DNs are isolated from each other.
Two of the DNs cannot communicate with any other node in the cluster.
Expectation:
Write should fail.
Keys written before the partition was created can be read.
Keys written before the partition was created should be readable.
"""
test_key_name = "testkey1"
ClusterUtils.put_key(FILE, TEST_BUCKET_NAME, TEST_VOLUME_NAME,
"/etc/passwd", key_name=test_key_name,
replication_factor='THREE')
first_set = [OM[0], SCM[0], DATANODES[0], CLIENT[0]]
second_set = [DATANODES[1]]
third_set = [DATANODES[2]]
Blockade.blockade_create_partition(first_set, second_set, third_set)
Blockade.blockade_status()
exit_code, output = \
ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS", "THREE")
assert re.search(
"Status: Failed",
output) is not None
ClusterUtils.get_key(FILE, TEST_BUCKET_NAME, TEST_VOLUME_NAME,
test_key_name, "/tmp/")
key_checksum = ClusterUtils.find_checksum(FILE, "/tmp/%s" % test_key_name)
om = cluster.om
scm = cluster.scm
dns = cluster.datanodes
client = cluster.client
oz_client = cluster.get_client()
assert key_checksum == ORIG_CHECKSUM
epoch_time = int(time.time())
volume_name = "%s-%s" % ("volume", epoch_time)
bucket_name = "%s-%s" % ("bucket", epoch_time)
key_name = "key-1"
oz_client.create_volume(volume_name)
oz_client.create_bucket(volume_name, bucket_name)
oz_client.put_key("/etc/passwd", volume_name, bucket_name, key_name, "THREE")
first_set = [om, scm, dns[0], client]
second_set = [dns[1]]
third_set = [dns[2]]
logger.info("Partitioning the network")
cluster.partition_network(first_set, second_set, third_set)
exit_code, output = oz_client.run_freon(1, 1, 1, 10240)
assert re.search("Status: Failed", output) is not None
oz_client.get_key(volume_name, bucket_name, key_name, "/tmp/")
file_checksum = ozone.util.get_checksum("/etc/passwd", client)
key_checksum = ozone.util.get_checksum("/tmp/%s" % key_name, client)
assert file_checksum == key_checksum
def test_client_failure_isolate_one_datanode():
"""
In this test, one of the datanodes is isolated from all other nodes.
In this test, one of the DNs is isolated from all other nodes.
Expectation:
Write should pass.
Keys written before the partition was created can be read.
"""
test_key_name = "testkey2"
ClusterUtils.put_key(FILE, TEST_BUCKET_NAME, TEST_VOLUME_NAME,
"/etc/passwd", key_name=test_key_name,
replication_factor='THREE')
first_set = [OM[0], SCM[0], DATANODES[0], DATANODES[1], CLIENT[0]]
second_set = [DATANODES[2]]
Blockade.blockade_create_partition(first_set, second_set)
Blockade.blockade_status()
exit_code, output = \
ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS", "THREE")
om = cluster.om
scm = cluster.scm
dns = cluster.datanodes
client = cluster.client
oz_client = cluster.get_client()
epoch_time = int(time.time())
volume_name = "%s-%s" % ("volume", epoch_time)
bucket_name = "%s-%s" % ("bucket", epoch_time)
key_name = "key-1"
oz_client.create_volume(volume_name)
oz_client.create_bucket(volume_name, bucket_name)
oz_client.put_key("/etc/passwd", volume_name, bucket_name, key_name, "THREE")
first_set = [om, scm, dns[0], dns[1], client]
second_set = [dns[2]]
logger.info("Partitioning the network")
cluster.partition_network(first_set, second_set)
exit_code, output = oz_client.run_freon(1, 1, 1, 10240)
assert re.search("3 way commit failed", output) is not None
assert re.search("Status: Success", output) is not None
ClusterUtils.get_key(FILE, TEST_BUCKET_NAME, TEST_VOLUME_NAME,
test_key_name, "/tmp/")
key_checksum = ClusterUtils.find_checksum(FILE, "/tmp/%s" % test_key_name)
assert key_checksum == ORIG_CHECKSUM
oz_client.get_key(volume_name, bucket_name, key_name, "/tmp/")
file_checksum = ozone.util.get_checksum("/etc/passwd", client)
key_checksum = ozone.util.get_checksum("/tmp/%s" % key_name, cluster.client)
assert file_checksum == key_checksum

View File

@ -15,123 +15,133 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import logging
import util
from ozone.cluster import Cluster
import pytest
from ozone.cluster import OzoneCluster
logger = logging.getLogger(__name__)
def setup_function(function):
global cluster
cluster = Cluster.create()
cluster.start()
def setup_function():
global cluster
cluster = OzoneCluster.create()
cluster.start()
def teardown_function(function):
cluster.stop()
def teardown_function():
cluster.stop()
def test_isolate_single_datanode():
"""
In this test case we will create a network partition in such a way that
one of the datanodes will not be able to communicate with the other
datanodes but will be able to communicate with SCM.
"""
In this test case we will create a network partition in such a way that
one of the DNs will not be able to communicate with the other datanodes
but will be able to communicate with SCM.
Once the network partition happens, SCM detects it and closes the
pipeline, which in turn closes the containers.
Once the network partition happens, SCM detects it and closes the
pipeline, which in turn closes the containers.
The container replicas on the first two datanodes will get CLOSED as
they have quorum. The container replica on the third node will be
QUASI_CLOSED as it is not able to connect with the other datanodes and
doesn't have the latest BCSID.
The container replicas on the first two DNs will get CLOSED as they
have quorum. The container replica on the third node will be
QUASI_CLOSED as it is not able to connect with the other DNs and
doesn't have the latest BCSID.
Once we restore the network, the stale replica on the third datanode
will be deleted and the latest replica will be copied from one of the
other datanodes.
Once we restore the network, the stale replica on the third DN will be
deleted and the latest replica will be copied from one of the other DNs.
"""
cluster.run_freon(1, 1, 1, 10240)
first_set = [cluster.om, cluster.scm,
cluster.datanodes[0], cluster.datanodes[1]]
second_set = [cluster.om, cluster.scm, cluster.datanodes[2]]
logger.info("Partitioning the network")
cluster.partition_network(first_set, second_set)
cluster.run_freon(1, 1, 1, 10240)
logger.info("Waiting for container to be QUASI_CLOSED")
"""
om = cluster.om
scm = cluster.scm
dns = cluster.datanodes
client = cluster.client
oz_client = cluster.get_client()
util.wait_until(lambda: cluster.get_container_states(cluster.datanodes[2])
.popitem()[1] == 'QUASI_CLOSED',
int(os.environ["CONTAINER_STATUS_SLEEP"]), 10)
container_states_dn_0 = cluster.get_container_states(cluster.datanodes[0])
container_states_dn_1 = cluster.get_container_states(cluster.datanodes[1])
container_states_dn_2 = cluster.get_container_states(cluster.datanodes[2])
assert len(container_states_dn_0) != 0
assert len(container_states_dn_1) != 0
assert len(container_states_dn_2) != 0
for key in container_states_dn_0:
assert container_states_dn_0.get(key) == 'CLOSED'
for key in container_states_dn_1:
assert container_states_dn_1.get(key) == 'CLOSED'
for key in container_states_dn_2:
assert container_states_dn_2.get(key) == 'QUASI_CLOSED'
oz_client.run_freon(1, 1, 1, 10240)
# Since the replica in datanode[2] doesn't have the latest BCSID,
# ReplicationManager will delete it and copy a closed replica.
# We will now restore the network and datanode[2] should get a
# closed replica of the container
logger.info("Restoring the network")
cluster.restore_network()
# Partition the network
first_set = [om, scm, dns[0], dns[1], client]
second_set = [om, scm, dns[2], client]
logger.info("Partitioning the network")
cluster.partition_network(first_set, second_set)
logger.info("Waiting for the replica to be CLOSED")
util.wait_until(
lambda: cluster.container_state_predicate(cluster.datanodes[2], 'CLOSED'),
int(os.environ["CONTAINER_STATUS_SLEEP"]), 10)
container_states_dn_2 = cluster.get_container_states(cluster.datanodes[2])
assert len(container_states_dn_2) != 0
for key in container_states_dn_2:
assert container_states_dn_2.get(key) == 'CLOSED'
oz_client.run_freon(1, 1, 1, 10240)
logger.info("Waiting for container to be QUASI_CLOSED")
containers = cluster.get_containers_on_datanode(dns[2])
for container in containers:
container.wait_until_replica_is_quasi_closed(dns[2])
for container in containers:
assert container.get_state(dns[0]) == 'CLOSED'
assert container.get_state(dns[1]) == 'CLOSED'
assert container.get_state(dns[2]) == 'QUASI_CLOSED'
# Since the replica in datanode[2] doesn't have the latest BCSID,
# ReplicationManager will delete it and copy a closed replica.
# We will now restore the network and datanode[2] should get a
# closed replica of the container
logger.info("Restoring the network")
cluster.restore_network()
logger.info("Waiting for the replica to be CLOSED")
for container in containers:
container.wait_until_replica_is_closed(dns[2])
for container in containers:
assert container.get_state(dns[0]) == 'CLOSED'
assert container.get_state(dns[1]) == 'CLOSED'
assert container.get_state(dns[2]) == 'CLOSED'
exit_code, output = oz_client.run_freon(1, 1, 1, 10240)
assert exit_code == 0, "freon run failed with output=[%s]" % output
@pytest.mark.skip(reason="RATIS-615")
def test_datanode_isolation_all():
"""
In this test case we will create a network partition in such a way that
all datanodes cannot communicate with each other.
All datanodes will be able to communicate with SCM.
"""
In this test case we will create a network partition in such a way that
all DNs cannot communicate with each other.
All DNs will be able to communicate with SCM.
Once the network partition happens, SCM detects it and closes the
pipeline, which in turn tries to close the containers.
At least one of the replicas should be in the CLOSED state.
Once the network partition happens, SCM detects it and closes the
pipeline, which in turn tries to close the containers.
At least one of the replicas should be in the CLOSED state.
Once we restore the network, there will be three closed replicas.
Once we restore the network, there will be three closed replicas.
"""
cluster.run_freon(1, 1, 1, 10240)
"""
om = cluster.om
scm = cluster.scm
dns = cluster.datanodes
client = cluster.client
oz_client = cluster.get_client()
assert len(cluster.get_container_states(cluster.datanodes[0])) != 0
assert len(cluster.get_container_states(cluster.datanodes[1])) != 0
assert len(cluster.get_container_states(cluster.datanodes[2])) != 0
oz_client.run_freon(1, 1, 1, 10240)
logger.info("Partitioning the network")
first_set = [cluster.om, cluster.scm, cluster.datanodes[0]]
second_set = [cluster.om, cluster.scm, cluster.datanodes[1]]
third_set = [cluster.om, cluster.scm, cluster.datanodes[2]]
cluster.partition_network(first_set, second_set, third_set)
logger.info("Partitioning the network")
first_set = [om, scm, dns[0], client]
second_set = [om, scm, dns[1], client]
third_set = [om, scm, dns[2], client]
cluster.partition_network(first_set, second_set, third_set)
logger.info("Waiting for the replica to be CLOSED")
util.wait_until(
lambda: cluster.container_state_predicate_one_closed(cluster.datanodes),
int(os.environ["CONTAINER_STATUS_SLEEP"]), 10)
containers = cluster.get_containers_on_datanode(dns[0])
container = containers.pop()
# At least one of the replicas should be in the CLOSED state
assert cluster.container_state_predicate_one_closed(cluster.datanodes)
logger.info("Waiting for a replica to be CLOSED")
container.wait_until_one_replica_is_closed()
# After restoring the network all the replicas should be in
# CLOSED state
logger.info("Restoring the network")
cluster.restore_network()
# At least one of the replicas should be in the CLOSED state
assert 'CLOSED' in container.get_datanode_states()
logger.info("Waiting for the container to be replicated")
util.wait_until(
lambda: cluster.container_state_predicate_all_closed(cluster.datanodes),
int(os.environ["CONTAINER_STATUS_SLEEP"]), 10)
assert cluster.container_state_predicate_all_closed(cluster.datanodes)
logger.info("Restoring the network")
cluster.restore_network()
logger.info("Waiting for the container to be replicated")
container.wait_until_all_replicas_are_closed()
# After restoring the network all the replicas should be in CLOSED state
for state in container.get_datanode_states():
assert state == 'CLOSED'
exit_code, output = oz_client.run_freon(1, 1, 1, 10240)
assert exit_code == 0, "freon run failed with output=[%s]" % output

View File

@ -15,50 +15,33 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import logging
import random
import pytest
from os import environ
from blockadeUtils.blockade import Blockade
from ozone.cluster import Cluster
from ozone.blockade import Blockade
from ozone.cluster import OzoneCluster
logger = logging.getLogger(__name__)
if "MAVEN_TEST" in os.environ:
compose_dir = environ.get("MAVEN_TEST")
FILE = os.path.join(compose_dir, "docker-compose.yaml")
elif "OZONE_HOME" in os.environ:
compose_dir = environ.get("OZONE_HOME")
FILE = os.path.join(compose_dir, "compose", "ozoneblockade", \
"docker-compose.yaml")
else:
parent_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.realpath(__file__))))
FILE = os.path.join(parent_dir, "compose", "ozoneblockade", \
"docker-compose.yaml")
os.environ["DOCKER_COMPOSE_FILE"] = FILE
SCALE = 6
CONTAINER_LIST = []
def setup_function(function):
global cluster
cluster = Cluster.create()
cluster.start()
def setup_function():
global cluster
cluster = OzoneCluster.create()
cluster.start()
def teardown_function(function):
cluster.stop()
def teardown_function():
cluster.stop()
@pytest.mark.parametrize("flaky_node", ["datanode", "scm", "om", "all"])
@pytest.mark.parametrize("flaky_node", ["datanode", "scm", "om"])
def test_flaky(flaky_node):
"""
In these tests, we make the network of the nodes flaky using blockade.
There are 4 tests:
1) one of the datanodes is selected randomly and its network is made
flaky.
1) one of the DNs is selected randomly and its network is made flaky.
2) scm network is made flaky.
3) om network is made flaky.
4) The network of all the nodes is made flaky.
@ -72,6 +55,5 @@ def test_flaky(flaky_node):
}[flaky_node]
Blockade.make_flaky(flaky_container_name)
Blockade.blockade_status()
exit_code, output = cluster.run_freon(1, 1, 1, 10240)
exit_code, output = cluster.get_client().run_freon(1, 1, 1, 10240)
assert exit_code == 0, "freon run failed with output=[%s]" % output

View File

@ -15,145 +15,107 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import time
import logging
import re
from os import environ
from blockadeUtils.blockade import Blockade
from clusterUtils.cluster_utils import ClusterUtils
from ozone.cluster import OzoneCluster
logger = logging.getLogger(__name__)
if "MAVEN_TEST" in os.environ:
compose_dir = environ.get("MAVEN_TEST")
FILE = os.path.join(compose_dir, "docker-compose.yaml")
elif "OZONE_HOME" in os.environ:
compose_dir = environ.get("OZONE_HOME")
FILE = os.path.join(compose_dir, "compose", "ozoneblockade", \
"docker-compose.yaml")
else:
parent_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.realpath(__file__))))
FILE = os.path.join(parent_dir, "compose", "ozoneblockade", \
"docker-compose.yaml")
os.environ["DOCKER_COMPOSE_FILE"] = FILE
SCALE = 3
INCREASED_SCALE = 5
CONTAINER_LIST = []
OM = []
SCM = []
DATANODES = []
def setup():
global CONTAINER_LIST, OM, SCM, DATANODES
Blockade.blockade_destroy()
CONTAINER_LIST = ClusterUtils.cluster_setup(FILE, SCALE)
exit_code, output = Blockade.blockade_status()
assert exit_code == 0, "blockade status command failed with output=[%s]" % \
output
OM, SCM, _, DATANODES = \
ClusterUtils.find_om_scm_client_datanodes(CONTAINER_LIST)
exit_code, output = ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS",
"THREE")
assert exit_code == 0, "freon run failed with output=[%s]" % output
def setup_function():
global cluster
cluster = OzoneCluster.create()
cluster.start()
def teardown():
logger.info("Inside teardown")
Blockade.blockade_destroy()
def teardown_function():
cluster.stop()
def teardown_module():
ClusterUtils.cluster_destroy(FILE)
def test_one_dn_isolate_scm_other_dn():
"""
In this test, one of the DNs cannot communicate with SCM and other DNs.
Other DNs can communicate with each other and SCM.
Expectation : The container should eventually have two closed replicas.
"""
om = cluster.om
scm = cluster.scm
dns = cluster.datanodes
client = cluster.client
oz_client = cluster.get_client()
oz_client.run_freon(1, 1, 1, 10240)
# Partition the network
first_set = [dns[0], client]
second_set = [scm, om, dns[1], dns[2], client]
cluster.partition_network(first_set, second_set)
oz_client.run_freon(1, 1, 1, 10240)
containers = cluster.get_containers_on_datanode(dns[1])
for container in containers:
container.wait_until_one_replica_is_closed()
for container in containers:
assert container.get_state(dns[0]) == 'OPEN'
assert container.get_state(dns[1]) == 'CLOSED'
assert container.get_state(dns[2]) == 'CLOSED'
cluster.restore_network()
for container in containers:
container.wait_until_all_replicas_are_closed()
for container in containers:
assert container.get_state(dns[0]) == 'CLOSED'
assert container.get_state(dns[1]) == 'CLOSED'
assert container.get_state(dns[2]) == 'CLOSED'
exit_code, output = oz_client.run_freon(1, 1, 1, 10240)
assert exit_code == 0, "freon run failed with output=[%s]" % output
def test_one_dn_isolate_scm_other_dn(run_second_phase):
"""
In this test, one of the datanodes cannot communicate with SCM and other
datanodes.
Other datanodes can communicate with each other and SCM.
Expectation : The container should eventually have two closed replicas.
"""
first_set = [OM[0], SCM[0], DATANODES[1], DATANODES[2]]
second_set = [OM[0], DATANODES[0]]
Blockade.blockade_create_partition(first_set, second_set)
Blockade.blockade_status()
ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS", "THREE")
logger.info("Waiting for %s seconds before checking container status",
os.environ["CONTAINER_STATUS_SLEEP"])
time.sleep(int(os.environ["CONTAINER_STATUS_SLEEP"]))
all_datanodes_container_status = \
ClusterUtils.findall_container_status(FILE, SCALE)
count_closed_container_datanodes = filter(lambda x: x == 'CLOSED',
all_datanodes_container_status)
assert len(count_closed_container_datanodes) == 2, \
"The container should have two closed replicas."
if str(run_second_phase).lower() == "true":
ClusterUtils.cluster_setup(FILE, INCREASED_SCALE, False)
Blockade.blockade_status()
logger.info("Waiting for %s seconds before checking container status",
os.environ["CONTAINER_STATUS_SLEEP"])
time.sleep(int(os.environ["CONTAINER_STATUS_SLEEP"]))
all_datanodes_container_status = \
ClusterUtils.findall_container_status(
FILE, INCREASED_SCALE)
count_closed_container_datanodes = filter(
lambda x: x == 'CLOSED', all_datanodes_container_status)
assert len(count_closed_container_datanodes) >= 3, \
"The container should have at least three closed replicas."
_, output = \
ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS", "THREE")
assert re.search("Status: Success", output) is not None
def test_one_dn_isolate_other_dn():
"""
In this test, one of the DNs (first DN) cannot communicate with
other DNs but can communicate with SCM.
One of the other two DNs (second DN) cannot communicate with SCM.
Expectation :
The container replica state in first DN can be either closed or
quasi-closed.
The container replica state in second DN can be either closed or open.
The container should eventually have at least one closed replica.
"""
om = cluster.om
scm = cluster.scm
dns = cluster.datanodes
client = cluster.client
oz_client = cluster.get_client()
oz_client.run_freon(1, 1, 1, 10240)
def test_one_dn_isolate_other_dn(run_second_phase):
"""
In this test, one of the datanodes (first datanode) cannot communicate with
other datanodes but can communicate with SCM.
One of the other two datanodes (second datanode) cannot communicate with
SCM.
Expectation :
The container replica state in first datanode can be either closed or
quasi-closed.
The container replica state in second datanode can be either closed or open.
The container should eventually have at least one closed replica.
"""
first_set = [OM[0], SCM[0], DATANODES[0]]
second_set = [OM[0], DATANODES[1], DATANODES[2]]
third_set = [SCM[0], DATANODES[2]]
Blockade.blockade_create_partition(first_set, second_set, third_set)
Blockade.blockade_status()
ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS", "THREE")
logger.info("Waiting for %s seconds before checking container status",
os.environ["CONTAINER_STATUS_SLEEP"])
time.sleep(int(os.environ["CONTAINER_STATUS_SLEEP"]))
all_datanodes_container_status = \
ClusterUtils.findall_container_status(FILE, SCALE)
count_closed_container_datanodes = filter(lambda x: x == 'CLOSED',
all_datanodes_container_status)
first_datanode_status = all_datanodes_container_status[0]
second_datanode_status = all_datanodes_container_status[1]
assert first_datanode_status == 'CLOSED' or \
first_datanode_status == "QUASI_CLOSED"
assert second_datanode_status == 'CLOSED' or \
second_datanode_status == "OPEN"
assert len(count_closed_container_datanodes) >= 1, \
"The container should have at least one closed replica"
if str(run_second_phase).lower() == "true":
ClusterUtils.cluster_setup(FILE, INCREASED_SCALE, False)
Blockade.blockade_status()
logger.info("Waiting for %s seconds before checking container status",
os.environ["CONTAINER_STATUS_SLEEP"])
time.sleep(int(os.environ["CONTAINER_STATUS_SLEEP"]))
all_datanodes_container_status = \
ClusterUtils.findall_container_status(
FILE, INCREASED_SCALE)
count_closed_container_datanodes = filter(
lambda x: x == 'CLOSED', all_datanodes_container_status)
assert len(count_closed_container_datanodes) >= 3, \
"The container should have at least three closed replicas."
_, output = \
ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS", "THREE")
assert re.search("Status: Success", output) is not None
# Partition the network
first_set = [om, scm, dns[0], client]
second_set = [om, dns[1], dns[2], client]
third_set = [scm, dns[2], client]
cluster.partition_network(first_set, second_set, third_set)
oz_client.run_freon(1, 1, 1, 10240)
containers = cluster.get_containers_on_datanode(dns[0])
for container in containers:
container.wait_until_replica_is_quasi_closed(dns[0])
for container in containers:
assert container.get_state(dns[0]) == 'QUASI_CLOSED'
assert container.get_state(dns[1]) == 'OPEN' or \
container.get_state(dns[1]) == 'CLOSED'
assert container.get_state(dns[2]) == 'QUASI_CLOSED' or \
container.get_state(dns[2]) == 'CLOSED'
cluster.restore_network()
for container in containers:
container.wait_until_all_replicas_are_closed()
for container in containers:
assert container.get_state(dns[0]) == 'CLOSED'
assert container.get_state(dns[1]) == 'CLOSED'
assert container.get_state(dns[2]) == 'CLOSED'
exit_code, output = oz_client.run_freon(1, 1, 1, 10240)
assert exit_code == 0, "freon run failed with output=[%s]" % output
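cluster.partition_network(...) above corresponds to blockade's partition
support, where each group is passed as a comma-separated list of container
names and a container may appear in several groups. A rough sketch of the
translation (node.name and the helper shape are assumptions):

from subprocess import call

def partition_network(*node_sets):
    # One argument per partition; blockade expects each group as a
    # comma-separated list of container names.
    groups = [",".join(node.name for node in node_set)
              for node_set in node_sets]
    assert call(["blockade", "partition"] + groups) == 0

def restore_network():
    # 'blockade join' removes all partitions and rejoins the containers.
    assert call(["blockade", "join"]) == 0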

View File

@ -15,221 +15,150 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import time
import logging
import re
from os import environ
from blockadeUtils.blockade import Blockade
from clusterUtils.cluster_utils import ClusterUtils
from ozone.cluster import OzoneCluster
logger = logging.getLogger(__name__)
if "MAVEN_TEST" in os.environ:
compose_dir = environ.get("MAVEN_TEST")
FILE = os.path.join(compose_dir, "docker-compose.yaml")
elif "OZONE_HOME" in os.environ:
compose_dir = environ.get("OZONE_HOME")
FILE = os.path.join(compose_dir, "compose", "ozoneblockade", \
"docker-compose.yaml")
else:
parent_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.realpath(__file__))))
FILE = os.path.join(parent_dir, "compose", "ozoneblockade", \
"docker-compose.yaml")
os.environ["DOCKER_COMPOSE_FILE"] = FILE
SCALE = 3
INCREASED_SCALE = 5
CONTAINER_LIST = []
OM = []
SCM = []
DATANODES = []
def setup():
global CONTAINER_LIST, OM, SCM, DATANODES
Blockade.blockade_destroy()
CONTAINER_LIST = ClusterUtils.cluster_setup(FILE, SCALE)
exit_code, output = Blockade.blockade_status()
assert exit_code == 0, "blockade status command failed with output=[%s]" % \
output
OM, SCM, _, DATANODES = \
ClusterUtils.find_om_scm_client_datanodes(CONTAINER_LIST)
exit_code, output = \
ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS", "THREE")
assert exit_code == 0, "freon run failed with output=[%s]" % output
def setup_function():
global cluster
cluster = OzoneCluster.create()
cluster.start()
def teardown():
logger.info("Inside teardown")
Blockade.blockade_destroy()
def teardown_function():
cluster.stop()
def teardown_module():
ClusterUtils.cluster_destroy(FILE)
def test_three_dns_isolate_one_scm_failure():
"""
In this test, all DNs are isolated from each other.
One of the DNs (third DN) cannot communicate with SCM.
Expectation :
The container replica state in first DN should be closed.
The container replica state in second DN should be closed.
The container replica state in third DN should be open.
"""
om = cluster.om
scm = cluster.scm
dns = cluster.datanodes
client = cluster.client
oz_client = cluster.get_client()
oz_client.run_freon(1, 1, 1, 10240)
first_set = [om, scm, dns[0], client]
second_set = [om, scm, dns[1], client]
third_set = [om, dns[2], client]
cluster.partition_network(first_set, second_set, third_set)
containers = cluster.get_containers_on_datanode(dns[0])
for container in containers:
container.wait_until_replica_is_closed(dns[0])
for container in containers:
assert container.get_state(dns[0]) == 'CLOSED'
assert container.get_state(dns[1]) == 'CLOSED'
assert container.get_state(dns[2]) == 'OPEN'
cluster.restore_network()
for container in containers:
container.wait_until_all_replicas_are_closed()
for container in containers:
assert container.get_state(dns[0]) == 'CLOSED'
assert container.get_state(dns[1]) == 'CLOSED'
assert container.get_state(dns[2]) == 'CLOSED'
exit_code, output = oz_client.run_freon(1, 1, 1, 10240)
assert exit_code == 0, "freon run failed with output=[%s]" % output
def test_three_dns_isolate_onescmfailure(run_second_phase):
"""
In this test, all datanodes are isolated from each other.
One of the datanodes (third datanode) cannot communicate with SCM.
Expectation :
The container replica state in first datanode should be closed.
The container replica state in second datanode should be closed.
The container replica state in third datanode should be open.
"""
first_set = [OM[0], SCM[0], DATANODES[0]]
second_set = [OM[0], SCM[0], DATANODES[1]]
third_set = [OM[0], DATANODES[2]]
Blockade.blockade_create_partition(first_set, second_set, third_set)
Blockade.blockade_status()
ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS", "THREE")
logger.info("Waiting for %s seconds before checking container status",
os.environ["CONTAINER_STATUS_SLEEP"])
time.sleep(int(os.environ["CONTAINER_STATUS_SLEEP"]))
all_datanodes_container_status = \
ClusterUtils.findall_container_status(FILE, SCALE)
first_datanode_status = all_datanodes_container_status[0]
second_datanode_status = all_datanodes_container_status[1]
third_datanode_status = all_datanodes_container_status[2]
assert first_datanode_status == 'CLOSED'
assert second_datanode_status == 'CLOSED'
assert third_datanode_status == 'OPEN'
def test_three_dns_isolate_two_scm_failure():
"""
In this test, all DNs are isolated from each other.
Two DNs cannot communicate with SCM (second DN and third DN).
Expectation :
The container replica state in first DN should be quasi-closed.
The container replica state in second DN should be open.
The container replica state in third DN should be open.
"""
om = cluster.om
scm = cluster.scm
dns = cluster.datanodes
client = cluster.client
oz_client = cluster.get_client()
if str(run_second_phase).lower() == "true":
ClusterUtils.cluster_setup(FILE, INCREASED_SCALE, False)
Blockade.blockade_status()
logger.info("Waiting for %s seconds before checking container status",
os.environ["CONTAINER_STATUS_SLEEP"])
time.sleep(int(os.environ["CONTAINER_STATUS_SLEEP"]))
all_datanodes_container_status = \
ClusterUtils.findall_container_status(
FILE, INCREASED_SCALE)
count_closed_container_datanodes = filter(
lambda x: x == 'CLOSED', all_datanodes_container_status)
assert len(count_closed_container_datanodes) == 3, \
"The container should have three closed replicas."
Blockade.blockade_join()
Blockade.blockade_status()
logger.info("Waiting for %s seconds before checking container status",
os.environ["CONTAINER_STATUS_SLEEP"])
time.sleep(int(os.environ["CONTAINER_STATUS_SLEEP"]))
all_datanodes_container_status = \
ClusterUtils.findall_container_status(
FILE, INCREASED_SCALE)
count_closed_container_datanodes = filter(
lambda x: x == 'CLOSED', all_datanodes_container_status)
assert len(count_closed_container_datanodes) == 3, \
"The container should have three closed replicas."
oz_client.run_freon(1, 1, 1, 10240)
first_set = [om, scm, dns[0], client]
second_set = [om, dns[1], client]
third_set = [om, dns[2], client]
cluster.partition_network(first_set, second_set, third_set)
containers = cluster.get_containers_on_datanode(dns[0])
for container in containers:
container.wait_until_replica_is_closed(dns[0])
for container in containers:
assert container.get_state(dns[0]) == 'QUASI_CLOSED'
assert container.get_state(dns[1]) == 'OPEN'
assert container.get_state(dns[2]) == 'OPEN'
cluster.restore_network()
for container in containers:
container.wait_until_all_replicas_are_closed()
for container in containers:
assert container.get_state(dns[0]) == 'CLOSED'
assert container.get_state(dns[1]) == 'CLOSED'
assert container.get_state(dns[2]) == 'CLOSED'
exit_code, output = oz_client.run_freon(1, 1, 1, 10240)
assert exit_code == 0, "freon run failed with output=[%s]" % output
def test_three_dns_isolate_twoscmfailure(run_second_phase):
"""
In this test, all datanodes are isolated from each other.
Two datanodes cannot communicate with SCM (second datanode and third
datanode).
Expectation :
The container replica state in first datanode should be quasi-closed.
The container replica state in second datanode should be open.
The container replica state in third datanode should be open.
"""
first_set = [OM[0], SCM[0], DATANODES[0]]
second_set = [OM[0], DATANODES[1]]
third_set = [OM[0], DATANODES[2]]
Blockade.blockade_create_partition(first_set, second_set, third_set)
Blockade.blockade_status()
ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS", "THREE")
logger.info("Waiting for %s seconds before checking container status",
os.environ["CONTAINER_STATUS_SLEEP"])
time.sleep(int(os.environ["CONTAINER_STATUS_SLEEP"]))
all_datanodes_container_status = \
ClusterUtils.findall_container_status(FILE, SCALE)
first_datanode_status = all_datanodes_container_status[0]
second_datanode_status = all_datanodes_container_status[1]
third_datanode_status = all_datanodes_container_status[2]
assert first_datanode_status == 'QUASI_CLOSED'
assert second_datanode_status == 'OPEN'
assert third_datanode_status == 'OPEN'
def test_three_dns_isolate_three_scm_failure():
"""
In this test, all DNs are isolated from each other and also cannot
communicate with SCM.
Expectation :
The container replica state in first DN should be open.
The container replica state in second DN should be open.
The container replica state in third DN should be open.
"""
om = cluster.om
dns = cluster.datanodes
client = cluster.client
oz_client = cluster.get_client()
if str(run_second_phase).lower() == "true":
ClusterUtils.cluster_setup(FILE, INCREASED_SCALE, False)
Blockade.blockade_status()
logger.info("Waiting for %s seconds before checking container status",
os.environ["CONTAINER_STATUS_SLEEP"])
time.sleep(int(os.environ["CONTAINER_STATUS_SLEEP"]))
all_datanodes_container_status = \
ClusterUtils.findall_container_status(
FILE, INCREASED_SCALE)
count_quasi_closed_container_datanodes = filter(
lambda x: x == 'QUASI_CLOSED', all_datanodes_container_status)
assert len(count_quasi_closed_container_datanodes) >= 3, \
"The container should have at least three quasi-closed replicas."
Blockade.blockade_join()
Blockade.blockade_status()
logger.info("Waiting for %s seconds before checking container status",
os.environ["CONTAINER_STATUS_SLEEP"])
time.sleep(int(os.environ["CONTAINER_STATUS_SLEEP"]))
all_datanodes_container_status = \
ClusterUtils.findall_container_status(
FILE, INCREASED_SCALE)
count_closed_container_datanodes = filter(
lambda x: x == 'CLOSED', all_datanodes_container_status)
assert len(count_closed_container_datanodes) == 3, \
"The container should have three closed replicas."
oz_client.run_freon(1, 1, 1, 10240)
first_set = [om, dns[0], client]
second_set = [om, dns[1], client]
third_set = [om, dns[2], client]
def test_three_dns_isolate_threescmfailure(run_second_phase):
"""
In this test, all datanodes are isolated from each other and also cannot
communicate with SCM.
Expectation :
The container replica state in first datanode should be open.
The container replica state in second datanode should be open.
The container replica state in third datanode should be open.
"""
first_set = [OM[0], DATANODES[0]]
second_set = [OM[0], DATANODES[1]]
third_set = [OM[0], DATANODES[2]]
Blockade.blockade_create_partition(first_set, second_set, third_set)
Blockade.blockade_status()
ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS", "THREE")
logger.info("Waiting for %s seconds before checking container status",
os.environ["CONTAINER_STATUS_SLEEP"])
time.sleep(int(os.environ["CONTAINER_STATUS_SLEEP"]))
all_datanodes_container_status = \
ClusterUtils.findall_container_status(FILE, SCALE)
first_datanode_status = all_datanodes_container_status[0]
second_datanode_status = all_datanodes_container_status[1]
third_datanode_status = all_datanodes_container_status[2]
assert first_datanode_status == 'OPEN'
assert second_datanode_status == 'OPEN'
assert third_datanode_status == 'OPEN'
cluster.partition_network(first_set, second_set, third_set)
if str(run_second_phase).lower() == "true":
ClusterUtils.cluster_setup(FILE, INCREASED_SCALE, False)
Blockade.blockade_status()
logger.info("Waiting for %s seconds before checking container status",
os.environ["CONTAINER_STATUS_SLEEP"])
time.sleep(int(os.environ["CONTAINER_STATUS_SLEEP"]))
output = ClusterUtils.get_pipelines(FILE)
if output:
assert re.search("Factor:THREE", output) is None
all_datanodes_container_status = \
ClusterUtils.findall_container_status(
FILE, INCREASED_SCALE)
datanodes_having_container_status = filter(
lambda x: x != 'None', all_datanodes_container_status)
assert len(datanodes_having_container_status) == 3, \
"Containers should not be replicated on addition of new nodes."
Blockade.blockade_join()
Blockade.blockade_status()
logger.info("Waiting for %s seconds before checking container status",
os.environ["CONTAINER_STATUS_SLEEP"])
time.sleep(int(os.environ["CONTAINER_STATUS_SLEEP"]))
all_datanodes_container_status = \
ClusterUtils.findall_container_status(
FILE, INCREASED_SCALE)
count_closed_container_datanodes = filter(
lambda x: x == 'CLOSED', all_datanodes_container_status)
assert len(count_closed_container_datanodes) == 3, \
"The container should have three closed replicas."
# Wait till the datanodes are marked as stale by SCM
# (ozone.scm.stale.node.interval is 2m in this compose file, so 150s is
# enough for SCM to flag them)
time.sleep(150)
containers = cluster.get_containers_on_datanode(dns[0])
for container in containers:
assert container.get_state(dns[0]) == 'OPEN'
assert container.get_state(dns[1]) == 'OPEN'
assert container.get_state(dns[2]) == 'OPEN'
cluster.restore_network()
for container in containers:
container.wait_until_all_replicas_are_closed()
for container in containers:
assert container.get_state(dns[0]) == 'CLOSED'
assert container.get_state(dns[1]) == 'CLOSED'
assert container.get_state(dns[2]) == 'CLOSED'
exit_code, output = oz_client.run_freon(1, 1, 1, 10240)
assert exit_code == 0, "freon run failed with output=[%s]" % output
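The run_second_phase argument threaded through the removed tests is a pytest
fixture backed by a command-line switch. A minimal conftest.py sketch of how
such a flag can be wired up (the option name here is assumed; the old
suite's may differ):

import pytest

def pytest_addoption(parser):
    parser.addoption("--runSecondPhase", action="store", default="false",
                     help="also run the scale-up phase of the blockade tests")

@pytest.fixture
def run_second_phase(request):
    return request.config.getoption("--runSecondPhase")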

View File

@ -15,169 +15,118 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import time
import logging
import re
from os import environ
from blockadeUtils.blockade import Blockade
from clusterUtils.cluster_utils import ClusterUtils
from ozone.cluster import OzoneCluster
logger = logging.getLogger(__name__)
if "MAVEN_TEST" in os.environ:
compose_dir = environ.get("MAVEN_TEST")
FILE = os.path.join(compose_dir, "docker-compose.yaml")
elif "OZONE_HOME" in os.environ:
compose_dir = environ.get("OZONE_HOME")
FILE = os.path.join(compose_dir, "compose", "ozoneblockade", \
"docker-compose.yaml")
else:
parent_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.realpath(__file__))))
FILE = os.path.join(parent_dir, "compose", "ozoneblockade", \
"docker-compose.yaml")
os.environ["DOCKER_COMPOSE_FILE"] = FILE
SCALE = 3
INCREASED_SCALE = 5
CONTAINER_LIST = []
OM = []
SCM = []
DATANODES = []
def setup():
global CONTAINER_LIST, OM, SCM, DATANODES
Blockade.blockade_destroy()
CONTAINER_LIST = ClusterUtils.cluster_setup(FILE, SCALE)
exit_code, output = Blockade.blockade_status()
assert exit_code == 0, "blockade status command failed with output=[%s]" % \
output
OM, SCM, _, DATANODES = \
ClusterUtils.find_om_scm_client_datanodes(CONTAINER_LIST)
exit_code, output = \
ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS", "THREE")
assert exit_code == 0, "freon run failed with output=[%s]" % output
def setup_function():
global cluster
cluster = OzoneCluster.create()
cluster.start()
def teardown():
logger.info("Inside teardown")
Blockade.blockade_destroy()
def teardown_function():
cluster.stop()
def teardown_module():
ClusterUtils.cluster_destroy(FILE)
def test_two_dns_isolate_scm_same_partition():
"""
In this test, there are three DNs,
DN1 is on a network partition and
DN2, DN3 are on a different network partition.
DN2 and DN3 cannot communicate with SCM.
Expectation :
The container replica state in DN1 should be quasi-closed.
The container replica state in DN2 should be open.
The container replica state in DN3 should be open.
"""
om = cluster.om
scm = cluster.scm
dns = cluster.datanodes
client = cluster.client
oz_client = cluster.get_client()
oz_client.run_freon(1, 1, 1, 10240)
first_set = [om, dns[1], dns[2], client]
second_set = [om, scm, dns[0], client]
cluster.partition_network(first_set, second_set)
oz_client.run_freon(1, 1, 1, 10240)
containers = cluster.get_containers_on_datanode(dns[0])
for container in containers:
container.wait_until_one_replica_is_quasi_closed()
for container in containers:
assert container.get_state(dns[0]) == 'QUASI_CLOSED'
assert container.get_state(dns[1]) == 'OPEN'
assert container.get_state(dns[2]) == 'OPEN'
cluster.restore_network()
for container in containers:
container.wait_until_all_replicas_are_closed()
for container in containers:
assert container.get_state(dns[0]) == 'CLOSED'
assert container.get_state(dns[1]) == 'CLOSED'
assert container.get_state(dns[2]) == 'CLOSED'
exit_code, output = oz_client.run_freon(1, 1, 1, 10240)
assert exit_code == 0, "freon run failed with output=[%s]" % output
def test_two_dns_isolate_scm_same_partition(run_second_phase):
"""
In this test, there are three datanodes, DN1, DN2, DN3
DN1 is on a network partition and
DN2, DN3 are on a different network partition.
DN2 and DN3 cannot communicate with SCM.
Expectation :
The container replica state in DN1 should be quasi-closed.
The container replica state in DN2 should be open.
The container replica state in DN3 should be open.
"""
first_set = [OM[0], DATANODES[1], DATANODES[2]]
second_set = [OM[0], SCM[0], DATANODES[0]]
Blockade.blockade_create_partition(first_set, second_set)
Blockade.blockade_status()
ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS", "THREE")
logger.info("Waiting for %s seconds before checking container status",
os.environ["CONTAINER_STATUS_SLEEP"])
time.sleep(int(os.environ["CONTAINER_STATUS_SLEEP"]))
all_datanodes_container_status = \
ClusterUtils.findall_container_status(FILE, SCALE)
first_datanode_status = all_datanodes_container_status[0]
second_datanode_status = all_datanodes_container_status[1]
third_datanode_status = all_datanodes_container_status[2]
assert first_datanode_status == 'QUASI_CLOSED'
assert second_datanode_status == 'OPEN'
assert third_datanode_status == 'OPEN'
def test_two_dns_isolate_scm_different_partition():
"""
In this test, there are three DNs,
DN1 is on a network partition and
DN2, DN3 are on a different network partition.
DN1 and DN2 cannot communicate with SCM.
Expectation :
The container replica state in DN1 should be open.
The container replica states can be either 'closed'
in DN2 and DN3 or 'open' in DN2 and 'quasi-closed' in DN3.
"""
if str(run_second_phase).lower() == "true":
ClusterUtils.cluster_setup(FILE, INCREASED_SCALE, False)
Blockade.blockade_status()
logger.info("Waiting for %s seconds before checking container status",
os.environ["CONTAINER_STATUS_SLEEP"])
time.sleep(int(os.environ["CONTAINER_STATUS_SLEEP"]))
all_datanodes_container_status = \
ClusterUtils.findall_container_status(
FILE, INCREASED_SCALE)
count_quasi_closed_container_datanodes = filter(
lambda x: x == 'QUASI_CLOSED', all_datanodes_container_status)
assert len(count_quasi_closed_container_datanodes) >= 3, \
"The container should have at least three quasi-closed replicas."
Blockade.blockade_join()
Blockade.blockade_status()
time.sleep(int(os.environ["CONTAINER_STATUS_SLEEP"]))
all_datanodes_container_status = \
ClusterUtils.findall_container_status(
FILE, INCREASED_SCALE)
count_closed_container_datanodes = filter(
lambda x: x == 'CLOSED', all_datanodes_container_status)
assert len(count_closed_container_datanodes) >= 3
om = cluster.om
scm = cluster.scm
dns = cluster.datanodes
client = cluster.client
oz_client = cluster.get_client()
oz_client.run_freon(1, 1, 1, 10240)
def test_two_dns_isolate_scm_different_partition(run_second_phase):
"""
In this test, there are three datanodes, DN1, DN2, DN3
DN1 is on a network partition and
DN2, DN3 are on a different network partition.
DN1 and DN2 cannot communicate with SCM.
Expectation :
The container replica state in datanode DN1 should be open.
The container replica states can be either 'closed'
in DN2 and DN3, or,
'open' in DN2 and 'quasi-closed' in DN3.
"""
first_set = [OM[0], DATANODES[0]]
second_set = [OM[0], DATANODES[1], DATANODES[2]]
third_set = [SCM[0], DATANODES[2]]
Blockade.blockade_create_partition(first_set, second_set, third_set)
Blockade.blockade_status()
ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS", "THREE")
logger.info("Waiting for %s seconds before checking container status",
os.environ["CONTAINER_STATUS_SLEEP"])
time.sleep(int(os.environ["CONTAINER_STATUS_SLEEP"]))
all_datanodes_container_status = \
ClusterUtils.findall_container_status(FILE, SCALE)
first_datanode_status = all_datanodes_container_status[0]
second_datanode_status = all_datanodes_container_status[1]
third_datanode_status = all_datanodes_container_status[2]
assert first_datanode_status == 'OPEN'
assert (second_datanode_status == 'CLOSED' and
third_datanode_status == 'CLOSED') or \
(second_datanode_status == 'OPEN' and
third_datanode_status == 'QUASI_CLOSED')
first_set = [om, dns[0], client]
second_set = [om, dns[1], dns[2], client]
third_set = [scm, dns[2], client]
cluster.partition_network(first_set, second_set, third_set)
oz_client.run_freon(1, 1, 1, 10240)
if str(run_second_phase).lower() == "true":
ClusterUtils.cluster_setup(FILE, INCREASED_SCALE, False)
Blockade.blockade_status()
logger.info("Waiting for %s seconds before checking container status",
os.environ["CONTAINER_STATUS_SLEEP"])
time.sleep(int(os.environ["CONTAINER_STATUS_SLEEP"]))
all_datanodes_container_status = \
ClusterUtils.findall_container_status(
FILE, INCREASED_SCALE)
count_closed_container_datanodes = filter(
lambda x: x == 'CLOSED', all_datanodes_container_status)
count_quasi_closed_container_datanodes = filter(
lambda x: x == 'QUASI_CLOSED', all_datanodes_container_status)
assert len(count_closed_container_datanodes) >= 3 or \
len(count_quasi_closed_container_datanodes) >= 3
Blockade.blockade_join()
Blockade.blockade_status()
if len(count_closed_container_datanodes) < 3:
time.sleep(int(os.environ["CONTAINER_STATUS_SLEEP"]))
all_datanodes_container_status = \
ClusterUtils.findall_container_status(
FILE, INCREASED_SCALE)
count_closed_container_datanodes = filter(
lambda x: x == 'CLOSED', all_datanodes_container_status)
assert len(count_closed_container_datanodes) >= 3
_, output = \
ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS", "THREE")
assert re.search("Status: Success", output) is not None
containers = cluster.get_containers_on_datanode(dns[2])
for container in containers:
container.wait_until_replica_is_not_open_anymore(dns[2])
for container in containers:
assert container.get_state(dns[0]) == 'OPEN'
assert (container.get_state(dns[1]) == 'CLOSED' and
container.get_state(dns[2]) == 'CLOSED') or \
(container.get_state(dns[1]) == 'OPEN' and
container.get_state(dns[2]) == 'QUASI_CLOSED')
cluster.restore_network()
for container in containers:
container.wait_until_all_replicas_are_closed()
for container in containers:
assert container.get_state(dns[0]) == 'CLOSED'
assert container.get_state(dns[1]) == 'CLOSED'
assert container.get_state(dns[2]) == 'CLOSED'
exit_code, output = oz_client.run_freon(1, 1, 1, 10240)
assert exit_code == 0, "freon run failed with output=[%s]" % output
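The three-way get_state assertions repeated after every restore_network()
could be folded into one helper built on the container API already used
above; a small illustrative sketch (not part of the commit):

def assert_replica_states(containers, datanodes, expected='CLOSED'):
    # Verify that every replica of every container is in the expected state.
    for container in containers:
        for dn in datanodes:
            state = container.get_state(dn)
            assert state == expected, \
                "replica on %s is %s, expected %s" % (dn, state, expected)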

View File

@ -15,153 +15,111 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import time
import re
import logging
from os import environ
from blockadeUtils.blockade import Blockade
from clusterUtils.cluster_utils import ClusterUtils
from ozone.cluster import OzoneCluster
logger = logging.getLogger(__name__)
if "MAVEN_TEST" in os.environ:
compose_dir = environ.get("MAVEN_TEST")
FILE = os.path.join(compose_dir, "docker-compose.yaml")
elif "OZONE_HOME" in os.environ:
compose_dir = environ.get("OZONE_HOME")
FILE = os.path.join(compose_dir, "compose", "ozoneblockade", \
"docker-compose.yaml")
else:
parent_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.realpath(__file__))))
FILE = os.path.join(parent_dir, "compose", "ozoneblockade", \
"docker-compose.yaml")
os.environ["DOCKER_COMPOSE_FILE"] = FILE
SCALE = 3
INCREASED_SCALE = 5
CONTAINER_LIST = []
OM = []
SCM = []
DATANODES = []
def setup():
global CONTAINER_LIST, OM, SCM, DATANODES
Blockade.blockade_destroy()
CONTAINER_LIST = ClusterUtils.cluster_setup(FILE, SCALE)
exit_code, output = Blockade.blockade_status()
assert exit_code == 0, "blockade status command failed with output=[%s]" % \
output
OM, SCM, _, DATANODES = \
ClusterUtils.find_om_scm_client_datanodes(CONTAINER_LIST)
exit_code, output = ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS",
"THREE")
assert exit_code == 0, "freon run failed with output=[%s]" % output
def setup_function():
global cluster
cluster = OzoneCluster.create()
cluster.start()
def teardown():
logger.info("Inside teardown")
Blockade.blockade_destroy()
def teardown_function():
cluster.stop()
def teardown_module():
ClusterUtils.cluster_destroy(FILE)
def test_scm_isolation_one_node():
"""
In this test, one of the DNs cannot communicate with SCM.
Other DNs can communicate with SCM.
Expectation : The container should eventually have at least two closed
replicas.
"""
om = cluster.om
scm = cluster.scm
dns = cluster.datanodes
client = cluster.client
oz_client = cluster.get_client()
oz_client.run_freon(1, 1, 1, 10240)
first_set = [om, dns[0], dns[1], dns[2], client]
second_set = [om, scm, dns[1], dns[2], client]
cluster.partition_network(first_set, second_set)
oz_client.run_freon(1, 1, 1, 10240)
containers = cluster.get_containers_on_datanode(dns[1])
for container in containers:
container.wait_until_one_replica_is_closed()
for container in containers:
assert container.get_state(dns[0]) == 'OPEN'
assert container.get_state(dns[1]) == 'CLOSED'
assert container.get_state(dns[2]) == 'CLOSED'
cluster.restore_network()
for container in containers:
container.wait_until_all_replicas_are_closed()
for container in containers:
assert container.get_state(dns[0]) == 'CLOSED'
assert container.get_state(dns[1]) == 'CLOSED'
assert container.get_state(dns[2]) == 'CLOSED'
exit_code, output = oz_client.run_freon(1, 1, 1, 10240)
assert exit_code == 0, "freon run failed with output=[%s]" % output
def test_scm_isolation_one_node(run_second_phase):
"""
In this test, one of the datanodes cannot communicate with SCM.
Other datanodes can communicate with SCM.
Expectation : The container should eventually have at least two closed
replicas.
"""
first_set = [OM[0], DATANODES[0], DATANODES[1], DATANODES[2]]
second_set = [OM[0], SCM[0], DATANODES[1], DATANODES[2]]
Blockade.blockade_create_partition(first_set, second_set)
Blockade.blockade_status()
ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS", "THREE")
logger.info("Waiting for %s seconds before checking container status",
os.environ["CONTAINER_STATUS_SLEEP"])
time.sleep(int(os.environ["CONTAINER_STATUS_SLEEP"]))
all_datanodes_container_status = \
ClusterUtils.findall_container_status(FILE, SCALE)
closed_container_datanodes = [x for x in all_datanodes_container_status
if x == 'CLOSED']
assert len(closed_container_datanodes) >= 2, \
"The container should have at least two closed replicas."
def test_scm_isolation_two_node():
"""
In this test, two DNs cannot communicate with SCM.
Expectation : The container should eventually have three closed replicas
or two open replicas and one quasi-closed replica.
"""
om = cluster.om
scm = cluster.scm
dns = cluster.datanodes
client = cluster.client
oz_client = cluster.get_client()
if str(run_second_phase).lower() == "true":
ClusterUtils.cluster_setup(FILE, INCREASED_SCALE, False)
Blockade.blockade_status()
logger.info("Waiting for %s seconds before checking container status",
os.environ["CONTAINER_STATUS_SLEEP"])
time.sleep(int(os.environ["CONTAINER_STATUS_SLEEP"]))
all_datanodes_container_status = \
ClusterUtils.findall_container_status(FILE, INCREASED_SCALE)
closed_container_datanodes = [x for x in all_datanodes_container_status
if x == 'CLOSED']
assert len(closed_container_datanodes) >= 3, \
"The container should have at least three closed replicas."
Blockade.blockade_join()
Blockade.blockade_status()
_, output = \
ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS", "THREE")
assert re.search("Status: Success", output) is not None
oz_client.run_freon(1, 1, 1, 10240)
first_set = [om, dns[0], dns[1], dns[2], client]
second_set = [om, scm, dns[1], client]
cluster.partition_network(first_set, second_set)
oz_client.run_freon(1, 1, 1, 10240)
def test_scm_isolation_two_node(run_second_phase):
"""
In this test, two datanodes cannot communicate with SCM.
Expectation : The container should eventually have three closed replicas
or two open replicas and one quasi-closed replica.
"""
first_set = [OM[0], DATANODES[0], DATANODES[1], DATANODES[2]]
second_set = [OM[0], SCM[0], DATANODES[1]]
Blockade.blockade_create_partition(first_set, second_set)
Blockade.blockade_status()
ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS", "THREE")
logger.info("Waiting for %s seconds before checking container status",
os.environ["CONTAINER_STATUS_SLEEP"])
time.sleep(int(os.environ["CONTAINER_STATUS_SLEEP"]))
all_datanodes_container_status = \
ClusterUtils.findall_container_status(FILE, SCALE)
closed_container_datanodes = [x for x in all_datanodes_container_status
if x == 'CLOSED']
quasi_closed_container_datanodes = [x for x in all_datanodes_container_status
if x == 'QUASI_CLOSED']
count_open_container_datanodes = [x for x in all_datanodes_container_status
if x == 'OPEN']
assert len(closed_container_datanodes) == 3 or \
(len(count_open_container_datanodes) == 2 and
len(quasi_closed_container_datanodes) == 1), \
"The container should have three closed replicas or two open " \
"replicas and one quasi_closed replica."
containers = cluster.get_containers_on_datanode(dns[1])
if str(run_second_phase).lower() == "true":
ClusterUtils.cluster_setup(FILE, INCREASED_SCALE, False)
Blockade.blockade_status()
logger.info("Waiting for %s seconds before checking container status",
os.environ["CONTAINER_STATUS_SLEEP"])
time.sleep(int(os.environ["CONTAINER_STATUS_SLEEP"]))
all_datanodes_container_status = \
ClusterUtils.findall_container_status(FILE, INCREASED_SCALE)
closed_container_datanodes = [x for x in all_datanodes_container_status
if x == 'CLOSED']
quasi_closed_container_datanodes = \
[x for x in all_datanodes_container_status if x == 'QUASI_CLOSED']
assert len(closed_container_datanodes) >= 3 or \
len(quasi_closed_container_datanodes) >= 3
Blockade.blockade_join()
Blockade.blockade_status()
if len(closed_container_datanodes) < 3:
time.sleep(int(os.environ["CONTAINER_STATUS_SLEEP"]))
all_datanodes_container_status = \
ClusterUtils.findall_container_status(FILE, INCREASED_SCALE)
closed_container_datanodes = [x for x in all_datanodes_container_status
if x == 'CLOSED']
for container in containers:
container.wait_until_replica_is_not_open_anymore(dns[1])
assert len(closed_container_datanodes) >= 3
_, output = \
ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS", "THREE")
assert re.search("Status: Success", output) is not None
for container in containers:
state = container.get_state(dns[1])
assert state == 'QUASI_CLOSED' or state == 'CLOSED'
if state == 'QUASI_CLOSED':
assert container.get_state(dns[0]) == 'OPEN'
assert container.get_state(dns[2]) == 'OPEN'
else:
assert container.get_state(dns[0]) == 'CLOSED'
assert container.get_state(dns[2]) == 'CLOSED'
cluster.restore_network()
for container in containers:
container.wait_until_all_replicas_are_closed()
for container in containers:
assert container.get_state(dns[0]) == 'CLOSED'
assert container.get_state(dns[1]) == 'CLOSED'
assert container.get_state(dns[2]) == 'CLOSED'
exit_code, output = oz_client.run_freon(1, 1, 1, 10240)
assert exit_code == 0, "freon run failed with output=[%s]" % output
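For readers new to the replica lifecycle these assertions exercise: a
replica that can reach SCM quorum closes normally, while an isolated replica
first goes QUASI_CLOSED and is only force-closed once SCM can see a majority
again. A simplified transition table matching the states asserted above (the
authoritative state machine lives in HDDS; this is a sketch):

# Simplified replica state transitions assumed by these tests.
REPLICA_TRANSITIONS = {
    'OPEN': ['CLOSING'],
    'CLOSING': ['CLOSED', 'QUASI_CLOSED'],
    'QUASI_CLOSED': ['CLOSED'],
    'CLOSED': [],
}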