#!/usr/bin/env python
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""The actions module for the apache_hbase topology. The behavior to be carried out by the
build_cluster and start_cluster clusterdock scripts is to be defined through the build and
start functions, respectively.
"""

import logging
import tarfile
from ConfigParser import ConfigParser
from os import EX_OK, listdir, makedirs, remove  # pylint: disable=ungrouped-imports
# Follow convention of grouping from module imports
# after normal imports.
from os.path import exists, join
from shutil import move
from socket import getfqdn
from sys import stdout
from uuid import uuid4

# pylint: disable=import-error
# clusterdock topologies get access to the clusterdock package at run time, but their reference
# to clusterdock modules will confuse pylint, so we have to disable it.

import requests
from docker import Client

from clusterdock import Constants
from clusterdock.cluster import Cluster, Node, NodeGroup
from clusterdock.docker_utils import (build_image, get_clusterdock_container_id,
                                      get_host_port_binding, is_image_available_locally,
                                      pull_image)
from clusterdock.utils import strip_components_from_tar, XmlConfiguration

# We disable a couple of Pylint conventions because it assumes that module level variables must be
# named as if they're constants (which isn't the case here).
logger = logging.getLogger(__name__)  # pylint: disable=invalid-name
logger.setLevel(logging.INFO)

client = Client()  # pylint: disable=invalid-name

DEFAULT_APACHE_NAMESPACE = Constants.DEFAULT.apache_namespace  # pylint: disable=no-member

def _copy_container_folder_to_host(container_id, source_folder, destination_folder,
                                   host_folder=None):
    if not exists(destination_folder):
        makedirs(destination_folder)
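    # docker-py's get_archive returns a (tar stream, stat info) tuple; we save the stream to a
    # temporary tar file and unpack it into destination_folder below.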
    stream, _ = client.get_archive(container_id, source_folder)
    tar_filename = join(destination_folder, 'container_folder.tar')
    with open(tar_filename, 'wb') as file_descriptor:
        file_descriptor.write(stream.read())
    tar = tarfile.open(name=tar_filename)
    tar.extractall(path=destination_folder, members=strip_components_from_tar(tar))
    tar.close()
    remove(tar_filename)
    logger.info("Extracted container folder %s to %s.", source_folder,
                host_folder if host_folder else destination_folder)

def _create_configs_from_file(filename, cluster_config_dir, wildcards):
    configurations = ConfigParser(allow_no_value=True)
    configurations.read(filename)

    for config_file in configurations.sections():
        logger.info("Updating %s...", config_file)
        # For XML configuration files, run things through XmlConfiguration.
        if config_file.endswith('.xml'):
            XmlConfiguration(
                {item[0]: item[1].format(**wildcards)
                 for item in configurations.items(config_file)}
            ).write_to_file(join(cluster_config_dir, config_file))
        # For everything else, recognize whether a line in the configuration should simply be
        # appended to the bottom of a file or processed in some way. The presence of +++ will
        # lead to the evaluation of the following string through the end of the line.
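        # For illustration, a hypothetical configurations.cfg entry might look like:
        #
        #   [hbase-env.sh]
        #   +++ 'export HBASE_MANAGES_ZK=true'
        #
        # where everything after '+++' (with wildcards already substituted) is passed to eval()
        # and the result is appended as a line of hbase-env.sh.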
        else:
            lines = []
            for item in configurations.items(config_file):
                if item[0].startswith('+++'):
                    command = item[0].lstrip('+ ').format(**wildcards)

                    # Yes, we use eval here. This is potentially dangerous, but intentional.
                    lines.append(str(eval(command)))  # pylint: disable=eval-used
                elif item[0] == "body":
                    lines.append(item[1].format(**wildcards))
                else:
                    lines.append(item[0].format(**wildcards))
            with open(join(cluster_config_dir, config_file), 'w') as conf:
                conf.write("".join(["{0}\n".format(line) for line in lines]))

# Keep track of some common web UI ports that we'll expose to users later (e.g. to allow a user
# to reach the HDFS NameNode web UI over the internet).
HBASE_REST_SERVER_PORT = 8080
NAMENODE_WEB_UI_PORT = 50070
RESOURCEMANAGER_WEB_UI_PORT = 8088

# When starting or building a cluster, CLUSTERDOCK_VOLUME will be the root directory for
# persistent files (note that this location will itself be in a Docker container's filesystem).
CLUSTERDOCK_VOLUME = '/tmp/clusterdock'

def start(args):
    """This function will be executed when ./bin/start_cluster apache_hbase is invoked."""

    # pylint: disable=too-many-locals
    # Pylint doesn't want more than 15 local variables in a function; this one has 17. This is about
    # as low as I want to go because, while I can cheat and stuff unrelated things in a dictionary,
    # that won't improve readability.

    uuid = str(uuid4())
    container_cluster_config_dir = join(CLUSTERDOCK_VOLUME, uuid, 'config')
    makedirs(container_cluster_config_dir)

    for mount in client.inspect_container(get_clusterdock_container_id())['Mounts']:
        if mount['Destination'] == CLUSTERDOCK_VOLUME:
            host_cluster_config_dir = join(mount['Source'], uuid, 'config')
            break
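    # Note: the else clause of this for loop runs only if the loop finishes without hitting
    # break, i.e. when no mount matching CLUSTERDOCK_VOLUME was found.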
    else:
        raise Exception("Could not find source of {0} mount.".format(CLUSTERDOCK_VOLUME))

    # CLUSTERDOCK_VOLUME/uuid/config in the clusterdock container corresponds to
    # host_cluster_config_dir on the Docker host.
    logger.debug("Creating directory for cluster configuration files in %s...",
                 host_cluster_config_dir)

    # Generate the image name to use from the command line arguments passed in.
    image = '/'.join(
        [item
         for item in [args.registry_url, args.namespace or DEFAULT_APACHE_NAMESPACE,
                      "clusterdock:{os}_java-{java}_hadoop-{hadoop}_hbase-{hbase}".format(
                          os=args.operating_system, java=args.java_version,
                          hadoop=args.hadoop_version, hbase=args.hbase_version
                      )]
         if item]
    )
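    # This yields a name along the lines of
    # 'clusterdock:centos6.6_java-1.8_hadoop-2.7.2_hbase-1.2.0' (hypothetical versions shown),
    # prefixed by the registry URL and/or namespace when those are set.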
    if args.always_pull or not is_image_available_locally(image):
        pull_image(image)

    # Before starting the cluster, we create a throwaway container from which we copy
    # configuration files back to the host. We also use this container to run an HBase
    # command that returns the port of the HBase master web UI. Since we aren't running init here,
    # we also have to manually pass in JAVA_HOME as an environment variable.
    get_hbase_web_ui_port_command = ('/hbase/bin/hbase org.apache.hadoop.hbase.util.HBaseConfTool '
                                     'hbase.master.info.port')
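    # (HBaseConfTool prints the value a configuration key resolves to; for
    # hbase.master.info.port this is typically 16010 on HBase 1.x.)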
    container_id = client.create_container(image=image, command=get_hbase_web_ui_port_command,
                                           environment={'JAVA_HOME': '/java'})['Id']
    logger.debug("Created temporary container (id: %s) from which to copy configuration files.",
                 container_id)

    # Actually do the copying of Hadoop configs...
    _copy_container_folder_to_host(container_id, '/hadoop/etc/hadoop',
                                   join(container_cluster_config_dir, 'hadoop'),
                                   join(host_cluster_config_dir, 'hadoop'))

    # ... and repeat for HBase configs.
    _copy_container_folder_to_host(container_id, '/hbase/conf',
                                   join(container_cluster_config_dir, 'hbase'),
                                   join(host_cluster_config_dir, 'hbase'))

    logger.info("The /hbase/lib folder on containers in the cluster will be volume mounted "
                "into %s...", join(host_cluster_config_dir, 'hbase-lib'))
    _copy_container_folder_to_host(container_id, '/hbase/lib',
                                   join(container_cluster_config_dir, 'hbase-lib'),
                                   join(host_cluster_config_dir, 'hbase-lib'))

    # Every node in the cluster will have a shared volume mount from the host for Hadoop and HBase
    # configuration files as well as the HBase lib folder.
    shared_volumes = [{join(host_cluster_config_dir, 'hadoop'): '/hadoop/etc/hadoop'},
                      {join(host_cluster_config_dir, 'hbase'): '/hbase/conf'},
                      {join(host_cluster_config_dir, 'hbase-lib'): '/hbase/lib'}]
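    # Each single-entry dict above maps a host path to the path at which it is mounted inside
    # the container (host_path: container_path).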
    # Get the HBase master web UI port, stripping the newline the Docker REST API gives us.
    client.start(container=container_id)
    if client.wait(container=container_id) == EX_OK:
        hbase_master_web_ui_port = client.logs(container=container_id).rstrip()
        client.remove_container(container=container_id, force=True)
    else:
        raise Exception('HBase configuration container exited with a non-zero status.')

    # Create the Node objects. These hold the state of our container nodes and will be started
    # at Cluster instantiation time.
    primary_node = Node(hostname=args.primary_node[0], network=args.network,
                        image=image, ports=[NAMENODE_WEB_UI_PORT,
                                            hbase_master_web_ui_port,
                                            RESOURCEMANAGER_WEB_UI_PORT,
                                            HBASE_REST_SERVER_PORT],
                        volumes=shared_volumes)
    secondary_nodes = []
    # Split --data-directories once, up front, so the value is also defined when there are no
    # secondary nodes (it is used again after the loop below).
    if args.data_directories:
        data_directories = args.data_directories.split(',')
    for hostname in args.secondary_nodes:
        # A list of service directories will be used to name folders on the host and, appended
        # with an index, in the container, as well (e.g. /data1/node-1/dfs:/dfs1).
        service_directories = ['dfs', 'yarn']

        # Every Node will have shared_volumes to let one set of configs on the host be propagated
        # to every container. If --data-directories is specified, this will be appended to allow
        # containers to use multiple disks on the host.
        volumes = shared_volumes[:]
        if args.data_directories:
            volumes += [{join(data_directory, uuid, hostname, service_directory):
                         "/{0}{1}".format(service_directory, i)}
                        for i, data_directory in enumerate(data_directories, start=1)
                        for service_directory in service_directories]
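            # With --data-directories=/data1,/data2 and hostname node-1, for example, this adds
            # mounts like {/data1/<uuid>/node-1/dfs: /dfs1}, {/data1/<uuid>/node-1/yarn: /yarn1},
            # {/data2/<uuid>/node-1/dfs: /dfs2} and {/data2/<uuid>/node-1/yarn: /yarn2}.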
        secondary_nodes.append(Node(hostname=hostname,
                                    network=args.network,
                                    image=image,
                                    volumes=volumes))

    Cluster(topology='apache_hbase',
            node_groups=[NodeGroup(name='primary', nodes=[primary_node]),
                         NodeGroup(name='secondary', nodes=secondary_nodes)],
            network_name=args.network).start()

    # When creating configs, pass a dictionary of wildcards into _create_configs_from_file
    # to transform placeholders in the configurations.cfg file into real values.
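    # For example, a hypothetical configurations.cfg entry '+++ ",".join({all_nodes})' would be
    # evaluated into a comma-separated list of every hostname in the cluster.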
    _create_configs_from_file(filename=args.configurations,
                              cluster_config_dir=container_cluster_config_dir,
                              wildcards={"primary_node": args.primary_node,
                                         "secondary_nodes": args.secondary_nodes,
                                         "all_nodes": args.primary_node + args.secondary_nodes,
                                         "network": args.network})

    # After creating configurations from the configurations.cfg file, update hdfs-site.xml and
    # yarn-site.xml to use the data directories passed on the command line.
    if args.data_directories:
        _update_config_for_data_dirs(
            container_cluster_config_dir=container_cluster_config_dir,
            data_directories=data_directories
        )

    if not args.dont_start_services:
        _start_services(primary_node, hbase_master_web_ui_port=hbase_master_web_ui_port)

def _update_config_for_data_dirs(container_cluster_config_dir, data_directories):
    logger.info('Updating dfs.datanode.data.dir in hdfs-site.xml...')
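    # With two data directories, for instance, the property value becomes '/dfs1,/dfs2'.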
    hdfs_site_xml_filename = join(container_cluster_config_dir, 'hadoop', 'hdfs-site.xml')
    hdfs_site_xml = XmlConfiguration(
        properties={'dfs.datanode.data.dir':
                    ','.join(["/dfs{0}".format(i)
                              for i, _ in enumerate(data_directories, start=1)])},
        source_file=hdfs_site_xml_filename
    )
    hdfs_site_xml.write_to_file(filename=hdfs_site_xml_filename)

    logger.info('Updating yarn.nodemanager.local-dirs in yarn-site.xml...')
    yarn_site_xml_filename = join(container_cluster_config_dir, 'hadoop', 'yarn-site.xml')
    yarn_site_xml = XmlConfiguration(
        properties={'yarn.nodemanager.local-dirs':
                    ','.join(["/yarn{0}".format(i)
                              for i, _ in enumerate(data_directories, start=1)])},
        source_file=yarn_site_xml_filename
    )
    yarn_site_xml.write_to_file(filename=yarn_site_xml_filename)

def _start_services(primary_node, **kwargs):
    logger.info("Formatting namenode on %s...", primary_node.fqdn)
    primary_node.ssh('hdfs namenode -format')

    logger.info("Starting HDFS...")
    primary_node.ssh('/hadoop/sbin/start-dfs.sh')

    logger.info("Starting YARN...")
    primary_node.ssh('/hadoop/sbin/start-yarn.sh')

    logger.info('Starting HBase...')
    primary_node.ssh('/hbase/bin/start-hbase.sh')
    primary_node.ssh('/hbase/bin/hbase-daemon.sh start rest')

    logger.info("NameNode and HBase master are located on %s. SSH over and have fun!",
                primary_node.hostname)

    logger.info("The HDFS NameNode web UI can be reached at http://%s:%s",
                getfqdn(), get_host_port_binding(primary_node.container_id,
                                                 NAMENODE_WEB_UI_PORT))

    logger.info("The YARN ResourceManager web UI can be reached at http://%s:%s",
                getfqdn(), get_host_port_binding(primary_node.container_id,
                                                 RESOURCEMANAGER_WEB_UI_PORT))

    logger.info("The HBase master web UI can be reached at http://%s:%s",
                getfqdn(), get_host_port_binding(primary_node.container_id,
                                                 kwargs.get('hbase_master_web_ui_port')))

    logger.info("The HBase REST server can be reached at http://%s:%s",
                getfqdn(), get_host_port_binding(primary_node.container_id,
                                                 HBASE_REST_SERVER_PORT))

def build(args):
    """This function will be executed when ./bin/build_cluster apache_hbase is invoked."""

    # pylint: disable=too-many-locals
    # See start function above for rationale for disabling this warning.

    container_build_dir = join(CLUSTERDOCK_VOLUME, str(uuid4()))
    makedirs(container_build_dir)

    # If --hbase-git-commit is specified, we build HBase from source.
    if args.hbase_git_commit:
        build_hbase_commands = [
            "git clone https://github.com/apache/hbase.git {0}".format(container_build_dir),
            "git -C {0} checkout {1}".format(container_build_dir, args.hbase_git_commit),
            "mvn --batch-mode clean install -DskipTests assembly:single -f {0}/pom.xml".format(
                container_build_dir
            )
        ]
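        # The assembly build drops a binary tarball (anything ending in bin.tar.gz) under
        # hbase-assembly/target; the loop below picks it up as the HBase tarball to install.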
        maven_image = Constants.docker_images.maven  # pylint: disable=no-member
        if not is_image_available_locally(maven_image):
            pull_image(maven_image)

        container_configs = {
            'command': 'bash -c "{0}"'.format(' && '.join(build_hbase_commands)),
            'image': maven_image,
            'host_config': client.create_host_config(volumes_from=get_clusterdock_container_id())
        }

        maven_container_id = client.create_container(**container_configs)['Id']
        client.start(container=maven_container_id)
        for line in client.logs(container=maven_container_id, stream=True):
            stdout.write(line)
            stdout.flush()

        # Mimic docker run --rm by blocking on docker wait and then removing the container
        # if it encountered no errors.
        if client.wait(container=maven_container_id) == EX_OK:
            client.remove_container(container=maven_container_id, force=True)
        else:
            raise Exception('Error encountered while building HBase.')

        assembly_target_dir = join(container_build_dir, 'hbase-assembly', 'target')
        for a_file in listdir(assembly_target_dir):
            if a_file.endswith('bin.tar.gz'):
                args.hbase_tarball = join(assembly_target_dir, a_file)
                break

    # Download all the binary tarballs into our temporary directory so that we can add them
    # into the Docker image we're building.
    filenames = []
    for tarball_location in [args.java_tarball, args.hadoop_tarball, args.hbase_tarball]:
        tarball_filename = tarball_location.rsplit('/', 1)[-1]
        filenames.append(tarball_filename)

        # Download tarballs given as URLs.
        if container_build_dir not in tarball_location:
            get_request = requests.get(tarball_location, stream=True, cookies=(
                {'oraclelicense': 'accept-securebackup-cookie'}
                if tarball_location == args.java_tarball
                else None
            ))
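            # (The oraclelicense cookie satisfies Oracle's license-acceptance check, which
            # would otherwise block direct JDK downloads.)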
            # Raise Exception if download failed.
            get_request.raise_for_status()
            logger.info("Downloading %s...", tarball_filename)
            with open(join(container_build_dir, tarball_filename), 'wb') as file_descriptor:
                for chunk in get_request.iter_content(1024):
                    file_descriptor.write(chunk)
        else:
            move(tarball_location, container_build_dir)

    dockerfile_contents = r"""
FROM {nodebase_image}
COPY {java_tarball} /tarballs/
RUN mkdir /java && tar -xf /tarballs/{java_tarball} -C /java --strip-components=1
RUN echo "JAVA_HOME=/java" >> /etc/environment

COPY {hadoop_tarball} /tarballs/
RUN mkdir /hadoop && tar -xf /tarballs/{hadoop_tarball} -C /hadoop --strip-components=1
COPY {hbase_tarball} /tarballs/
RUN mkdir /hbase && tar -xf /tarballs/{hbase_tarball} -C /hbase --strip-components=1

# Remove tarballs folder.
RUN rm -rf /tarballs

# Set PATH explicitly.
RUN echo "PATH=/java/bin:/hadoop/bin:/hbase/bin:$(echo $PATH)" >> /etc/environment

# Add hbase user and group before copying root's SSH keys over.
RUN groupadd hbase \
    && useradd -g hbase hbase \
    && cp -R /root/.ssh ~hbase \
    && chown -R hbase:hbase ~hbase/.ssh

# Disable requiretty in /etc/sudoers as required by HBase chaos monkey.
RUN sed -i 's/Defaults\s*requiretty/#&/' /etc/sudoers
""".format(nodebase_image='/'.join([item
                                    for item in [args.registry_url,
                                                 args.namespace or DEFAULT_APACHE_NAMESPACE,
                                                 "clusterdock:{os}_nodebase".format(
                                                     os=args.operating_system
                                                 )]
                                    if item]),
           java_tarball=filenames[0], hadoop_tarball=filenames[1], hbase_tarball=filenames[2])

    logger.info("Created Dockerfile: %s", dockerfile_contents)

    with open(join(container_build_dir, 'Dockerfile'), 'w') as dockerfile:
        dockerfile.write(dockerfile_contents)

    image = '/'.join(
        [item
         for item in [args.registry_url, args.namespace or DEFAULT_APACHE_NAMESPACE,
                      "clusterdock:{os}_java-{java}_hadoop-{hadoop}_hbase-{hbase}".format(
                          os=args.operating_system, java=args.java_version,
                          hadoop=args.hadoop_version, hbase=args.hbase_version
                      )]
         if item])

    logger.info("Building image %s...", image)
    build_image(dockerfile=join(container_build_dir, 'Dockerfile'), tag=image)

    logger.info("Removing build temporary directory...")
    return [image]