HADOOP-6464. Write a Rackspace cloud provider.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@897023 13f79535-47bb-0310-9956-ffa450edef68
Thomas White 2010-01-07 21:43:38 +00:00
parent 1ca1bfb2e5
commit 8942119b04
12 changed files with 1090 additions and 25 deletions

View File

@@ -38,6 +38,8 @@ Trunk (unreleased changes)
HADOOP-6408. Add a /conf servlet to dump running configuration.
(Todd Lipcon via tomwhite)
HADOOP-6464. Write a Rackspace cloud provider. (tomwhite)
IMPROVEMENTS
HADOOP-6283. Improve the exception messages thrown by

View File

@@ -1,8 +1,9 @@
Hadoop Cloud Scripts
====================
These scripts allow you to run Hadoop on cloud providers. Currently only Amazon
EC2 is supported, but more providers are expected to be added over time.
These scripts allow you to run Hadoop on cloud providers. These instructions
assume you are running on Amazon EC2; the differences for other providers are
noted at the end of this document.
Getting Started
===============
@@ -337,3 +338,160 @@ private_key=PATH_TO_PRIVATE_KEY
Then to launch a three-node ZooKeeper ensemble, run:
% ./hadoop-ec2 launch-cluster my-zookeeper-cluster 3 zk
PROVIDER-SPECIFIC DETAILS
=========================
Rackspace
=========
Running on Rackspace is very similar to running on EC2, with a few minor
differences noted here.
Security Warning
================
Currently, Hadoop clusters on Rackspace are insecure since they don't run behind
a firewall.
Creating an image
=================
Rackspace doesn't support shared images, so you will need to build your own base
image to get started. See "Instructions for creating an image" at the end of
this document for details.
Installation
============
To run on Rackspace you need to install libcloud by checking out the latest
source from Apache:
git clone git://git.apache.org/libcloud.git
cd libcloud; python setup.py install
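As a quick sanity check that the installation worked and the Rackspace driver is
available, the following import should complete without errors:
python -c "import libcloud.drivers.rackspace"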
Set up your Rackspace credentials by exporting the following environment
variables:
* RACKSPACE_KEY - Your Rackspace user name
* RACKSPACE_SECRET - Your Rackspace API key
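For example (these values are placeholders; use your own account details):
export RACKSPACE_KEY=myrackspaceuser
export RACKSPACE_SECRET=0123456789abcdef0123456789abcdef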
Configuration
=============
The cloud_provider parameter must be set to specify Rackspace as the provider.
Here is a typical configuration:
[my-rackspace-cluster]
cloud_provider=rackspace
image_id=200152
instance_type=4
public_key=/path/to/public/key/file
private_key=/path/to/private/key/file
ssh_options=-i %(private_key)s -o StrictHostKeyChecking=no
It's a good idea to create a dedicated key for the cluster using a command
similar to:
ssh-keygen -f id_rsa_rackspace -P ''
The generated id_rsa_rackspace and id_rsa_rackspace.pub files are what the
private_key and public_key settings above should point to.
Launching a cluster
===================
Use the "hadoop-cloud" command instead of "hadoop-ec2".
After launching a cluster you need to manually add a hostname mapping for the
master node to your client's /etc/hosts before you can use the cluster, since
DNS isn't set up for the cluster nodes and your client can't resolve their
addresses. You can do this with:
hadoop-cloud list my-rackspace-cluster | grep 'nn,snn,jt' \
| awk '{print $4 " " $3 }' | sudo tee -a /etc/hosts
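You can confirm that the entry was added (assuming your cluster is called
my-rackspace-cluster, as in the configuration example above) with:
grep my-rackspace-cluster /etc/hosts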
Instructions for creating an image
==================================
First set your Rackspace credentials:
export RACKSPACE_KEY=<Your Rackspace user name>
export RACKSPACE_SECRET=<Your Rackspace API key>
Now create an authentication token for the session, and retrieve the server
management URL to perform operations against.
# The final sed strips the trailing ^M (carriage return) from the header value
AUTH_TOKEN=`curl -D - -H X-Auth-User:$RACKSPACE_KEY \
-H X-Auth-Key:$RACKSPACE_SECRET https://auth.api.rackspacecloud.com/v1.0 \
| grep 'X-Auth-Token:' | awk '{print $2}' | sed 's/.$//'`
SERVER_MANAGEMENT_URL=`curl -D - -H X-Auth-User:$RACKSPACE_KEY \
-H X-Auth-Key:$RACKSPACE_SECRET https://auth.api.rackspacecloud.com/v1.0 \
| grep 'X-Server-Management-Url:' | awk '{print $2}' | sed 's/.$//'`
echo $AUTH_TOKEN
echo $SERVER_MANAGEMENT_URL
You can get a list of images with the following:
curl -H X-Auth-Token:$AUTH_TOKEN $SERVER_MANAGEMENT_URL/images
Here's the same query, but with pretty-printed XML output:
curl -H X-Auth-Token:$AUTH_TOKEN $SERVER_MANAGEMENT_URL/images.xml | xmllint --format -
There are similar queries for flavors and running instances:
curl -H X-Auth-Token:$AUTH_TOKEN $SERVER_MANAGEMENT_URL/flavors.xml | xmllint --format -
curl -H X-Auth-Token:$AUTH_TOKEN $SERVER_MANAGEMENT_URL/servers.xml | xmllint --format -
The following command creates a new server. In this case it creates a 2GB
Ubuntu 8.10 instance, as determined by the imageId and flavorId attributes.
The name attribute gives the instance a meaningful name.
curl -v -X POST -H X-Auth-Token:$AUTH_TOKEN -H 'Content-type: text/xml' -d @- $SERVER_MANAGEMENT_URL/servers << EOF
<server xmlns="http://docs.rackspacecloud.com/servers/api/v1.0" name="apache-hadoop-ubuntu-8.10-base" imageId="11" flavorId="4">
<metadata/>
</server>
EOF
Make a note of the new server's ID, public IP address and admin password as you
will need these later.
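The commands that follow refer to these values through shell variables, so set
them now (the values shown are placeholders; substitute your own):
SERVER_ID=200387      # the id attribute from the response
SERVER=67.23.10.132   # the server's public IP address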
You can check the status of the server with:
curl -H X-Auth-Token:$AUTH_TOKEN $SERVER_MANAGEMENT_URL/servers/$SERVER_ID.xml | xmllint --format -
When it has started (status "ACTIVE"), copy the setup script over:
scp tools/rackspace/remote-setup.sh root@$SERVER:remote-setup.sh
Log in to the server (ssh root@$SERVER, using the admin password noted earlier)
and run the setup script (you will need to manually accept the Sun Java
license):
sh remote-setup.sh
Once the script has completed, log out and create an image of the running
instance (giving it a memorable name):
curl -v -X POST -H X-Auth-Token:$AUTH_TOKEN -H 'Content-type: text/xml' -d @- $SERVER_MANAGEMENT_URL/images << EOF
<image xmlns="http://docs.rackspacecloud.com/servers/api/v1.0" name="Apache Hadoop Ubuntu 8.10" serverId="$SERVER_ID" />
EOF
Keep a note of the image ID, as this is the value to use for image_id in your
cluster configuration when launching fresh instances.
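Later commands assume the image ID is also available in a shell variable
(again, a placeholder value is shown):
IMAGE_ID=200152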
You can check the status of the image with:
curl -H X-Auth-Token:$AUTH_TOKEN $SERVER_MANAGEMENT_URL/images/$IMAGE_ID.xml | xmllint --format -
When it's "ACTIVE" is is ready for use. It's important to realize that you have
to keep the server from which you generated the image running for as long as the
image is in use.
However, if you want to clean up an old instance run:
curl -X DELETE -H X-Auth-Token:$AUTH_TOKEN $SERVER_MANAGEMENT_URL/servers/$SERVER_ID
Similarly, you can delete old images:
curl -X DELETE -H X-Auth-Token:$AUTH_TOKEN $SERVER_MANAGEMENT_URL/images/$IMAGE_ID

View File

@@ -32,6 +32,7 @@ WORKSPACE=${WORKSPACE:-`pwd`}
CONFIG_DIR=${CONFIG_DIR:-$WORKSPACE/.hadoop-cloud}
CLUSTER=${CLUSTER:-hadoop-cloud-$USER-test-cluster}
IMAGE_ID=${IMAGE_ID:-ami-6159bf08} # default to Fedora 32-bit AMI
INSTANCE_TYPE=${INSTANCE_TYPE:-m1.small}
AVAILABILITY_ZONE=${AVAILABILITY_ZONE:-us-east-1c}
KEY_NAME=${KEY_NAME:-$USER}
AUTO_SHUTDOWN=${AUTO_SHUTDOWN:-15}
@@ -39,11 +40,12 @@ LOCAL_HADOOP_VERSION=${LOCAL_HADOOP_VERSION:-0.20.1}
HADOOP_HOME=${HADOOP_HOME:-$WORKSPACE/hadoop-$LOCAL_HADOOP_VERSION}
HADOOP_CLOUD_HOME=${HADOOP_CLOUD_HOME:-$bin/../py}
HADOOP_CLOUD_PROVIDER=${HADOOP_CLOUD_PROVIDER:-ec2}
SSH_OPTIONS=${SSH_OPTIONS:-"-i ~/.$HADOOP_CLOUD_PROVIDER/id_rsa-$KEY_NAME \
-o StrictHostKeyChecking=no"}
LAUNCH_ARGS=${LAUNCH_ARGS:-1} # Try LAUNCH_ARGS="1 nn,snn 1 jt 1 dn,tt"
PUBLIC_KEY=${PUBLIC_KEY:-~/.$HADOOP_CLOUD_PROVIDER/id_rsa-$KEY_NAME.pub}
PRIVATE_KEY=${PRIVATE_KEY:-~/.$HADOOP_CLOUD_PROVIDER/id_rsa-$KEY_NAME}
SSH_OPTIONS=${SSH_OPTIONS:-"-i $PRIVATE_KEY -o StrictHostKeyChecking=no"}
LAUNCH_ARGS=${LAUNCH_ARGS:-"1 nn,snn,jt 1 dn,tt"}
HADOOP_CLOUD_SCRIPT=$HADOOP_CLOUD_HOME/hadoop-$HADOOP_CLOUD_PROVIDER
HADOOP_CLOUD_SCRIPT=$HADOOP_CLOUD_HOME/hadoop-cloud
export HADOOP_CONF_DIR=$CONFIG_DIR/$CLUSTER
# Install Hadoop locally
@@ -55,19 +57,44 @@ $LOCAL_HADOOP_VERSION/hadoop-$LOCAL_HADOOP_VERSION.tar.gz
fi
# Launch a cluster
$HADOOP_CLOUD_SCRIPT launch-cluster --config-dir=$CONFIG_DIR \
--image-id=$IMAGE_ID --key-name=$KEY_NAME --auto-shutdown=$AUTO_SHUTDOWN \
--availability-zone=$AVAILABILITY_ZONE $CLIENT_CIDRS $ENVS $CLUSTER \
$LAUNCH_ARGS
if [ $HADOOP_CLOUD_PROVIDER == 'ec2' ]; then
$HADOOP_CLOUD_SCRIPT launch-cluster \
--config-dir=$CONFIG_DIR \
--image-id=$IMAGE_ID \
--instance-type=$INSTANCE_TYPE \
--key-name=$KEY_NAME \
--auto-shutdown=$AUTO_SHUTDOWN \
--availability-zone=$AVAILABILITY_ZONE \
$CLIENT_CIDRS $ENVS $CLUSTER $LAUNCH_ARGS
else
$HADOOP_CLOUD_SCRIPT launch-cluster --cloud-provider=$HADOOP_CLOUD_PROVIDER \
--config-dir=$CONFIG_DIR \
--image-id=$IMAGE_ID \
--instance-type=$INSTANCE_TYPE \
--public-key=$PUBLIC_KEY \
--private-key=$PRIVATE_KEY \
--auto-shutdown=$AUTO_SHUTDOWN \
$CLIENT_CIDRS $ENVS $CLUSTER $LAUNCH_ARGS
fi
# List clusters
$HADOOP_CLOUD_SCRIPT list --config-dir=$CONFIG_DIR
$HADOOP_CLOUD_SCRIPT list --config-dir=$CONFIG_DIR $CLUSTER
$HADOOP_CLOUD_SCRIPT list --cloud-provider=$HADOOP_CLOUD_PROVIDER \
--config-dir=$CONFIG_DIR
$HADOOP_CLOUD_SCRIPT list --cloud-provider=$HADOOP_CLOUD_PROVIDER \
--config-dir=$CONFIG_DIR $CLUSTER
# Run a proxy and save its pid in HADOOP_CLOUD_PROXY_PID
eval `$HADOOP_CLOUD_SCRIPT proxy --config-dir=$CONFIG_DIR \
eval `$HADOOP_CLOUD_SCRIPT proxy --cloud-provider=$HADOOP_CLOUD_PROVIDER \
--config-dir=$CONFIG_DIR \
--ssh-options="$SSH_OPTIONS" $CLUSTER`
if [ $HADOOP_CLOUD_PROVIDER == 'rackspace' ]; then
# Need to update /etc/hosts (interactively)
$HADOOP_CLOUD_SCRIPT list --cloud-provider=$HADOOP_CLOUD_PROVIDER \
--config-dir=$CONFIG_DIR $CLUSTER | grep 'nn,snn,jt' \
| awk '{print $4 " " $3 }' | sudo tee -a /etc/hosts
fi
# Run a job and check it works
$HADOOP_HOME/bin/hadoop fs -mkdir input
$HADOOP_HOME/bin/hadoop fs -put $HADOOP_HOME/LICENSE.txt input
@@ -78,6 +105,8 @@ $HADOOP_HOME/bin/hadoop fs -cat 'output/part-00000' | grep Apache
# Shutdown the cluster
kill $HADOOP_CLOUD_PROXY_PID
$HADOOP_CLOUD_SCRIPT terminate-cluster --config-dir=$CONFIG_DIR --force $CLUSTER
$HADOOP_CLOUD_SCRIPT terminate-cluster --cloud-provider=$HADOOP_CLOUD_PROVIDER \
--config-dir=$CONFIG_DIR --force $CLUSTER
sleep 5 # wait for termination to take effect
$HADOOP_CLOUD_SCRIPT delete-cluster --config-dir=$CONFIG_DIR $CLUSTER
$HADOOP_CLOUD_SCRIPT delete-cluster --cloud-provider=$HADOOP_CLOUD_PROVIDER \
--config-dir=$CONFIG_DIR $CLUSTER

View File

@@ -0,0 +1,21 @@
#!/usr/bin/env python2.5
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from hadoop.cloud.cli import main
if __name__ == "__main__":
main()

View File

@@ -88,6 +88,9 @@ firewall to the master node. (May be specified multiple times.)"),
should be run. (Amazon EC2 only.) (May be specified multiple times.)"),
make_option("--public-key", metavar="FILE",
help="The public key to authorize on launching instances. (Non-EC2 \
providers only.)"),
make_option("--private-key", metavar="FILE",
help="The private key to use when connecting to instances. (Non-EC2 \
providers only.)"),
]
@@ -289,7 +292,8 @@ def main():
template = InstanceTemplate((NAMENODE, SECONDARY_NAMENODE, JOBTRACKER), 1,
get_image_id(service.cluster, opt),
opt.get('instance_type'), opt.get('key_name'),
opt.get('public_key'), opt.get('user_data_file'),
opt.get('public_key'), opt.get('private_key'),
opt.get('user_data_file'),
opt.get('availability_zone'), opt.get('user_packages'),
opt.get('auto_shutdown'), opt.get('env'),
opt.get('security_group'))
@@ -303,7 +307,8 @@ def main():
template = InstanceTemplate((DATANODE, TASKTRACKER), number_of_slaves,
get_image_id(service.cluster, opt),
opt.get('instance_type'), opt.get('key_name'),
opt.get('public_key'), opt.get('user_data_file'),
opt.get('public_key'), opt.get('private_key'),
opt.get('user_data_file'),
opt.get('availability_zone'), opt.get('user_packages'),
opt.get('auto_shutdown'), opt.get('env'),
opt.get('security_group'))
@@ -324,14 +329,16 @@ def main():
InstanceTemplate((NAMENODE, SECONDARY_NAMENODE, JOBTRACKER), 1,
get_image_id(service.cluster, opt),
opt.get('instance_type'), opt.get('key_name'),
opt.get('public_key'), opt.get('user_data_file'),
opt.get('public_key'), opt.get('private_key'),
opt.get('user_data_file'),
opt.get('availability_zone'), opt.get('user_packages'),
opt.get('auto_shutdown'), opt.get('env'),
opt.get('security_group')),
InstanceTemplate((DATANODE, TASKTRACKER), number_of_slaves,
get_image_id(service.cluster, opt),
opt.get('instance_type'), opt.get('key_name'),
opt.get('public_key'), opt.get('user_data_file'),
opt.get('public_key'), opt.get('private_key'),
opt.get('user_data_file'),
opt.get('availability_zone'), opt.get('user_packages'),
opt.get('auto_shutdown'), opt.get('env'),
opt.get('security_group')),
@@ -346,7 +353,8 @@ def main():
instance_templates.append(
InstanceTemplate(roles, number, get_image_id(service.cluster, opt),
opt.get('instance_type'), opt.get('key_name'),
opt.get('public_key'), opt.get('user_data_file'),
opt.get('public_key'), opt.get('private_key'),
opt.get('user_data_file'),
opt.get('availability_zone'),
opt.get('user_packages'),
opt.get('auto_shutdown'), opt.get('env'),

View File

@@ -28,6 +28,7 @@ from hadoop.cloud.storage import Storage
CLUSTER_PROVIDER_MAP = {
"dummy": ('hadoop.cloud.providers.dummy', 'DummyCluster'),
"ec2": ('hadoop.cloud.providers.ec2', 'Ec2Cluster'),
"rackspace": ('hadoop.cloud.providers.rackspace', 'RackspaceCluster'),
}
def get_cluster(provider):

View File

@@ -0,0 +1,459 @@
#!/bin/bash -x
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
# Script that is run on each instance on boot.
################################################################################
################################################################################
# Initialize variables
################################################################################
SELF_HOST=`/sbin/ifconfig eth0 | grep 'inet addr:' | cut -d: -f2 | awk '{ print $1}'`
HADOOP_VERSION=${HADOOP_VERSION:-0.20.1}
HADOOP_HOME=/usr/local/hadoop-$HADOOP_VERSION
HADOOP_CONF_DIR=$HADOOP_HOME/conf
for role in $(echo "$ROLES" | tr "," "\n"); do
case $role in
nn)
NN_HOST=$SELF_HOST
;;
jt)
JT_HOST=$SELF_HOST
;;
esac
done
function register_auto_shutdown() {
if [ ! -z "$AUTO_SHUTDOWN" ]; then
shutdown -h +$AUTO_SHUTDOWN >/dev/null &
fi
}
function update_repo() {
if which dpkg &> /dev/null; then
sudo apt-get update
elif which rpm &> /dev/null; then
yum update -y yum
fi
}
# Install a list of packages on debian or redhat as appropriate
function install_packages() {
if which dpkg &> /dev/null; then
apt-get update
apt-get -y install $@
elif which rpm &> /dev/null; then
yum install -y $@
else
echo "No package manager found."
fi
}
# Install any user packages specified in the USER_PACKAGES environment variable
function install_user_packages() {
if [ ! -z "$USER_PACKAGES" ]; then
install_packages $USER_PACKAGES
fi
}
function install_hadoop() {
useradd hadoop
hadoop_tar_url=http://s3.amazonaws.com/hadoop-releases/core/hadoop-$HADOOP_VERSION/hadoop-$HADOOP_VERSION.tar.gz
hadoop_tar_file=`basename $hadoop_tar_url`
hadoop_tar_md5_file=`basename $hadoop_tar_url.md5`
curl="curl --retry 3 --silent --show-error --fail"
for i in `seq 1 3`;
do
$curl -O $hadoop_tar_url
$curl -O $hadoop_tar_url.md5
if md5sum -c $hadoop_tar_md5_file; then
break;
else
rm -f $hadoop_tar_file $hadoop_tar_md5_file
fi
done
if [ ! -e $hadoop_tar_file ]; then
echo "Failed to download $hadoop_tar_url. Aborting."
exit 1
fi
tar zxf $hadoop_tar_file -C /usr/local
rm -f $hadoop_tar_file $hadoop_tar_md5_file
echo "export HADOOP_HOME=$HADOOP_HOME" >> ~root/.bashrc
echo 'export PATH=$JAVA_HOME/bin:$HADOOP_HOME/bin:$PATH' >> ~root/.bashrc
}
function prep_disk() {
mount=$1
device=$2
automount=${3:-false}
echo "warning: ERASING CONTENTS OF $device"
mkfs.xfs -f $device
if [ ! -e $mount ]; then
mkdir $mount
fi
mount -o defaults,noatime $device $mount
if $automount ; then
echo "$device $mount xfs defaults,noatime 0 0" >> /etc/fstab
fi
}
function wait_for_mount {
mount=$1
device=$2
mkdir $mount
i=1
echo "Attempting to mount $device"
while true ; do
sleep 10
echo -n "$i "
i=$[$i+1]
mount -o defaults,noatime $device $mount || continue
echo " Mounted."
break;
done
}
function make_hadoop_dirs {
for mount in "$@"; do
if [ ! -e $mount/hadoop ]; then
mkdir -p $mount/hadoop
chown hadoop:hadoop $mount/hadoop
fi
done
}
# Configure Hadoop by setting up disks and site file
function configure_hadoop() {
MOUNT=/data
FIRST_MOUNT=$MOUNT
DFS_NAME_DIR=$MOUNT/hadoop/hdfs/name
FS_CHECKPOINT_DIR=$MOUNT/hadoop/hdfs/secondary
DFS_DATA_DIR=$MOUNT/hadoop/hdfs/data
MAPRED_LOCAL_DIR=$MOUNT/hadoop/mapred/local
MAX_MAP_TASKS=2
MAX_REDUCE_TASKS=1
CHILD_OPTS=-Xmx550m
CHILD_ULIMIT=1126400
TMP_DIR=$MOUNT/tmp/hadoop-\${user.name}
mkdir -p $MOUNT/hadoop
chown hadoop:hadoop $MOUNT/hadoop
mkdir $MOUNT/tmp
chmod a+rwxt $MOUNT/tmp
mkdir /etc/hadoop
ln -s $HADOOP_CONF_DIR /etc/hadoop/conf
##############################################################################
# Modify this section to customize your Hadoop cluster.
##############################################################################
cat > $HADOOP_CONF_DIR/hadoop-site.xml <<EOF
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>dfs.block.size</name>
<value>134217728</value>
<final>true</final>
</property>
<property>
<name>dfs.data.dir</name>
<value>$DFS_DATA_DIR</value>
<final>true</final>
</property>
<property>
<name>dfs.datanode.du.reserved</name>
<value>1073741824</value>
<final>true</final>
</property>
<property>
<name>dfs.datanode.handler.count</name>
<value>3</value>
<final>true</final>
</property>
<!--property>
<name>dfs.hosts</name>
<value>$HADOOP_CONF_DIR/dfs.hosts</value>
<final>true</final>
</property-->
<!--property>
<name>dfs.hosts.exclude</name>
<value>$HADOOP_CONF_DIR/dfs.hosts.exclude</value>
<final>true</final>
</property-->
<property>
<name>dfs.name.dir</name>
<value>$DFS_NAME_DIR</value>
<final>true</final>
</property>
<property>
<name>dfs.namenode.handler.count</name>
<value>5</value>
<final>true</final>
</property>
<property>
<name>dfs.permissions</name>
<value>true</value>
<final>true</final>
</property>
<property>
<name>dfs.replication</name>
<value>$DFS_REPLICATION</value>
</property>
<property>
<name>fs.checkpoint.dir</name>
<value>$FS_CHECKPOINT_DIR</value>
<final>true</final>
</property>
<property>
<name>fs.default.name</name>
<value>hdfs://$NN_HOST:8020/</value>
</property>
<property>
<name>fs.trash.interval</name>
<value>1440</value>
<final>true</final>
</property>
<property>
<name>hadoop.tmp.dir</name>
<value>/data/tmp/hadoop-\${user.name}</value>
<final>true</final>
</property>
<property>
<name>io.file.buffer.size</name>
<value>65536</value>
</property>
<property>
<name>mapred.child.java.opts</name>
<value>$CHILD_OPTS</value>
</property>
<property>
<name>mapred.child.ulimit</name>
<value>$CHILD_ULIMIT</value>
<final>true</final>
</property>
<property>
<name>mapred.job.tracker</name>
<value>$JT_HOST:8021</value>
</property>
<property>
<name>mapred.job.tracker.handler.count</name>
<value>5</value>
<final>true</final>
</property>
<property>
<name>mapred.local.dir</name>
<value>$MAPRED_LOCAL_DIR</value>
<final>true</final>
</property>
<property>
<name>mapred.map.tasks.speculative.execution</name>
<value>true</value>
</property>
<property>
<name>mapred.reduce.parallel.copies</name>
<value>10</value>
</property>
<property>
<name>mapred.reduce.tasks</name>
<value>10</value>
</property>
<property>
<name>mapred.reduce.tasks.speculative.execution</name>
<value>false</value>
</property>
<property>
<name>mapred.submit.replication</name>
<value>10</value>
</property>
<property>
<name>mapred.system.dir</name>
<value>/hadoop/system/mapred</value>
</property>
<property>
<name>mapred.tasktracker.map.tasks.maximum</name>
<value>$MAX_MAP_TASKS</value>
<final>true</final>
</property>
<property>
<name>mapred.tasktracker.reduce.tasks.maximum</name>
<value>$MAX_REDUCE_TASKS</value>
<final>true</final>
</property>
<property>
<name>tasktracker.http.threads</name>
<value>46</value>
<final>true</final>
</property>
<property>
<name>mapred.compress.map.output</name>
<value>true</value>
</property>
<property>
<name>mapred.output.compression.type</name>
<value>BLOCK</value>
</property>
<property>
<name>hadoop.rpc.socket.factory.class.default</name>
<value>org.apache.hadoop.net.StandardSocketFactory</value>
<final>true</final>
</property>
<property>
<name>hadoop.rpc.socket.factory.class.ClientProtocol</name>
<value></value>
<final>true</final>
</property>
<property>
<name>hadoop.rpc.socket.factory.class.JobSubmissionProtocol</name>
<value></value>
<final>true</final>
</property>
<property>
<name>io.compression.codecs</name>
<value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec</value>
</property>
</configuration>
EOF
# Keep PID files in a non-temporary directory
sed -i -e "s|# export HADOOP_PID_DIR=.*|export HADOOP_PID_DIR=/var/run/hadoop|" \
$HADOOP_CONF_DIR/hadoop-env.sh
mkdir -p /var/run/hadoop
chown -R hadoop:hadoop /var/run/hadoop
# Set SSH options within the cluster
sed -i -e 's|# export HADOOP_SSH_OPTS=.*|export HADOOP_SSH_OPTS="-o StrictHostKeyChecking=no"|' \
$HADOOP_CONF_DIR/hadoop-env.sh
# Disable IPv6
sed -i -e 's|# export HADOOP_OPTS=.*|export HADOOP_OPTS="-Djava.net.preferIPv4Stack=true"|' \
$HADOOP_CONF_DIR/hadoop-env.sh
# Hadoop logs should be on the /mnt partition
sed -i -e 's|# export HADOOP_LOG_DIR=.*|export HADOOP_LOG_DIR=/var/log/hadoop/logs|' \
$HADOOP_CONF_DIR/hadoop-env.sh
rm -rf /var/log/hadoop
mkdir /data/hadoop/logs
chown hadoop:hadoop /data/hadoop/logs
ln -s /data/hadoop/logs /var/log/hadoop
chown -R hadoop:hadoop /var/log/hadoop
}
# Sets up small website on cluster.
function setup_web() {
if which dpkg &> /dev/null; then
apt-get -y install thttpd
WWW_BASE=/var/www
elif which rpm &> /dev/null; then
yum install -y thttpd
chkconfig --add thttpd
WWW_BASE=/var/www/thttpd/html
fi
cat > $WWW_BASE/index.html << END
<html>
<head>
<title>Hadoop Cloud Cluster</title>
</head>
<body>
<h1>Hadoop Cloud Cluster</h1>
To browse the cluster you need to have a proxy configured.
Start the proxy with <tt>hadoop-cloud proxy &lt;cluster_name&gt;</tt>,
and point your browser to
<a href="http://apache-hadoop-ec2.s3.amazonaws.com/proxy.pac">this Proxy
Auto-Configuration (PAC)</a> file. To manage multiple proxy configurations,
you may wish to use
<a href="https://addons.mozilla.org/en-US/firefox/addon/2464">FoxyProxy</a>.
<ul>
<li><a href="http://$NN_HOST:50070/">NameNode</a>
<li><a href="http://$JT_HOST:50030/">JobTracker</a>
</ul>
</body>
</html>
END
service thttpd start
}
function start_namenode() {
if which dpkg &> /dev/null; then
AS_HADOOP="su -s /bin/bash - hadoop -c"
elif which rpm &> /dev/null; then
AS_HADOOP="/sbin/runuser -s /bin/bash - hadoop -c"
fi
# Format HDFS
[ ! -e $FIRST_MOUNT/hadoop/hdfs ] && $AS_HADOOP "$HADOOP_HOME/bin/hadoop namenode -format"
$AS_HADOOP "$HADOOP_HOME/bin/hadoop-daemon.sh start namenode"
$AS_HADOOP "$HADOOP_HOME/bin/hadoop dfsadmin -safemode wait"
$AS_HADOOP "$HADOOP_HOME/bin/hadoop fs -mkdir /user"
# The following is questionable, as it allows a user to delete another user
# It's needed to allow users to create their own user directories
$AS_HADOOP "$HADOOP_HOME/bin/hadoop fs -chmod +w /user"
}
function start_daemon() {
if which dpkg &> /dev/null; then
AS_HADOOP="su -s /bin/bash - hadoop -c"
elif which rpm &> /dev/null; then
AS_HADOOP="/sbin/runuser -s /bin/bash - hadoop -c"
fi
$AS_HADOOP "$HADOOP_HOME/bin/hadoop-daemon.sh start $1"
}
update_repo
register_auto_shutdown
install_user_packages
install_hadoop
configure_hadoop
for role in $(echo "$ROLES" | tr "," "\n"); do
case $role in
nn)
setup_web
start_namenode
;;
snn)
start_daemon secondarynamenode
;;
jt)
start_daemon jobtracker
;;
dn)
start_daemon datanode
;;
tt)
start_daemon tasktracker
;;
esac
done

View File

@@ -0,0 +1,22 @@
#!/bin/bash -ex
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Run a script downloaded at boot time to work around Rackspace's 10KB limit on injected files.
wget -qO/usr/bin/runurl run.alestic.com/runurl
chmod 755 /usr/bin/runurl
%ENV% runurl http://hadoop-dev-test.s3.amazonaws.com/boot-rackspace.sh

View File

@@ -0,0 +1,239 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import with_statement
import base64
import os
import subprocess
import sys
import time
import uuid
from hadoop.cloud.cluster import Cluster
from hadoop.cloud.cluster import Instance
from hadoop.cloud.cluster import TimeoutException
from hadoop.cloud.service import HadoopService
from hadoop.cloud.service import TASKTRACKER
from libcloud.drivers.rackspace import RackspaceNodeDriver
from libcloud.base import Node
from libcloud.base import NodeImage
RACKSPACE_KEY = os.environ['RACKSPACE_KEY']
RACKSPACE_SECRET = os.environ['RACKSPACE_SECRET']
STATE_MAP = { 'running': 'ACTIVE' }
STATE_MAP_REVERSED = dict((v, k) for k, v in STATE_MAP.iteritems())
USER_DATA_FILENAME = "/etc/init.d/rackspace-init.sh"
class RackspaceCluster(Cluster):
"""
A cluster of instances running on Rackspace Cloud Servers. A cluster has a
unique name, which is stored under the "cluster" metadata key of each server.
Every instance in the cluster has one or more roles, stored as a
comma-separated string under the "roles" metadata key. For example, an instance
with roles "foo" and "bar" has a "foo,bar" "roles" key.
At boot time two files are injected into an instance's filesystem: the user
data file (which is used as a boot script), and the user's public key.
"""
@staticmethod
def get_clusters_with_role(role, state="running", driver=None):
driver = driver or RackspaceNodeDriver(RACKSPACE_KEY, RACKSPACE_SECRET)
all_nodes = RackspaceCluster._list_nodes(driver)
clusters = set()
for node in all_nodes:
try:
if node.extra['metadata'].has_key('cluster') and \
role in node.extra['metadata']['roles'].split(','):
if node.state == STATE_MAP[state]:
clusters.add(node.extra['metadata']['cluster'])
except KeyError:
pass
return clusters
@staticmethod
def _list_nodes(driver, retries=5):
attempts = 0
while True:
try:
return driver.list_nodes()
except IOError:
attempts = attempts + 1
if attempts > retries:
raise
time.sleep(5)
def __init__(self, name, config_dir, driver=None):
super(RackspaceCluster, self).__init__(name, config_dir)
self.driver = driver or RackspaceNodeDriver(RACKSPACE_KEY, RACKSPACE_SECRET)
def get_provider_code(self):
return "rackspace"
def _get_nodes(self, state_filter=None):
all_nodes = RackspaceCluster._list_nodes(self.driver)
nodes = []
for node in all_nodes:
try:
if node.extra['metadata']['cluster'] == self.name:
if state_filter == None or node.state == STATE_MAP[state_filter]:
nodes.append(node)
except KeyError:
pass
return nodes
def _to_instance(self, node):
return Instance(node.id, node.public_ip[0], node.private_ip[0])
def _get_nodes_in_role(self, role, state_filter=None):
all_nodes = RackspaceCluster._list_nodes(self.driver)
nodes = []
for node in all_nodes:
try:
if node.extra['metadata']['cluster'] == self.name and \
role in node.extra['metadata']['roles'].split(','):
if state_filter == None or node.state == STATE_MAP[state_filter]:
nodes.append(node)
except KeyError:
pass
return nodes
def get_instances_in_role(self, role, state_filter=None):
"""
Get all the instances in a role, filtered by state.
@param role: the name of the role
@param state_filter: the state that the instance should be in
(e.g. "running"), or None for all states
"""
return [self._to_instance(node) for node in \
self._get_nodes_in_role(role, state_filter)]
def _print_node(self, node, out):
out.write("\t".join((node.extra['metadata']['roles'], node.id,
node.name,
self._ip_list_to_string(node.public_ip),
self._ip_list_to_string(node.private_ip),
STATE_MAP_REVERSED[node.state])))
out.write("\n")
def _ip_list_to_string(self, ips):
if ips is None:
return ""
return ",".join(ips)
def print_status(self, roles=None, state_filter="running", out=sys.stdout):
if not roles:
for node in self._get_nodes(state_filter):
self._print_node(node, out)
else:
for role in roles:
for node in self._get_nodes_in_role(role, state_filter):
self._print_node(node, out)
def launch_instances(self, roles, number, image_id, size_id,
instance_user_data, **kwargs):
metadata = {"cluster": self.name, "roles": ",".join(roles)}
node_ids = []
files = { USER_DATA_FILENAME: instance_user_data.read() }
if "public_key" in kwargs:
files["/root/.ssh/authorized_keys"] = open(kwargs["public_key"]).read()
for dummy in range(number):
node = self._launch_instance(roles, image_id, size_id, metadata, files)
node_ids.append(node.id)
return node_ids
def _launch_instance(self, roles, image_id, size_id, metadata, files):
instance_name = "%s-%s" % (self.name, uuid.uuid4().hex[-8:])
node = self.driver.create_node(instance_name, self._find_image(image_id),
self._find_size(size_id), metadata=metadata,
files=files)
return node
def _find_image(self, image_id):
return NodeImage(id=image_id, name=None, driver=None)
def _find_size(self, size_id):
matches = [i for i in self.driver.list_sizes() if i.id == str(size_id)]
if len(matches) != 1:
return None
return matches[0]
def wait_for_instances(self, instance_ids, timeout=600):
start_time = time.time()
while True:
if (time.time() - start_time >= timeout):
raise TimeoutException()
try:
if self._all_started(instance_ids):
break
except Exception:
pass
sys.stdout.write(".")
sys.stdout.flush()
time.sleep(1)
def _all_started(self, node_ids):
all_nodes = RackspaceCluster._list_nodes(self.driver)
node_id_to_node = {}
for node in all_nodes:
node_id_to_node[node.id] = node
for node_id in node_ids:
try:
if node_id_to_node[node_id].state != STATE_MAP["running"]:
return False
except KeyError:
return False
return True
def terminate(self):
nodes = self._get_nodes("running")
print nodes
for node in nodes:
self.driver.destroy_node(node)
class RackspaceHadoopService(HadoopService):
def _update_cluster_membership(self, public_key, private_key):
"""
Creates a cluster-wide hosts file and copies it across the cluster.
This is a stop gap until DNS is configured on the cluster.
"""
ssh_options = '-o StrictHostKeyChecking=no'
time.sleep(30) # wait for SSH daemon to start
nodes = self.cluster._get_nodes('running')
# create hosts file
hosts_file = 'hosts'
with open(hosts_file, 'w') as f:
f.write("127.0.0.1 localhost localhost.localdomain\n")
for node in nodes:
f.write(node.public_ip[0] + "\t" + node.name + "\n")
# copy to each node in the cluster
for node in nodes:
self._call('scp -i %s %s %s root@%s:/etc/hosts' \
% (private_key, ssh_options, hosts_file, node.public_ip[0]))
os.remove(hosts_file)
def _call(self, command):
print command
try:
subprocess.call(command, shell=True)
except Exception, e:
print e

View File

@@ -49,7 +49,7 @@ class InstanceTemplate(object):
A template for creating server instances in a cluster.
"""
def __init__(self, roles, number, image_id, size_id,
key_name, public_key,
key_name, public_key, private_key,
user_data_file_template=None, placement=None,
user_packages=None, auto_shutdown=None, env_strings=[],
security_groups=[]):
@@ -59,6 +59,7 @@ class InstanceTemplate(object):
self.size_id = size_id
self.key_name = key_name
self.public_key = public_key
self.private_key = private_key
self.user_data_file_template = user_data_file_template
self.placement = placement
self.user_packages = user_packages
@@ -244,7 +245,7 @@ class HadoopService(Service):
Find and print clusters that have running namenode instances
"""
legacy_clusters = get_cluster(provider).get_clusters_with_role(MASTER)
clusters = get_cluster(provider).get_clusters_with_role(NAMENODE)
clusters = list(get_cluster(provider).get_clusters_with_role(NAMENODE))
clusters.extend(legacy_clusters)
if not clusters:
print "No running clusters"
@@ -284,6 +285,8 @@ class HadoopService(Service):
self._create_client_hadoop_site_file(config_dir)
self._authorize_client_ports(client_cidr)
self._attach_storage(roles)
self._update_cluster_membership(instance_templates[0].public_key,
instance_templates[0].private_key)
try:
self._wait_for_hadoop(number_of_tasktrackers)
except TimeoutException:
@@ -412,8 +415,8 @@ echo Proxy pid %s;""" % (process.pid, process.pid)
namenode = self._get_namenode()
jobtracker = self._get_jobtracker()
cluster_dir = os.path.join(config_dir, self.cluster.name)
aws_access_key_id = os.environ['AWS_ACCESS_KEY_ID']
aws_secret_access_key = os.environ['AWS_SECRET_ACCESS_KEY']
aws_access_key_id = os.environ.get('AWS_ACCESS_KEY_ID') or ''
aws_secret_access_key = os.environ.get('AWS_SECRET_ACCESS_KEY') or ''
if not os.path.exists(cluster_dir):
os.makedirs(cluster_dir)
with open(os.path.join(cluster_dir, 'hadoop-site.xml'), 'w') as f:
@@ -526,6 +529,9 @@ echo Proxy pid %s;""" % (process.pid, process.pid)
storage.attach(role, self.cluster.get_instances_in_role(role, 'running'))
storage.print_status(roles)
def _update_cluster_membership(self, public_key, private_key):
pass
class ZooKeeperService(Service):
"""
@@ -610,7 +616,7 @@ clientPort=2181
SERVICE_PROVIDER_MAP = {
"hadoop": {
# "provider_code": ('hadoop.cloud.providers.provider_code', 'ProviderHadoopService')
"rackspace": ('hadoop.cloud.providers.rackspace', 'RackspaceHadoopService')
},
"zookeeper": {
# "provider_code": ('hadoop.cloud.providers.provider_code', 'ProviderZooKeeperService')

View File

@@ -0,0 +1,74 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import StringIO
import unittest
from hadoop.cloud.providers.rackspace import RackspaceCluster
class TestCluster(unittest.TestCase):
class DriverStub(object):
def list_nodes(self):
class NodeStub(object):
def __init__(self, name, metadata):
self.id = name
self.name = name
self.state = 'ACTIVE'
self.public_ip = ['100.0.0.1']
self.private_ip = ['10.0.0.1']
self.extra = { 'metadata': metadata }
return [NodeStub('random_instance', {}),
NodeStub('cluster1-nj-000', {'cluster': 'cluster1', 'roles': 'nn,jt'}),
NodeStub('cluster1-dt-000', {'cluster': 'cluster1', 'roles': 'dn,tt'}),
NodeStub('cluster1-dt-001', {'cluster': 'cluster1', 'roles': 'dn,tt'}),
NodeStub('cluster2-dt-000', {'cluster': 'cluster2', 'roles': 'dn,tt'}),
NodeStub('cluster3-nj-000', {'cluster': 'cluster3', 'roles': 'nn,jt'})]
def test_get_clusters_with_role(self):
self.assertEqual(set(['cluster1', 'cluster2']),
RackspaceCluster.get_clusters_with_role('dn', 'running',
TestCluster.DriverStub()))
def test_get_instances_in_role(self):
cluster = RackspaceCluster('cluster1', None, TestCluster.DriverStub())
instances = cluster.get_instances_in_role('nn')
self.assertEquals(1, len(instances))
self.assertEquals('cluster1-nj-000', instances[0].id)
instances = cluster.get_instances_in_role('tt')
self.assertEquals(2, len(instances))
self.assertEquals(set(['cluster1-dt-000', 'cluster1-dt-001']),
set([i.id for i in instances]))
def test_print_status(self):
cluster = RackspaceCluster('cluster1', None, TestCluster.DriverStub())
out = StringIO.StringIO()
cluster.print_status(None, "running", out)
self.assertEquals("""nn,jt cluster1-nj-000 cluster1-nj-000 100.0.0.1 10.0.0.1 running
dn,tt cluster1-dt-000 cluster1-dt-000 100.0.0.1 10.0.0.1 running
dn,tt cluster1-dt-001 cluster1-dt-001 100.0.0.1 10.0.0.1 running
""", out.getvalue().replace("\t", " "))
out = StringIO.StringIO()
cluster.print_status(["dn"], "running", out)
self.assertEquals("""dn,tt cluster1-dt-000 cluster1-dt-000 100.0.0.1 10.0.0.1 running
dn,tt cluster1-dt-001 cluster1-dt-001 100.0.0.1 10.0.0.1 running
""", out.getvalue().replace("\t", " "))
if __name__ == '__main__':
unittest.main()

View File

@@ -0,0 +1,46 @@
#!/bin/bash -x
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Given an Ubuntu base system install, install the base packages we need.
#
# We require multiverse to be enabled.
cat >> /etc/apt/sources.list << EOF
deb http://us.archive.ubuntu.com/ubuntu/ intrepid multiverse
deb-src http://us.archive.ubuntu.com/ubuntu/ intrepid multiverse
deb http://us.archive.ubuntu.com/ubuntu/ intrepid-updates multiverse
deb-src http://us.archive.ubuntu.com/ubuntu/ intrepid-updates multiverse
EOF
apt-get update
# Install Java
apt-get -y install sun-java6-jdk
echo "export JAVA_HOME=/usr/lib/jvm/java-6-sun" >> /etc/profile
export JAVA_HOME=/usr/lib/jvm/java-6-sun
java -version
# Install general packages
apt-get -y install vim curl screen ssh rsync unzip openssh-server
apt-get -y install policykit # http://www.bergek.com/2008/11/24/ubuntu-810-libpolkit-error/
# Create root's .ssh directory if it doesn't exist
mkdir -p /root/.ssh
# Run any rackspace init script injected at boot time
echo '[ -f /etc/init.d/rackspace-init.sh ] && /bin/sh /etc/init.d/rackspace-init.sh; exit 0' > /etc/rc.local