HADOOP-6466. Add a ZooKeeper service to the cloud scripts.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@896259 13f79535-47bb-0310-9956-ffa450edef68
Thomas White 2010-01-05 22:54:51 +00:00
parent ae91b5d0f4
commit b55377a30d
5 changed files with 421 additions and 110 deletions


@@ -33,6 +33,8 @@ Trunk (unreleased changes)
HADOOP-6415. Adds a common token interface for both job token and
delegation token. (Kan Zhang via ddas)
HADOOP-6466. Add a ZooKeeper service to the cloud scripts. (tomwhite)
IMPROVEMENTS
HADOOP-6283. Improve the exception messages thrown by


@@ -316,3 +316,24 @@ larger changes, is to create your own image.
It's possible to use any image, as long as it i) runs (gzip compressed) user
data on boot, and ii) has Java installed.
OTHER SERVICES
==============
ZooKeeper
=========
You can run ZooKeeper by setting the "service" parameter to "zookeeper". For
example:
[my-zookeeper-cluster]
service=zookeeper
ami=ami-ed59bf84
instance_type=m1.small
key_name=tom
availability_zone=us-east-1c
public_key=PATH_TO_PUBLIC_KEY
private_key=PATH_TO_PRIVATE_KEY
Then to launch a three-node ZooKeeper ensemble, run:
% ./hadoop-ec2 launch-cluster my-zookeeper-cluster 3 zk
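
Once the instances have started, hadoop-ec2 prints the ensemble's connection
string, a comma-separated list of host:2181 pairs. You can then point any
ZooKeeper client at it; for example, with the zkCli.sh shell shipped in the
ZooKeeper distribution (illustrative hostnames, substitute the printed
connection string):

% zkCli.sh -server host1:2181,host2:2181,host3:2181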


@@ -18,8 +18,8 @@ from __future__ import with_statement
import ConfigParser
from hadoop.cloud import VERSION
from hadoop.cloud.cluster import get_cluster
from hadoop.cloud.service import get_service
from hadoop.cloud.service import InstanceTemplate
from hadoop.cloud.service import HadoopService
from hadoop.cloud.service import NAMENODE
from hadoop.cloud.service import SECONDARY_NAMENODE
from hadoop.cloud.service import JOBTRACKER
@@ -33,6 +33,7 @@ from optparse import make_option
import os
import sys
DEFAULT_SERVICE_NAME = 'hadoop'
DEFAULT_CLOUD_PROVIDER = 'ec2'
DEFAULT_CONFIG_DIR_NAME = '.hadoop-cloud'
@@ -183,8 +184,10 @@ def parse_options_and_config(command, option_list=[], extra_arguments=(),
cluster_name = args[0]
opt = merge_config_with_options(cluster_name, config, options_dict)
logging.debug("Options: %s", str(opt))
cluster = get_cluster(get_cloud_provider(opt))(cluster_name, config_dir)
service = get_service(cluster)
service_name = get_service_name(opt)
cloud_provider = get_cloud_provider(opt)
cluster = get_cluster(cloud_provider)(cluster_name, config_dir)
service = get_service(service_name, cloud_provider)(cluster)
return (opt, args, service)
def parse_options(command, option_list=[], expected_arguments=(),
@@ -223,15 +226,18 @@ def get_config_dir(options_dict):
config_dir = DEFAULT_CONFIG_DIR
return config_dir
def get_service_name(options_dict):
service_name = options_dict.get("service", None)
if service_name is None:
service_name = DEFAULT_SERVICE_NAME
return service_name
def get_cloud_provider(options_dict):
provider = options_dict.get("cloud_provider", None)
if provider is None:
provider = DEFAULT_CLOUD_PROVIDER
return provider
def get_service(cluster):
return HadoopService(cluster)
def check_options_set(options, option_names):
for option_name in option_names:
if options.get(option_name) is None:
@@ -268,8 +274,10 @@ def main():
if command == 'list':
(opt, args) = parse_options(command, BASIC_OPTIONS, unbounded_args=True)
if len(args) == 0:
service = get_service(None)
service.list_all(get_cloud_provider(opt))
service_name = get_service_name(opt)
cloud_provider = get_cloud_provider(opt)
service = get_service(service_name, cloud_provider)(None)
service.list_all(cloud_provider)
else:
(opt, args, service) = parse_options_and_config(command, BASIC_OPTIONS)
service.list()
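
With no arguments, the list command enumerates all clusters running the
configured service with the configured provider; given a cluster name, it
lists that cluster's instances. For example:

% ./hadoop-ec2 list
% ./hadoop-ec2 list my-zookeeper-cluster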


@@ -0,0 +1,112 @@
#!/bin/bash -x
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
# Script that is run on each EC2 instance on boot. It is passed in the EC2 user
# data, so should not exceed 16K in size after gzip compression.
#
# This script is executed by /etc/init.d/ec2-run-user-data, and output is
# logged to /var/log/messages.
################################################################################
################################################################################
# Initialize variables
################################################################################
# Substitute environment variables passed by the client
export %ENV%
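# The client substitutes %ENV% above (see _launch_instances in service.py)
# with KEY=VALUE pairs built from the instance template; an illustrative
# result for a ZooKeeper node might be:
#   export ROLES=zk USER_PACKAGES= AUTO_SHUTDOWN= EBS_MAPPINGS=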
ZK_VERSION=${ZK_VERSION:-3.2.2}
ZOOKEEPER_HOME=/usr/local/zookeeper-$ZK_VERSION
ZK_CONF_DIR=/etc/zookeeper/conf
function register_auto_shutdown() {
if [ ! -z "$AUTO_SHUTDOWN" ]; then
shutdown -h +$AUTO_SHUTDOWN >/dev/null &
fi
}
# Install a list of packages on debian or redhat as appropriate
function install_packages() {
if which dpkg &> /dev/null; then
apt-get update
apt-get -y install $@
elif which rpm &> /dev/null; then
yum install -y $@
else
echo "No package manager found."
fi
}
# Install any user packages specified in the USER_PACKAGES environment variable
function install_user_packages() {
if [ ! -z "$USER_PACKAGES" ]; then
install_packages $USER_PACKAGES
fi
}
function install_zookeeper() {
zk_tar_url=http://www.apache.org/dist/hadoop/zookeeper/zookeeper-$ZK_VERSION/zookeeper-$ZK_VERSION.tar.gz
zk_tar_file=`basename $zk_tar_url`
zk_tar_md5_file=`basename $zk_tar_url.md5`
curl="curl --retry 3 --silent --show-error --fail"
for i in `seq 1 3`;
do
$curl -O $zk_tar_url
$curl -O $zk_tar_url.md5
if md5sum -c $zk_tar_md5_file; then
break;
else
rm -f $zk_tar_file $zk_tar_md5_file
fi
done
if [ ! -e $zk_tar_file ]; then
echo "Failed to download $zk_tar_url. Aborting."
exit 1
fi
tar zxf $zk_tar_file -C /usr/local
rm -f $zk_tar_file $zk_tar_md5_file
echo "export ZOOKEEPER_HOME=$ZOOKEEPER_HOME" >> ~root/.bashrc
echo 'export PATH=$JAVA_HOME/bin:$ZOOKEEPER_HOME/bin:$PATH' >> ~root/.bashrc
}
function configure_zookeeper() {
mkdir -p /mnt/zookeeper/logs
ln -s /mnt/zookeeper/logs /var/log/zookeeper
mkdir -p /var/log/zookeeper/txlog
mkdir -p $ZK_CONF_DIR
cp $ZOOKEEPER_HOME/conf/log4j.properties $ZK_CONF_DIR
sed -i -e "s|log4j.rootLogger=INFO, CONSOLE|log4j.rootLogger=INFO, ROLLINGFILE|" \
-e "s|log4j.appender.ROLLINGFILE.File=zookeeper.log|log4j.appender.ROLLINGFILE.File=/var/log/zookeeper/zookeeper.log|" \
$ZK_CONF_DIR/log4j.properties
# Ensure ZooKeeper starts on boot
cat > /etc/rc.local <<EOF
ZOOCFGDIR=$ZK_CONF_DIR $ZOOKEEPER_HOME/bin/zkServer.sh start > /dev/null 2>&1 &
EOF
}
register_auto_shutdown
install_user_packages
install_zookeeper
configure_zookeeper
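
The rc.local entry only starts the server; nothing in the script verifies
that it came up. Once an instance is reachable, a simple liveness probe is
ZooKeeper's four-letter "ruok" command on the client port. A minimal sketch,
assuming the default port 2181 configured above:

import socket

# Send ZooKeeper's four-letter "ruok" command; a healthy server
# replies "imok" and then closes the connection.
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(10)
sock.connect(('localhost', 2181))
sock.sendall('ruok')
print sock.recv(4)  # prints 'imok' if the server is up
sock.close()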


@@ -71,13 +71,173 @@ class InstanceTemplate(object):
new_env_strings.extend(env_strings)
self.env_strings = new_env_strings
class HadoopService(object):
class Service(object):
"""
A general service that runs on a cluster.
"""
def __init__(self, cluster):
self.cluster = cluster
def get_service_code(self):
"""
The code that uniquely identifies the service.
"""
raise Exception("Unimplemented")
def list_all(self, provider):
"""
Find and print all clusters running this type of service.
"""
raise Exception("Unimplemented")
def list(self):
"""
Find and print all the instances running in this cluster.
"""
raise Exception("Unimplemented")
def launch_master(self, instance_template, config_dir, client_cidr):
"""
Launch a "master" instance.
"""
raise Exception("Unimplemented")
def launch_slaves(self, instance_template):
"""
Launch "slave" instance.
"""
raise Exception("Unimplemented")
def launch_cluster(self, instance_templates, config_dir, client_cidr):
"""
Launch a cluster of instances.
"""
raise Exception("Unimplemented")
def terminate_cluster(self, force=False):
self.cluster.print_status()
if not force and not self._prompt("Terminate all instances?"):
print "Not terminating cluster."
else:
print "Terminating cluster"
self.cluster.terminate()
def delete_cluster(self):
self.cluster.delete()
def create_formatted_snapshot(self, size, availability_zone,
image_id, key_name, ssh_options):
Ec2Storage.create_formatted_snapshot(self.cluster, size,
availability_zone,
image_id,
key_name,
ssh_options)
def list_storage(self):
storage = self.cluster.get_storage()
storage.print_status()
def create_storage(self, role, number_of_instances,
availability_zone, spec_file):
storage = self.cluster.get_storage()
storage.create(role, number_of_instances, availability_zone, spec_file)
storage.print_status()
def attach_storage(self, role):
storage = self.cluster.get_storage()
storage.attach(role, self.cluster.get_instances_in_role(role, 'running'))
storage.print_status()
def delete_storage(self, force=False):
storage = self.cluster.get_storage()
storage.print_status()
if not force and not self._prompt("Delete all storage volumes? THIS WILL \
PERMANENTLY DELETE ALL DATA"):
print "Not deleting storage volumes."
else:
print "Deleting storage"
for role in storage.get_roles():
storage.delete(role)
def login(self, ssh_options):
raise Exception("Unimplemented")
def proxy(self, ssh_options):
raise Exception("Unimplemented")
def push(self, ssh_options, file):
raise Exception("Unimplemented")
def execute(self, ssh_options, args):
raise Exception("Unimplemented")
def update_slaves_file(self, config_dir, ssh_options, private_key):
raise Exception("Unimplemented")
def _prompt(self, prompt):
""" Returns true if user responds "yes" to prompt. """
return raw_input("%s [yes or no]: " % prompt).lower() == "yes"
def _call(self, command):
print command
try:
subprocess.call(command, shell=True)
except Exception, e:
print e
def _get_default_user_data_file_template(self):
data_path = os.path.join(os.path.dirname(__file__), 'data')
return os.path.join(data_path, '%s-%s-init-remote.sh' %
(self.get_service_code(), self.cluster.get_provider_code()))
def _launch_instances(self, instance_template):
it = instance_template
user_data_file_template = it.user_data_file_template
if it.user_data_file_template == None:
user_data_file_template = self._get_default_user_data_file_template()
ebs_mappings = ''
storage = self.cluster.get_storage()
for role in it.roles:
if storage.has_any_storage((role,)):
ebs_mappings = storage.get_mappings_string_for_role(role)
replacements = { "%ENV%": build_env_string(it.env_strings, {
"ROLES": ",".join(it.roles),
"USER_PACKAGES": it.user_packages,
"AUTO_SHUTDOWN": it.auto_shutdown,
"EBS_MAPPINGS": ebs_mappings,
}) }
instance_user_data = InstanceUserData(user_data_file_template, replacements)
instance_ids = self.cluster.launch_instances(it.roles, it.number, it.image_id,
it.size_id,
instance_user_data,
key_name=it.key_name,
public_key=it.public_key,
placement=it.placement)
print "Waiting for %s instances in role %s to start" % \
(it.number, ",".join(it.roles))
try:
self.cluster.wait_for_instances(instance_ids)
print "%s instances started" % ",".join(it.roles)
except TimeoutException:
print "Timeout while waiting for %s instance to start." % ",".join(it.roles)
return
print
self.cluster.print_status(it.roles[0])
return self.cluster.get_instances_in_role(it.roles[0], "running")
class HadoopService(Service):
"""
A HDFS and MapReduce service.
"""
def __init__(self, cluster):
self.cluster = cluster
super(HadoopService, self).__init__(cluster)
def get_service_code(self):
return "hadoop"
def list_all(self, provider):
"""
@@ -152,14 +312,6 @@ class HadoopService(object):
print """export HADOOP_CLOUD_PROXY_PID=%s;
echo Proxy pid %s;""" % (process.pid, process.pid)
def push(self, ssh_options, file):
master = self._get_master()
if not master:
sys.exit(1)
subprocess.call('scp %s -r %s root@%s:' % (xstr(ssh_options),
file, master.public_ip),
shell=True)
def push(self, ssh_options, file):
master = self._get_master()
if not master:
@@ -176,51 +328,6 @@ echo Proxy pid %s;""" % (process.pid, process.pid)
master.public_ip,
" ".join(args)), shell=True)
def terminate_cluster(self, force=False):
self.cluster.print_status()
if not force and not self._prompt("Terminate all instances?"):
print "Not terminating cluster."
else:
print "Terminating cluster"
self.cluster.terminate()
def delete_cluster(self):
self.cluster.delete()
def create_formatted_snapshot(self, size, availability_zone,
image_id, key_name, ssh_options):
Ec2Storage.create_formatted_snapshot(self.cluster, size,
availability_zone,
image_id,
key_name,
ssh_options)
def list_storage(self):
storage = self.cluster.get_storage()
storage.print_status()
def create_storage(self, role, number_of_instances,
availability_zone, spec_file):
storage = self.cluster.get_storage()
storage.create(role, number_of_instances, availability_zone, spec_file)
storage.print_status()
def attach_storage(self, role):
storage = self.cluster.get_storage()
storage.attach(role, self.cluster.get_instances_in_role(role, 'running'))
storage.print_status()
def delete_storage(self, force=False):
storage = self.cluster.get_storage()
storage.print_status()
if not force and not self._prompt("Delete all storage volumes? THIS WILL \
PERMANENTLY DELETE ALL DATA"):
print "Not deleting storage volumes."
else:
print "Deleting storage"
for role in storage.get_roles():
storage.delete(role)
def update_slaves_file(self, config_dir, ssh_options, private_key):
instances = self.cluster.check_running(NAMENODE, 1)
if not instances:
@@ -241,16 +348,6 @@ echo Proxy pid %s;""" % (process.pid, process.pid)
subprocess.call('scp %s -r %s root@%s:/root/.ssh/id_rsa' % \
(ssh_options, private_key, slave.public_ip), shell=True)
def _prompt(self, prompt):
""" Returns true if user responds "yes" to prompt. """
return raw_input("%s [yes or no]: " % prompt).lower() == "yes"
def _get_default_user_data_file_template(self):
data_path = os.path.join(os.path.dirname(__file__), 'data')
return os.path.join(data_path, 'hadoop-%s-init-remote.sh' %
self.cluster.get_provider_code())
def _get_master(self):
# For split namenode/jobtracker, designate the namenode as the master
return self._get_namenode()
@@ -288,42 +385,6 @@ echo Proxy pid %s;""" % (process.pid, process.pid)
"""Replace characters in role name with ones allowed in bash variable names"""
return role.replace('+', '_').upper()
def _launch_instances(self, instance_template):
it = instance_template
user_data_file_template = it.user_data_file_template
if it.user_data_file_template == None:
user_data_file_template = self._get_default_user_data_file_template()
ebs_mappings = ''
storage = self.cluster.get_storage()
for role in it.roles:
if storage.has_any_storage((role,)):
ebs_mappings = storage.get_mappings_string_for_role(role)
replacements = { "%ENV%": build_env_string(it.env_strings, {
"ROLES": ",".join(it.roles),
"USER_PACKAGES": it.user_packages,
"AUTO_SHUTDOWN": it.auto_shutdown,
"EBS_MAPPINGS": ebs_mappings,
}) }
instance_user_data = InstanceUserData(user_data_file_template, replacements)
instance_ids = self.cluster.launch_instances(it.roles, it.number, it.image_id,
it.size_id,
instance_user_data,
key_name=it.key_name,
public_key=it.public_key,
placement=it.placement,
security_groups=it.security_groups)
print "Waiting for %s instances in role %s to start" % \
(it.number, ",".join(it.roles))
try:
self.cluster.wait_for_instances(instance_ids)
print "%s instances started" % ",".join(it.roles)
except TimeoutException:
print "Timeout while waiting for %s instance to start." % ",".join(it.roles)
return
print
self.cluster.print_status(it.roles[0])
return self.cluster.get_instances_in_role(it.roles[0], "running")
def _authorize_client_ports(self, client_cidrs=[]):
if not client_cidrs:
logger.debug("No client CIDRs specified, using local address.")
@@ -463,4 +524,111 @@ echo Proxy pid %s;""" % (process.pid, process.pid)
time.sleep(10)
for role in roles:
storage.attach(role, self.cluster.get_instances_in_role(role, 'running'))
storage.print_status(roles)
storage.print_status(roles)
class ZooKeeperService(Service):
"""
A ZooKeeper service.
"""
ZOOKEEPER_ROLE = "zk"
def __init__(self, cluster):
super(ZooKeeperService, self).__init__(cluster)
def get_service_code(self):
return "zookeeper"
def launch_cluster(self, instance_templates, config_dir, client_cidr):
self._launch_cluster_instances(instance_templates)
self._authorize_client_ports(client_cidr)
self._update_cluster_membership(instance_templates[0].public_key)
def _launch_cluster_instances(self, instance_templates):
for instance_template in instance_templates:
instances = self._launch_instances(instance_template)
def _authorize_client_ports(self, client_cidrs=[]):
if not client_cidrs:
logger.debug("No client CIDRs specified, using local address.")
client_ip = url_get('http://checkip.amazonaws.com/').strip()
client_cidrs = ("%s/32" % client_ip,)
logger.debug("Client CIDRs: %s", client_cidrs)
for client_cidr in client_cidrs:
self.cluster.authorize_role(self.ZOOKEEPER_ROLE, 2181, 2181, client_cidr)
def _update_cluster_membership(self, public_key):
time.sleep(30) # wait for SSH daemon to start
ssh_options = '-o StrictHostKeyChecking=no'
private_key = public_key[:-4] # TODO: pass in private key explicitly
instances = self.cluster.get_instances_in_role(self.ZOOKEEPER_ROLE,
'running')
config_file = 'zoo.cfg'
with open(config_file, 'w') as f:
f.write("""# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# The directory where the snapshot is stored.
dataDir=/var/log/zookeeper/txlog
# The port at which the clients will connect
clientPort=2181
# The servers in the ensemble
""")
counter = 1
for i in instances:
f.write("server.%s=%s:2888:3888\n" % (counter, i.private_ip))
counter += 1
# copy to each node in the cluster
myid_file = 'myid'
counter = 1
for i in instances:
self._call('scp -i %s %s %s root@%s:/etc/zookeeper/conf/zoo.cfg' \
% (private_key, ssh_options, config_file, i.public_ip))
with open(myid_file, 'w') as f:
f.write(str(counter) + "\n")
self._call('scp -i %s %s %s root@%s:/var/log/zookeeper/txlog/myid' \
% (private_key, ssh_options, myid_file, i.public_ip))
counter += 1
os.remove(config_file)
os.remove(myid_file)
# start the zookeeper servers
for i in instances:
self._call('ssh -i %s %s root@%s nohup /etc/rc.local &' \
% (private_key, ssh_options, i.public_ip))
hosts_string = ",".join(["%s:2181" % i.public_ip for i in instances])
print "ZooKeeper cluster: %s" % hosts_string
SERVICE_PROVIDER_MAP = {
"hadoop": {
# "provider_code": ('hadoop.cloud.providers.provider_code', 'ProviderHadoopService')
},
"zookeeper": {
# "provider_code": ('hadoop.cloud.providers.provider_code', 'ProviderZooKeeperService')
},
}
DEFAULT_SERVICE_PROVIDER_MAP = {
"hadoop": HadoopService,
"zookeeper": ZooKeeperService
}
def get_service(service, provider):
"""
Retrieve the Service class for a service and provider.
"""
try:
mod_name, service_classname = SERVICE_PROVIDER_MAP[service][provider]
_mod = __import__(mod_name, globals(), locals(), [service_classname])
return getattr(_mod, service_classname)
except KeyError:
return DEFAULT_SERVICE_PROVIDER_MAP[service]
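
The two-level lookup leaves room for provider-specific service classes: a
provider registers a (module name, class name) pair in SERVICE_PROVIDER_MAP,
and get_service falls back to the generic classes on a KeyError. A minimal
sketch of the resolution as cli.py performs it (the cluster name and config
directory are illustrative):

from hadoop.cloud.cluster import get_cluster
from hadoop.cloud.service import get_service

# No "ec2" entry exists under "zookeeper" in SERVICE_PROVIDER_MAP, so
# get_service falls back to DEFAULT_SERVICE_PROVIDER_MAP and returns the
# generic ZooKeeperService class.
service_class = get_service('zookeeper', 'ec2')
cluster = get_cluster('ec2')('my-zookeeper-cluster', '/root/.hadoop-cloud')
service = service_class(cluster)
service.list()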