HDDS-997. Add blockade Tests for scm isolation and mixed node isolation. Contributed by Nilotpal Nandi.

This commit is contained in:
Mukul Kumar Singh 2019-02-01 13:10:08 +05:30
parent 16195eaee1
commit 13aa939224
10 changed files with 504 additions and 63 deletions

View File

@ -18,9 +18,27 @@ Following python packages need to be installed before running the tests :
1. blockade
2. pytest==2.8.7
You can execute the tests with following command-lines:
You can execute all blockade tests with following command-lines:
```
cd $DIRECTORY_OF_OZONE
python -m pytest -s blockade/
```
You can also execute fewer blockade tests with following command-lines:
```
cd $DIRECTORY_OF_OZONE
python -m pytest -s blockade/<PATH_TO_PYTHON_FILE>
e.g: python -m pytest -s blockade/test_blockade_datanode_isolation.py
```
You can change the default 'sleep' interval in the tests with following
command-lines:
```
cd $DIRECTORY_OF_OZONE
python -m pytest -s blockade/ --containerStatusSleep=<SECONDS>
e.g: python -m pytest -s blockade/ --containerStatusSleep=720
```

View File

@ -30,6 +30,7 @@ class Blockade(object):
@classmethod
def blockade_destroy(cls):
logger.info("Running blockade destroy")
call(["blockade", "destroy"])
@classmethod
@ -68,13 +69,16 @@ def blockade_create_partition(cls, *args):
nodes = ""
for node_list in args:
nodes = nodes + ','.join(node_list) + " "
exit_code, output = ClusterUtils.run_cmd("blockade partition %s" % nodes)
assert exit_code == 0, "blockade partition command failed with exit code=[%s]" % output
exit_code, output = \
ClusterUtils.run_cmd("blockade partition %s" % nodes)
assert exit_code == 0, \
"blockade partition command failed with exit code=[%s]" % output
@classmethod
def blockade_join(cls):
output = call(["blockade", "join"])
assert output == 0, "blockade join command failed with exit code=[%s]" % output
assert output == 0, "blockade join command failed with exit code=[%s]" \
% output
@classmethod
def blockade_stop(cls, node, all_nodes=False):
@ -82,7 +86,8 @@ def blockade_stop(cls, node, all_nodes=False):
output = call(["blockade", "stop", "--all"])
else:
output = call(["blockade", "stop", node])
assert output == 0, "blockade stop command failed with exit code=[%s]" % output
assert output == 0, "blockade stop command failed with exit code=[%s]" \
% output
@classmethod
def blockade_start(cls, node, all_nodes=False):
@ -90,4 +95,5 @@ def blockade_start(cls, node, all_nodes=False):
output = call(["blockade", "start", "--all"])
else:
output = call(["blockade", "start", node])
assert output == 0, "blockade start command failed with exit code=[%s]" % output
assert output == 0, "blockade start command failed with " \
"exit code=[%s]" % output

View File

@ -15,7 +15,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""This module has apis to create and remove a blockade cluster"""
from subprocess import call
import subprocess
@ -36,11 +35,13 @@ def cluster_setup(cls, docker_compose_file, datanode_count):
logger.info("compose file :%s", docker_compose_file)
logger.info("number of DNs :%d", datanode_count)
call(["docker-compose", "-f", docker_compose_file, "down"])
call(["docker-compose", "-f", docker_compose_file, "up", "-d", "--scale", "datanode=" + str(datanode_count)])
call(["docker-compose", "-f", docker_compose_file, "up", "-d",
"--scale", "datanode=" + str(datanode_count)])
logger.info("Waiting 30s for cluster start up...")
time.sleep(30)
output = subprocess.check_output(["docker-compose", "-f", docker_compose_file, "ps"])
output = subprocess.check_output(["docker-compose", "-f",
docker_compose_file, "ps"])
output_array = output.split("\n")[2:-1]
container_list = []
@ -51,23 +52,31 @@ def cluster_setup(cls, docker_compose_file, datanode_count):
time.sleep(2)
assert container_list, "no container found!"
logger.info("blockade created with containers %s", ' '.join(container_list))
logger.info("blockade created with containers %s",
' '.join(container_list))
return container_list
@classmethod
def cluster_destroy(cls, docker_compose_file):
logger.info("Running docker-compose -f %s down", docker_compose_file)
call(["docker-compose", "-f", docker_compose_file, "down"])
@classmethod
def run_freon(cls, docker_compose_file, num_volumes, num_buckets, num_keys, key_size,
replication_type, replication_factor):
def run_freon(cls, docker_compose_file, num_volumes, num_buckets,
num_keys, key_size, replication_type, replication_factor):
# run freon
cmd = "docker-compose -f %s exec ozoneManager /opt/hadoop/bin/ozone freon rk " \
"--numOfVolumes %s --numOfBuckets %s --numOfKeys %s --keySize %s " \
"--replicationType %s --factor %s" % (docker_compose_file, num_volumes,
num_buckets, num_keys, key_size, replication_type,
replication_factor)
cmd = "docker-compose -f %s " \
"exec ozoneManager /opt/hadoop/bin/ozone " \
"freon rk " \
"--numOfVolumes %s " \
"--numOfBuckets %s " \
"--numOfKeys %s " \
"--keySize %s " \
"--replicationType %s " \
"--factor %s" % (docker_compose_file, num_volumes,
num_buckets, num_keys, key_size,
replication_type, replication_factor)
exit_code, output = cls.run_cmd(cmd)
return exit_code, output
@ -78,7 +87,8 @@ def run_cmd(cls, cmd):
command = ' '.join(cmd)
logger.info(" RUNNING: " + command)
all_output = ""
myprocess = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=True)
myprocess = subprocess.Popen(cmd, stdout=subprocess.PIPE,
stderr=subprocess.STDOUT, shell=True)
while myprocess.poll() is None:
op = myprocess.stdout.readline()
if op:
@ -97,38 +107,49 @@ def run_cmd(cls, cmd):
@classmethod
def get_ozone_confkey_value(cls, docker_compose_file, key_name):
cmd = "docker-compose -f %s exec ozoneManager /opt/hadoop/bin/ozone getconf -confKey %s" \
%(docker_compose_file, key_name)
cmd = "docker-compose -f %s " \
"exec ozoneManager /opt/hadoop/bin/ozone " \
"getconf -confKey %s" \
% (docker_compose_file, key_name)
exit_code, output = cls.run_cmd(cmd)
assert exit_code == 0, "getconf of key=[%s] failed with output=[%s]" %(key_name, output)
return output
assert exit_code == 0, "getconf of key=[%s] failed with output=[%s]" \
%(key_name, output)
return str(output).strip()
@classmethod
def find_scm_uuid(cls, docker_compose_file):
"""
This function returns scm uuid.
"""
ozone_metadata_dir = cls.get_ozone_confkey_value(docker_compose_file, "ozone.metadata.dirs")
cmd = "docker-compose -f %s exec scm cat %s/scm/current/VERSION" % (docker_compose_file, ozone_metadata_dir)
ozone_metadata_dir = cls.get_ozone_confkey_value(docker_compose_file,
"ozone.metadata.dirs")
cmd = "docker-compose -f %s exec scm cat %s/scm/current/VERSION" % \
(docker_compose_file, ozone_metadata_dir)
exit_code, output = cls.run_cmd(cmd)
assert exit_code == 0, "get scm UUID failed with output=[%s]" % output
output_list = output.split("\n")
output_list = list(filter(lambda x: re.search("\w+=\w+", x), output_list))
output_list = list(filter(lambda x: re.search("\w+=\w+", x),
output_list))
output_dict = dict(map(lambda x: x.split("="), output_list))
return str(output_dict['scmUuid']).strip()
@classmethod
def find_datanode_container_status(cls, docker_compose_file, datanode_index):
def find_datanode_container_status(cls, docker_compose_file,
datanode_index):
"""
This function returns the datanode's container replica state.
"""
datanode_dir = cls.get_ozone_confkey_value(docker_compose_file, "hdds.datanode.dir")
datanode_dir = cls.get_ozone_confkey_value(docker_compose_file,
"hdds.datanode.dir")
scm_uuid = cls.find_scm_uuid(docker_compose_file)
container_parent_path = "%s/hdds/%s/current/containerDir0" %(datanode_dir, scm_uuid)
cmd = "docker-compose -f %s exec --index=%s datanode find %s -type f -name '*.container'" \
container_parent_path = "%s/hdds/%s/current/containerDir0" % \
(datanode_dir, scm_uuid)
cmd = "docker-compose -f %s exec --index=%s datanode find %s -type f " \
"-name '*.container'" \
% (docker_compose_file, datanode_index, container_parent_path)
exit_code, output = cls.run_cmd(cmd)
assert exit_code == 0, "command=[%s] failed with output=[%s]" % (cmd, output)
assert exit_code == 0, "command=[%s] failed with output=[%s]" % \
(cmd, output)
assert output, "No container info present"
container_list = map(str.strip, output.split("\n"))
container_state = None
@ -136,9 +157,12 @@ def find_datanode_container_status(cls, docker_compose_file, datanode_index):
cmd = "docker-compose -f %s exec --index=%s datanode cat %s" \
% (docker_compose_file, datanode_index, container_path)
exit_code, output = cls.run_cmd(cmd)
assert exit_code == 0, "command=[%s] failed with output=[%s]" % (cmd, output)
assert exit_code == 0, "command=[%s] failed with output=[%s]" % \
(cmd, output)
container_db_list = output.split("\n")
container_db_list = list(filter(lambda x: re.search("\w+:\s\w+", x), container_db_list))
container_db_list = \
list(filter(lambda x: re.search("\w+:\s\w+", x),
container_db_list))
container_db_info = "\n".join(container_db_list)
container_db_dict = yaml.load(container_db_info)
for key, value in container_db_dict.items():
@ -146,7 +170,8 @@ def find_datanode_container_status(cls, docker_compose_file, datanode_index):
if not container_state:
container_state = container_db_dict['state']
else:
assert container_db_dict['state'] == container_state, "all containers are not in same state"
assert container_db_dict['state'] == container_state, \
"all containers are not in same state"
return container_state
@ -157,7 +182,10 @@ def find_all_datanodes_container_status(cls, docker_compose_file, scale):
"""
all_datanode_container_status = []
for index in range(scale):
all_datanode_container_status.append(cls.find_datanode_container_status(docker_compose_file, index+1))
logger.info("All datanodes container status: %s", ' '.join(all_datanode_container_status))
all_datanode_container_status.append(
cls.find_datanode_container_status(docker_compose_file,
index+1))
logger.info("All datanodes container status: %s",
' '.join(all_datanode_container_status))
return all_datanode_container_status

View File

@ -18,18 +18,29 @@
def pytest_addoption(parser):
parser.addoption("--output-dir", action="store",
parser.addoption("--output-dir",
action="store",
default="/tmp/BlockadeTests",
help="location of output directory where output log and plot files will be created")
help="location of output directory where output log "
"and plot files will be created")
parser.addoption("--log-format",
action="store",
default="%(asctime)s|%(levelname)s|%(threadName)s|%(filename)s:%(lineno)s -"
default="%(asctime)s|%(levelname)s|%(threadName)s|"
"%(filename)s:%(lineno)s -"
" %(funcName)s()|%(message)s",
help="specify log format")
parser.addoption("--log-level", action="store", default="info", help="specify log level")
parser.addoption("--log-level",
action="store",
default="info",
help="specify log level")
parser.addoption("--containerStatusSleep",
action="store",
default="900",
help="sleep time before checking container status")
def pytest_configure(config):
os.environ["CONTAINER_STATUS_SLEEP"] = config.option.containerStatusSleep
outputdir = config.option.output_dir
try:
os.makedirs(outputdir)
@ -42,7 +53,10 @@ def pytest_configure(config):
else:
loglevel = eval("logging." + config.option.log_level.upper())
logformatter = logging.Formatter(config.option.log_format)
logging.basicConfig(filename=log_file, filemode='w', level=loglevel, format=config.option.log_format)
logging.basicConfig(filename=log_file,
filemode='w',
level=loglevel,
format=config.option.log_format)
console = logging.StreamHandler()
console.setLevel(loglevel)
console.setFormatter(logformatter)
@ -55,11 +69,13 @@ def pytest_report_teststatus(report):
if report.outcome == 'skipped':
pass
elif report.when == 'setup':
logger.info("RUNNING TEST \"%s\" at location \"%s\" at line number \"%s\"" % (name, loc, str(line)))
logger.info("RUNNING TEST \"%s\" at location \"%s\" at line number"
" \"%s\"" % (name, loc, str(line)))
elif report.when == 'call':
logger.info("TEST \"%s\" %s in %3.2f seconds" % (name, report.outcome.upper(), report.duration))
logger.info("TEST \"%s\" %s in %3.2f seconds" %
(name, report.outcome.upper(), report.duration))
def pytest_sessionfinish(session):
logger = logging.getLogger('main')
logger.info("ALL TESTS FINISHED")
logger.info("ALL TESTS FINISHED")

View File

@ -15,7 +15,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""This module has apis to create and remove a blockade cluster"""
import os
import time
import logging
@ -25,7 +24,8 @@
logger = logging.getLogger(__name__)
parent_dir = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
FILE = os.path.join(parent_dir, "compose", "ozone", "docker-compose.yaml")
FILE = os.path.join(parent_dir, "compose", "ozoneblockade",
"docker-compose.yaml")
SCALE = 3
CONTAINER_LIST = []
OM = []
@ -38,12 +38,14 @@ def setup():
Blockade.blockade_destroy()
CONTAINER_LIST = ClusterUtils.cluster_setup(FILE, SCALE)
exit_code, output = Blockade.blockade_status()
assert exit_code == 0, "blockade status command failed with output=[%s]" % output
assert exit_code == 0, "blockade status command failed with output=[%s]" % \
output
OM = filter(lambda x: 'ozoneManager' in x, CONTAINER_LIST)
SCM = filter(lambda x: 'scm' in x, CONTAINER_LIST)
DATANODES = sorted(list(filter(lambda x: 'datanode' in x, CONTAINER_LIST)))
exit_code, output = ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS", "THREE")
exit_code, output = ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS",
"THREE")
assert exit_code == 0, "freon run failed with output=[%s]" % output
@ -58,25 +60,38 @@ def teardown_module():
def test_datanode_isolation_one_node():
"""
In this test, one of the datanodes cannot communicate with other two datanodes.
In this test, one of the datanodes (first datanode) cannot communicate
with other two datanodes.
All datanodes can communicate with SCM.
Expectation :
The container replica state in first datanode should be quasi-closed.
The container replica state in other datanodes should be closed.
"""
first_set = [OM[0], SCM[0], DATANODES[0]]
second_set = [OM[0], SCM[0], DATANODES[1], DATANODES[2]]
Blockade.blockade_create_partition(first_set, second_set)
Blockade.blockade_status()
ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS", "THREE")
logger.info("Waiting for 480 seconds before checking container status")
time.sleep(480)
all_datanodes_container_status = ClusterUtils.find_all_datanodes_container_status(FILE, SCALE)
count_closed_container_datanodes = filter(lambda x: x == 'CLOSED', all_datanodes_container_status)
assert len(count_closed_container_datanodes) == 3, "The container should have three closed replicas."
logger.info("Waiting for %s seconds before checking container status",
os.environ["CONTAINER_STATUS_SLEEP"])
time.sleep(int(os.environ["CONTAINER_STATUS_SLEEP"]))
all_datanodes_container_status = \
ClusterUtils.find_all_datanodes_container_status(FILE, SCALE)
first_datanode_status = all_datanodes_container_status[0]
count_closed_container_datanodes = filter(lambda x: x == 'CLOSED',
all_datanodes_container_status)
assert first_datanode_status == 'QUASI_CLOSED'
assert len(count_closed_container_datanodes) == 2, \
"The container should have three closed replicas."
def test_datanode_isolation_all():
"""
In this test, none of the datanodes can communicate with other two datanodes.
In this test, none of the datanodes can communicate with other two
datanodes.
All datanodes can communicate with SCM.
Expectation : The container should eventually have at least two closed
replicas.
"""
first_set = [OM[0], SCM[0], DATANODES[0]]
second_set = [OM[0], SCM[0], DATANODES[1]]
@ -84,8 +99,12 @@ def test_datanode_isolation_all():
Blockade.blockade_create_partition(first_set, second_set, third_set)
Blockade.blockade_status()
ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS", "THREE")
logger.info("Waiting for 480 seconds before checking container status")
time.sleep(480)
all_datanodes_container_status = ClusterUtils.find_all_datanodes_container_status(FILE, SCALE)
count_closed_container_datanodes = filter(lambda x: x == 'CLOSED', all_datanodes_container_status)
assert len(count_closed_container_datanodes) >= 2, "The container should have at least two closed replicas."
logger.info("Waiting for %s seconds before checking container status",
os.environ["CONTAINER_STATUS_SLEEP"])
time.sleep(int(os.environ["CONTAINER_STATUS_SLEEP"]))
all_datanodes_container_status = \
ClusterUtils.find_all_datanodes_container_status(FILE, SCALE)
count_closed_container_datanodes = filter(lambda x: x == 'CLOSED',
all_datanodes_container_status)
assert len(count_closed_container_datanodes) >= 2, \
"The container should have at least two closed replicas."

View File

@ -15,7 +15,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""This module has apis to create and remove a blockade cluster"""
import os
import time
import logging
@ -26,7 +25,8 @@
logger = logging.getLogger(__name__)
parent_dir = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
FILE = os.path.join(parent_dir, "compose", "ozone", "docker-compose.yaml")
FILE = os.path.join(parent_dir, "compose", "ozoneblockade",
"docker-compose.yaml")
SCALE = 6
CONTAINER_LIST = []
@ -36,7 +36,8 @@ def setup_module():
Blockade.blockade_destroy()
CONTAINER_LIST = ClusterUtils.cluster_setup(FILE, SCALE)
exit_code, output = Blockade.blockade_status()
assert exit_code == 0, "blockade status command failed with output=[%s]" % output
assert exit_code == 0, "blockade status command failed with output=[%s]" % \
output
def teardown_module():
@ -54,5 +55,6 @@ def teardown():
def test_flaky(flaky_nodes):
Blockade.make_flaky(flaky_nodes, CONTAINER_LIST)
Blockade.blockade_status()
exit_code, output = ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS", "THREE")
exit_code, output = ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS",
"THREE")
assert exit_code == 0, "freon run failed with output=[%s]" % output

View File

@ -0,0 +1,116 @@
#!/usr/bin/python
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import time
import logging
from blockadeUtils.blockade import Blockade
from clusterUtils.cluster_utils import ClusterUtils
logger = logging.getLogger(__name__)
parent_dir = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
FILE = os.path.join(parent_dir, "compose", "ozoneblockade",
"docker-compose.yaml")
SCALE = 3
CONTAINER_LIST = []
OM = []
SCM = []
DATANODES = []
def setup():
global CONTAINER_LIST, OM, SCM, DATANODES
Blockade.blockade_destroy()
CONTAINER_LIST = ClusterUtils.cluster_setup(FILE, SCALE)
exit_code, output = Blockade.blockade_status()
assert exit_code == 0, "blockade status command failed with output=[%s]" % \
output
OM = filter(lambda x: 'ozoneManager' in x, CONTAINER_LIST)
SCM = filter(lambda x: 'scm' in x, CONTAINER_LIST)
DATANODES = sorted(list(filter(lambda x: 'datanode' in x, CONTAINER_LIST)))
exit_code, output = ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS",
"THREE")
assert exit_code == 0, "freon run failed with output=[%s]" % output
def teardown():
logger.info("Inside teardown")
Blockade.blockade_destroy()
def teardown_module():
ClusterUtils.cluster_destroy(FILE)
def test_one_dn_isolate_scm_other_dn():
"""
In this test, one of the datanodes cannot communicate with SCM and other
datanodes.
Other datanodes can communicate with each other and SCM .
Expectation : The container should eventually have two closed replicas.
"""
first_set = [OM[0], SCM[0], DATANODES[1], DATANODES[2]]
second_set = [OM[0], DATANODES[0]]
Blockade.blockade_create_partition(first_set, second_set)
Blockade.blockade_status()
ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS", "THREE")
logger.info("Waiting for %s seconds before checking container status",
os.environ["CONTAINER_STATUS_SLEEP"])
time.sleep(int(os.environ["CONTAINER_STATUS_SLEEP"]))
all_datanodes_container_status = \
ClusterUtils.find_all_datanodes_container_status(FILE, SCALE)
count_closed_container_datanodes = filter(lambda x: x == 'CLOSED',
all_datanodes_container_status)
assert len(count_closed_container_datanodes) == 2, \
"The container should have two closed replicas."
def test_one_dn_isolate_other_dn():
"""
In this test, one of the datanodes (first datanode) cannot communicate
other datanodes but can communicate with SCM.
One of the other two datanodes (second datanode) cannot communicate with
SCM.
Expectation :
The container replica state in first datanode can be either closed or
quasi-closed.
The container replica state in second datanode can be either closed or open.
The container should eventually have at lease one closed replica.
"""
first_set = [OM[0], SCM[0], DATANODES[0]]
second_set = [OM[0], DATANODES[1], DATANODES[2]]
third_set = [SCM[0], DATANODES[2]]
Blockade.blockade_create_partition(first_set, second_set, third_set)
Blockade.blockade_status()
ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS", "THREE")
logger.info("Waiting for %s seconds before checking container status",
os.environ["CONTAINER_STATUS_SLEEP"])
time.sleep(int(os.environ["CONTAINER_STATUS_SLEEP"]))
all_datanodes_container_status = \
ClusterUtils.find_all_datanodes_container_status(FILE, SCALE)
count_closed_container_datanodes = filter(lambda x: x == 'CLOSED',
all_datanodes_container_status)
first_datanode_status = all_datanodes_container_status[0]
second_datanode_status = all_datanodes_container_status[1]
assert first_datanode_status == 'CLOSED' or \
first_datanode_status == "QUASI_CLOSED"
assert second_datanode_status == 'CLOSED' or \
second_datanode_status == "OPEN"
assert len(count_closed_container_datanodes) >= 1, \
"The container should have at least one closed replica"

View File

@ -0,0 +1,110 @@
#!/usr/bin/python
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import time
import logging
from blockadeUtils.blockade import Blockade
from clusterUtils.cluster_utils import ClusterUtils
logger = logging.getLogger(__name__)
parent_dir = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
FILE = os.path.join(parent_dir, "compose", "ozoneblockade",
"docker-compose.yaml")
SCALE = 3
CONTAINER_LIST = []
OM = []
SCM = []
DATANODES = []
def setup():
global CONTAINER_LIST, OM, SCM, DATANODES
Blockade.blockade_destroy()
CONTAINER_LIST = ClusterUtils.cluster_setup(FILE, SCALE)
exit_code, output = Blockade.blockade_status()
assert exit_code == 0, "blockade status command failed with output=[%s]" % \
output
OM = filter(lambda x: 'ozoneManager' in x, CONTAINER_LIST)
SCM = filter(lambda x: 'scm' in x, CONTAINER_LIST)
DATANODES = sorted(list(filter(lambda x: 'datanode' in x, CONTAINER_LIST)))
exit_code, output = ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS",
"THREE")
assert exit_code == 0, "freon run failed with output=[%s]" % output
def teardown():
logger.info("Inside teardown")
Blockade.blockade_destroy()
def teardown_module():
ClusterUtils.cluster_destroy(FILE)
def test_scm_isolation_one_node():
"""
In this test, one of the datanodes cannot communicate with SCM.
Other datanodes can communicate with SCM.
Expectation : The container should eventually have at least two closed
replicas.
"""
first_set = [OM[0], DATANODES[0], DATANODES[1], DATANODES[2]]
second_set = [OM[0], SCM[0], DATANODES[1], DATANODES[2]]
Blockade.blockade_create_partition(first_set, second_set)
Blockade.blockade_status()
ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS", "THREE")
logger.info("Waiting for %s seconds before checking container status",
os.environ["CONTAINER_STATUS_SLEEP"])
time.sleep(int(os.environ["CONTAINER_STATUS_SLEEP"]))
all_datanodes_container_status = \
ClusterUtils.find_all_datanodes_container_status(FILE, SCALE)
count_closed_container_datanodes = filter(lambda x: x == 'CLOSED',
all_datanodes_container_status)
assert len(count_closed_container_datanodes) >= 2, \
"The container should have at least two closed replicas."
def test_scm_isolation_two_node():
"""
In this test, two datanodes cannot communicate with SCM.
Expectation : The container should eventually have at three closed replicas
or, two open replicas and one quasi-closed replica.
"""
first_set = [OM[0], DATANODES[0], DATANODES[1], DATANODES[2]]
second_set = [OM[0], SCM[0], DATANODES[1]]
Blockade.blockade_create_partition(first_set, second_set)
Blockade.blockade_status()
ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS", "THREE")
logger.info("Waiting for %s seconds before checking container status",
os.environ["CONTAINER_STATUS_SLEEP"])
time.sleep(int(os.environ["CONTAINER_STATUS_SLEEP"]))
all_datanodes_container_status = \
ClusterUtils.find_all_datanodes_container_status(FILE, SCALE)
count_closed_container_datanodes = filter(lambda x: x == 'CLOSED',
all_datanodes_container_status)
count_qausi_closed_container_datanodes = \
filter(lambda x: x == 'QUASI_CLOSED', all_datanodes_container_status)
count_open_container_datanodes = filter(lambda x: x == 'OPEN',
all_datanodes_container_status)
assert len(count_closed_container_datanodes) == 3 or \
(len(count_open_container_datanodes) == 2 and
len(count_qausi_closed_container_datanodes) == 1), \
"The container should have three closed replicas or two open " \
"replicas and one quasi_closed replica."

View File

@ -0,0 +1,49 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
version: "3"
services:
datanode:
image: apache/hadoop-runner
volumes:
- ../..:/opt/hadoop
ports:
- 9864
command: ["/opt/hadoop/bin/ozone","datanode"]
env_file:
- ./docker-config
ozoneManager:
image: apache/hadoop-runner
volumes:
- ../..:/opt/hadoop
ports:
- 9874:9874
environment:
ENSURE_OM_INITIALIZED: /data/metadata/ozoneManager/current/VERSION
env_file:
- ./docker-config
command: ["/opt/hadoop/bin/ozone","om"]
scm:
image: apache/hadoop-runner
volumes:
- ../..:/opt/hadoop
ports:
- 9876:9876
env_file:
- ./docker-config
environment:
ENSURE_SCM_INITIALIZED: /data/metadata/scm/current/VERSION
command: ["/opt/hadoop/bin/ozone","scm"]

View File

@ -0,0 +1,77 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
OZONE-SITE.XML_ozone.om.address=ozoneManager
OZONE-SITE.XML_ozone.om.http-address=ozoneManager:9874
OZONE-SITE.XML_ozone.scm.names=scm
OZONE-SITE.XML_ozone.enabled=True
OZONE-SITE.XML_ozone.scm.datanode.id=/data/datanode.id
OZONE-SITE.XML_ozone.scm.block.client.address=scm
OZONE-SITE.XML_ozone.metadata.dirs=/data/metadata
OZONE-SITE.XML_ozone.handler.type=distributed
OZONE-SITE.XML_ozone.scm.client.address=scm
OZONE-SITE.XML_ozone.scm.dead.node.interval=5m
OZONE-SITE.XML_ozone.replication=1
OZONE-SITE.XML_hdds.datanode.dir=/data/hdds
HDFS-SITE.XML_rpc.metrics.quantile.enable=true
HDFS-SITE.XML_rpc.metrics.percentiles.intervals=60,300
LOG4J.PROPERTIES_log4j.rootLogger=INFO, stdout
LOG4J.PROPERTIES_log4j.appender.stdout=org.apache.log4j.ConsoleAppender
LOG4J.PROPERTIES_log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
LOG4J.PROPERTIES_log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
LOG4J.PROPERTIES_log4j.logger.org.apache.hadoop.util.NativeCodeLoader=ERROR
LOG4J.PROPERTIES_log4j.logger.org.apache.ratis.conf.ConfUtils=WARN
LOG4J.PROPERTIES_log4j.logger.org.apache.hadoop.security.ShellBasedUnixGroupsMapping=ERROR
#Enable this variable to print out all hadoop rpc traffic to the stdout. See http://byteman.jboss.org/ to define your own instrumentation.
#BYTEMAN_SCRIPT_URL=https://raw.githubusercontent.com/apache/hadoop/trunk/dev-support/byteman/hadooprpc.btm
#LOG4J2.PROPERTIES_* are for Ozone Audit Logging
LOG4J2.PROPERTIES_monitorInterval=30
LOG4J2.PROPERTIES_filter=read,write
LOG4J2.PROPERTIES_filter.read.type=MarkerFilter
LOG4J2.PROPERTIES_filter.read.marker=READ
LOG4J2.PROPERTIES_filter.read.onMatch=DENY
LOG4J2.PROPERTIES_filter.read.onMismatch=NEUTRAL
LOG4J2.PROPERTIES_filter.write.type=MarkerFilter
LOG4J2.PROPERTIES_filter.write.marker=WRITE
LOG4J2.PROPERTIES_filter.write.onMatch=NEUTRAL
LOG4J2.PROPERTIES_filter.write.onMismatch=NEUTRAL
LOG4J2.PROPERTIES_appenders=console, rolling
LOG4J2.PROPERTIES_appender.console.type=Console
LOG4J2.PROPERTIES_appender.console.name=STDOUT
LOG4J2.PROPERTIES_appender.console.layout.type=PatternLayout
LOG4J2.PROPERTIES_appender.console.layout.pattern=%d{DEFAULT} | %-5level | %c{1} | %msg | %throwable{3} %n
LOG4J2.PROPERTIES_appender.rolling.type=RollingFile
LOG4J2.PROPERTIES_appender.rolling.name=RollingFile
LOG4J2.PROPERTIES_appender.rolling.fileName =${sys:hadoop.log.dir}/om-audit-${hostName}.log
LOG4J2.PROPERTIES_appender.rolling.filePattern=${sys:hadoop.log.dir}/om-audit-${hostName}-%d{yyyy-MM-dd-HH-mm-ss}-%i.log.gz
LOG4J2.PROPERTIES_appender.rolling.layout.type=PatternLayout
LOG4J2.PROPERTIES_appender.rolling.layout.pattern=%d{DEFAULT} | %-5level | %c{1} | %msg | %throwable{3} %n
LOG4J2.PROPERTIES_appender.rolling.policies.type=Policies
LOG4J2.PROPERTIES_appender.rolling.policies.time.type=TimeBasedTriggeringPolicy
LOG4J2.PROPERTIES_appender.rolling.policies.time.interval=86400
LOG4J2.PROPERTIES_appender.rolling.policies.size.type=SizeBasedTriggeringPolicy
LOG4J2.PROPERTIES_appender.rolling.policies.size.size=64MB
LOG4J2.PROPERTIES_loggers=audit
LOG4J2.PROPERTIES_logger.audit.type=AsyncLogger
LOG4J2.PROPERTIES_logger.audit.name=OMAudit
LOG4J2.PROPERTIES_logger.audit.level=INFO
LOG4J2.PROPERTIES_logger.audit.appenderRefs=rolling
LOG4J2.PROPERTIES_logger.audit.appenderRef.file.ref=RollingFile
LOG4J2.PROPERTIES_rootLogger.level=INFO
LOG4J2.PROPERTIES_rootLogger.appenderRefs=stdout
LOG4J2.PROPERTIES_rootLogger.appenderRef.stdout.ref=STDOUT