HBASE-24620 : Add a ClusterManager which submits command to ZooKeeper and its Agent which picks and execute those Commands (#2299)

Signed-off-by: Aman Poonia <apoonia@salesforce.com>
Signed-off-by: Viraj Jasani <vjasani@apache.org>
This commit is contained in:
Lokesh Khurana 2020-12-21 15:33:36 +05:30 committed by GitHub
parent 904b555edc
commit f8bd22827a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 1447 additions and 0 deletions

140
bin/chaos-daemon.sh Normal file
View File

@ -0,0 +1,140 @@
#!/usr/bin/env bash
#
#/**
# * Licensed to the Apache Software Foundation (ASF) under one
# * or more contributor license agreements. See the NOTICE file
# * distributed with this work for additional information
# * regarding copyright ownership. The ASF licenses this file
# * to you under the Apache License, Version 2.0 (the
# * "License"); you may not use this file except in compliance
# * with the License. You may obtain a copy of the License at
# *
# * http://www.apache.org/licenses/LICENSE-2.0
# *
# * Unless required by applicable law or agreed to in writing, software
# * distributed under the License is distributed on an "AS IS" BASIS,
# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# * See the License for the specific language governing permissions and
# * limitations under the License.
# */
#
usage="Usage: chaos-daemon.sh (start|stop) chaosagent"
# if no args specified, show usage
if [ $# -le 1 ]; then
echo "$usage"
exit 1
fi
# get arguments
startStop=$1
shift
command=$1
shift
check_before_start(){
#ckeck if the process is not running
mkdir -p "$HBASE_PID_DIR"
if [ -f "$CHAOS_PID" ]; then
if kill -0 "$(cat "$CHAOS_PID")" > /dev/null 2>&1; then
echo "$command" running as process "$(cat "$CHAOS_PID")". Stop it first.
exit 1
fi
fi
}
bin=`dirname "${BASH_SOURCE-$0}"`
bin=$(cd "$bin">/dev/null || exit; pwd)
. "$bin"/hbase-config.sh
. "$bin"/hbase-common.sh
CLASSPATH=$HBASE_CONF_DIR
for f in ../lib/*.jar; do
CLASSPATH=${CLASSPATH}:$f
done
# get log directory
if [ "$HBASE_LOG_DIR" = "" ]; then
export HBASE_LOG_DIR="$HBASE_HOME/logs"
fi
if [ "$HBASE_PID_DIR" = "" ]; then
HBASE_PID_DIR=/tmp
fi
if [ "$HBASE_IDENT_STRING" = "" ]; then
export HBASE_IDENT_STRING="$USER"
fi
if [ "$JAVA_HOME" != "" ]; then
#echo "run java in $JAVA_HOME"
JAVA_HOME=$JAVA_HOME
fi
if [ "$JAVA_HOME" = "" ]; then
echo "Error: JAVA_HOME is not set."
exit 1
fi
export HBASE_LOG_PREFIX=hbase-$HBASE_IDENT_STRING-$command-$HOSTNAME
export CHAOS_LOGFILE=$HBASE_LOG_PREFIX.log
if [ -z "${HBASE_ROOT_LOGGER}" ]; then
export HBASE_ROOT_LOGGER=${HBASE_ROOT_LOGGER:-"INFO,RFA"}
fi
if [ -z "${HBASE_SECURITY_LOGGER}" ]; then
export HBASE_SECURITY_LOGGER=${HBASE_SECURITY_LOGGER:-"INFO,RFAS"}
fi
CHAOS_LOGLOG=${CHAOS_LOGLOG:-"${HBASE_LOG_DIR}/${CHAOS_LOGFILE}"}
CHAOS_PID=$HBASE_PID_DIR/hbase-$HBASE_IDENT_STRING-$command.pid
if [ -z "$CHAOS_JAVA_OPTS" ]; then
CHAOS_JAVA_OPTS="-Xms1024m -Xmx4096m"
fi
case $startStop in
(start)
check_before_start
echo running $command
CMD="${JAVA_HOME}/bin/java -Dapp.home=${HBASE_CONF_DIR}/../ ${CHAOS_JAVA_OPTS} -cp ${CLASSPATH} org.apache.hadoop.hbase.chaos.ChaosService -$command start &>> ${CHAOS_LOGLOG} &"
eval $CMD
PID=$(echo $!)
echo ${PID} >${CHAOS_PID}
echo "Chaos ${1} process Started with ${PID} !"
now=$(date)
echo "${now} Chaos ${1} process Started with ${PID} !" >>${CHAOS_LOGLOG}
;;
(stop)
echo stopping $command
if [ -f $CHAOS_PID ]; then
pidToKill=`cat $CHAOS_PID`
# kill -0 == see if the PID exists
if kill -0 $pidToKill > /dev/null 2>&1; then
echo -n stopping $command
echo "`date` Terminating $command" >> $CHAOS_LOGLOG
kill $pidToKill > /dev/null 2>&1
waitForProcessEnd $pidToKill $command
else
retval=$?
echo no $command to stop because kill -0 of pid $pidToKill failed with status $retval
fi
else
echo no $command to stop because no pid file $CHAOS_PID
fi
rm -f $CHAOS_PID
;;
(*)
echo $usage
exit 1
;;
esac

View File

@ -0,0 +1,591 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.chaos;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.hadoop.hbase.util.RetryCounterFactory;
import org.apache.hadoop.util.Shell;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/***
* An agent for executing destructive actions for ChaosMonkey.
* Uses ZooKeeper Watchers and LocalShell, to do the killing
* and getting status of service on targeted host without SSH.
* uses given ZNode Structure:
* /perfChaosTest (root)
* |
* |
* /chaosAgents (Used for registration has
* hostname ephemeral nodes as children)
* |
* |
* /chaosAgentTaskStatus (Used for task
* Execution, has hostname persistent
* nodes as child with tasks as their children)
* |
* |
* /hostname
* |
* |
* /task0000001 (command as data)
* (has two types of command :
* 1: starts with "exec"
* for executing a destructive action.
* 2: starts with "bool" for getting
* only status of service.
*
*/
@InterfaceAudience.Private
public class ChaosAgent implements Watcher, Closeable, Runnable {
private static final Logger LOG = LoggerFactory.getLogger(ChaosAgent.class);
static AtomicBoolean stopChaosAgent = new AtomicBoolean();
private ZooKeeper zk;
private String quorum;
private String agentName;
private Configuration conf;
private RetryCounterFactory retryCounterFactory;
private volatile boolean connected = false;
public ChaosAgent(Configuration conf, String quorum, String agentName) {
initChaosAgent(conf, quorum, agentName);
}
/***
* sets global params and initiates connection with ZooKeeper then does registration.
* @param conf initial configuration to use
* @param quorum ZK Quorum
* @param agentName AgentName to use
*/
private void initChaosAgent(Configuration conf, String quorum, String agentName) {
this.conf = conf;
this.quorum = quorum;
this.agentName = agentName;
this.retryCounterFactory = new RetryCounterFactory(new RetryCounter.RetryConfig()
.setMaxAttempts(conf.getInt(ChaosConstants.RETRY_ATTEMPTS_KEY,
ChaosConstants.DEFAULT_RETRY_ATTEMPTS)).setSleepInterval(
conf.getLong(ChaosConstants.RETRY_SLEEP_INTERVAL_KEY,
ChaosConstants.DEFAULT_RETRY_SLEEP_INTERVAL)));
try {
this.createZKConnection(null);
this.register();
} catch (IOException e) {
LOG.error("Error Creating Connection: " + e);
}
}
/***
* Creates Connection with ZooKeeper.
* @throws IOException if something goes wrong
*/
private void createZKConnection(Watcher watcher) throws IOException {
if(watcher == null) {
zk = new ZooKeeper(quorum, ChaosConstants.SESSION_TIMEOUT_ZK, this);
} else {
zk = new ZooKeeper(quorum, ChaosConstants.SESSION_TIMEOUT_ZK, watcher);
}
LOG.info("ZooKeeper Connection created for ChaosAgent: " + agentName);
}
//WATCHERS: Below are the Watches used by ChaosAgent
/***
* Watcher for notifying if any task is assigned to agent or not,
* by seeking if any Node is being added to agent as Child.
*/
Watcher newTaskCreatedWatcher = new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
if (watchedEvent.getType() == Event.EventType.NodeChildrenChanged) {
if (!(ChaosConstants.CHAOS_AGENT_STATUS_PERSISTENT_ZNODE +
ChaosConstants.ZNODE_PATH_SEPARATOR + agentName).equals(watchedEvent.getPath())) {
throw new RuntimeException(KeeperException.create(
KeeperException.Code.DATAINCONSISTENCY));
}
LOG.info("Change in Tasks Node, checking for Tasks again.");
getTasks();
}
}
};
//CALLBACKS: Below are the Callbacks used by Chaos Agent
/**
* Callback used while setting status of a given task, Logs given status.
*/
AsyncCallback.StatCallback setStatusOfTaskZNodeCallback = (rc, path, ctx, stat) -> {
switch (KeeperException.Code.get(rc)) {
case CONNECTIONLOSS:
// Connection to the server was lost while setting status setting again.
try {
recreateZKConnection();
} catch (Exception e) {
break;
}
setStatusOfTaskZNode(path, (String) ctx);
break;
case OK:
LOG.info("Status of Task has been set");
break;
case NONODE:
LOG.error("Chaos Agent status node does not exists: "
+ "check for ZNode directory structure again.");
break;
default:
LOG.error("Error while setting status of task ZNode: " +
path, KeeperException.create(KeeperException.Code.get(rc), path));
}
};
/**
* Callback used while creating a Persistent ZNode tries to create
* ZNode again if Connection was lost in previous try.
*/
AsyncCallback.StringCallback createZNodeCallback = (rc, path, ctx, name) -> {
switch (KeeperException.Code.get(rc)) {
case CONNECTIONLOSS:
try {
recreateZKConnection();
} catch (Exception e) {
break;
}
createZNode(path, (byte[]) ctx);
break;
case OK:
LOG.info("ZNode created : " + path);
break;
case NODEEXISTS:
LOG.warn("ZNode already registered: " + path);
break;
default:
LOG.error("Error occurred while creating Persistent ZNode: " + path,
KeeperException.create(KeeperException.Code.get(rc), path));
}
};
/**
* Callback used while creating a Ephemeral ZNode tries to create ZNode again
* if Connection was lost in previous try.
*/
AsyncCallback.StringCallback createEphemeralZNodeCallback = (rc, path, ctx, name) -> {
switch (KeeperException.Code.get(rc)) {
case CONNECTIONLOSS:
try {
recreateZKConnection();
} catch (Exception e) {
break;
}
createEphemeralZNode(path, (byte[]) ctx);
break;
case OK:
LOG.info("ZNode created : " + path);
break;
case NODEEXISTS:
LOG.warn("ZNode already registered: " + path);
break;
default:
LOG.error("Error occurred while creating Ephemeral ZNode: ",
KeeperException.create(KeeperException.Code.get(rc), path));
}
};
/**
* Callback used by getTasksForAgentCallback while getting command,
* after getting command successfully, it executes command and
* set its status with respect to the command type.
*/
AsyncCallback.DataCallback getTaskForExecutionCallback = new AsyncCallback.DataCallback() {
@Override
public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
switch (KeeperException.Code.get(rc)) {
case CONNECTIONLOSS:
//Connection to the server has been lost while getting task, getting data again.
try {
recreateZKConnection();
} catch (Exception e) {
break;
}
zk.getData(path,
false,
getTaskForExecutionCallback,
new String(data));
break;
case OK:
String cmd = new String(data);
LOG.info("Executing command : " + cmd);
String status = ChaosConstants.TASK_COMPLETION_STRING;
try {
String user = conf.get(ChaosConstants.CHAOSAGENT_SHELL_USER,
ChaosConstants.DEFAULT_SHELL_USER);
switch (cmd.substring(0, 4)) {
case "bool":
String ret = execWithRetries(user, cmd.substring(4)).getSecond();
status = Boolean.toString(ret.length() > 0);
break;
case "exec":
execWithRetries(user, cmd.substring(4));
break;
default:
LOG.error("Unknown Command Type");
status = ChaosConstants.TASK_ERROR_STRING;
}
} catch (IOException e) {
LOG.error("Got error while executing command : " + cmd +
" On agent : " + agentName + " Error : " + e);
status = ChaosConstants.TASK_ERROR_STRING;
}
try {
setStatusOfTaskZNode(path, status);
Thread.sleep(ChaosConstants.SET_STATUS_SLEEP_TIME);
} catch (InterruptedException e) {
LOG.error("Error occured after setting status: " + e);
}
default:
LOG.error("Error occurred while getting data",
KeeperException.create(KeeperException.Code.get(rc), path));
}
}
};
/***
* Callback used while getting Tasks for agent if call executed without Exception,
* It creates a separate thread for each children to execute given Tasks parallely.
*/
AsyncCallback.ChildrenCallback getTasksForAgentCallback = new AsyncCallback.ChildrenCallback() {
@Override
public void processResult(int rc, String path, Object ctx, List<String> children) {
switch (KeeperException.Code.get(rc)) {
case CONNECTIONLOSS: {
// Connection to the server has been lost, getting tasks again.
try {
recreateZKConnection();
} catch (Exception e) {
break;
}
getTasks();
break;
}
case OK: {
if (children != null) {
try {
LOG.info("Executing each task as a separate thread");
List<Thread> tasksList = new ArrayList<>();
for (String task : children) {
String threadName = agentName + "_" + task;
Thread t = new Thread(() -> {
LOG.info("Executing task : " + task + " of agent : " + agentName);
zk.getData(ChaosConstants.CHAOS_AGENT_STATUS_PERSISTENT_ZNODE +
ChaosConstants.ZNODE_PATH_SEPARATOR + agentName +
ChaosConstants.ZNODE_PATH_SEPARATOR + task,
false,
getTaskForExecutionCallback,
task);
});
t.setName(threadName);
t.start();
tasksList.add(t);
for (Thread thread : tasksList) {
thread.join();
}
}
} catch (InterruptedException e) {
LOG.error("Error scheduling next task : " +
" for agent : " + agentName + " Error : " + e);
}
}
break;
}
default:
LOG.error("Error occurred while getting task",
KeeperException.create(KeeperException.Code.get(rc), path));
}
}
};
/***
* Function to create PERSISTENT ZNODE with given path and data given as params
* @param path Path at which ZNode to create
* @param data Data to put under ZNode
*/
public void createZNode(String path, byte[] data) {
zk.create(path,
data,
ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT,
createZNodeCallback,
data);
}
/***
* Function to create EPHEMERAL ZNODE with given path and data as params.
* @param path Path at which Ephemeral ZNode to create
* @param data Data to put under ZNode
*/
public void createEphemeralZNode(String path, byte[] data) {
zk.create(path,
data,
ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL,
createEphemeralZNodeCallback,
data);
}
/**
* Checks if given ZNode exists, if not creates a PERSISTENT ZNODE for same.
*
* @param path Path to check for ZNode
*/
private void createIfZNodeNotExists(String path) {
try {
if (zk.exists(path,
false) == null) {
createZNode(path, new byte[0]);
}
} catch (KeeperException | InterruptedException e) {
LOG.error("Error checking given node : " + path + " " + e);
}
}
/**
* sets given Status for Task Znode
*
* @param taskZNode ZNode to set status
* @param status Status value
*/
public void setStatusOfTaskZNode(String taskZNode, String status) {
LOG.info("Setting status of Task ZNode: " + taskZNode + " status : " + status);
zk.setData(taskZNode,
status.getBytes(),
-1,
setStatusOfTaskZNodeCallback,
null);
}
/**
* registration of ChaosAgent by checking and creating necessary ZNodes.
*/
private void register() {
createIfZNodeNotExists(ChaosConstants.CHAOS_TEST_ROOT_ZNODE);
createIfZNodeNotExists(ChaosConstants.CHAOS_AGENT_REGISTRATION_EPIMERAL_ZNODE);
createIfZNodeNotExists(ChaosConstants.CHAOS_AGENT_STATUS_PERSISTENT_ZNODE);
createIfZNodeNotExists(ChaosConstants.CHAOS_AGENT_STATUS_PERSISTENT_ZNODE +
ChaosConstants.ZNODE_PATH_SEPARATOR + agentName);
createEphemeralZNode(ChaosConstants.CHAOS_AGENT_REGISTRATION_EPIMERAL_ZNODE +
ChaosConstants.ZNODE_PATH_SEPARATOR + agentName, new byte[0]);
}
/***
* Gets tasks for execution, basically sets Watch on it's respective host's Znode and
* waits for tasks to be assigned, also has a getTasksForAgentCallback
* which handles execution of task.
*/
private void getTasks() {
LOG.info("Getting Tasks for Agent: " + agentName + "and setting watch for new Tasks");
zk.getChildren(ChaosConstants.CHAOS_AGENT_STATUS_PERSISTENT_ZNODE +
ChaosConstants.ZNODE_PATH_SEPARATOR + agentName,
newTaskCreatedWatcher,
getTasksForAgentCallback,
null);
}
/**
* Below function executes command with retries with given user.
* Uses LocalShell to execute a command.
*
* @param user user name, default none
* @param cmd Command to execute
* @return A pair of Exit Code and Shell output
* @throws IOException Exception while executing shell command
*/
private Pair<Integer, String> execWithRetries(String user, String cmd) throws IOException {
RetryCounter retryCounter = retryCounterFactory.create();
while (true) {
try {
return exec(user, cmd);
} catch (IOException e) {
retryOrThrow(retryCounter, e, user, cmd);
}
try {
retryCounter.sleepUntilNextRetry();
} catch (InterruptedException e) {
LOG.warn("Sleep Interrupted: " + e);
}
}
}
private Pair<Integer, String> exec(String user, String cmd) throws IOException {
LOG.info("Executing Shell command: " + cmd + " , user: " + user);
LocalShell shell = new LocalShell(user, cmd);
try {
shell.execute();
} catch (Shell.ExitCodeException e) {
String output = shell.getOutput();
throw new Shell.ExitCodeException(e.getExitCode(), "stderr: " + e.getMessage()
+ ", stdout: " + output);
}
LOG.info("Executed Shell command, exit code: {}, output n{}", shell.getExitCode(), shell.getOutput());
return new Pair<>(shell.getExitCode(), shell.getOutput());
}
private <E extends Exception> void retryOrThrow(RetryCounter retryCounter, E ex,
String user, String cmd) throws E {
if (retryCounter.shouldRetry()) {
LOG.warn("Local command: {}, user: {}, failed at attempt {}. Retrying until maxAttempts: {}."
+ "Exception {}", cmd, user,retryCounter.getAttemptTimes(), retryCounter.getMaxAttempts(),
ex.getMessage());
return;
}
throw ex;
}
private boolean isConnected() {
return connected;
}
@Override
public void close() throws IOException {
LOG.info("Closing ZooKeeper Connection for Chaos Agent : " + agentName);
try {
zk.close();
} catch (InterruptedException e) {
LOG.error("Error while closing ZooKeeper Connection.");
}
}
@Override
public void run() {
try {
LOG.info("Running Chaos Agent on : " + agentName);
while (!this.isConnected()) {
Thread.sleep(100);
}
this.getTasks();
while (!stopChaosAgent.get()) {
Thread.sleep(500);
}
} catch (InterruptedException e) {
LOG.error("Error while running Chaos Agent", e);
}
}
@Override
public void process(WatchedEvent watchedEvent) {
LOG.info("Processing event: " + watchedEvent.toString());
if (watchedEvent.getType() == Event.EventType.None) {
switch (watchedEvent.getState()) {
case SyncConnected:
connected = true;
break;
case Disconnected:
connected = false;
break;
case Expired:
connected = false;
LOG.error("Session expired creating again");
try {
createZKConnection(null);
} catch (IOException e) {
LOG.error("Error creating Zookeeper connection", e);
}
default:
LOG.error("Unknown State");
break;
}
}
}
private void recreateZKConnection() throws Exception{
try {
zk.close();
createZKConnection(newTaskCreatedWatcher);
createEphemeralZNode(ChaosConstants.CHAOS_AGENT_REGISTRATION_EPIMERAL_ZNODE +
ChaosConstants.ZNODE_PATH_SEPARATOR + agentName, new byte[0]);
} catch (IOException e) {
LOG.error("Error creating new ZK COnnection for agent: {}", agentName + e);
throw e;
}
}
/**
* Executes Command locally.
*/
protected static class LocalShell extends Shell.ShellCommandExecutor {
private String user;
private String execCommand;
public LocalShell(String user, String execCommand) {
super(new String[]{execCommand});
this.user = user;
this.execCommand = execCommand;
}
@Override
public String[] getExecString() {
// TODO: Considering Agent is running with same user.
if(!user.equals(ChaosConstants.DEFAULT_SHELL_USER)){
execCommand = String.format("su -u %1$s %2$s", user, execCommand);
}
return new String[]{"/usr/bin/env", "bash", "-c", execCommand};
}
@Override
public void execute() throws IOException {
super.execute();
}
}
}

View File

@ -0,0 +1,77 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.chaos;
import org.apache.yetus.audience.InterfaceAudience;
/***
* ChaosConstant holds a bunch of Choas-related Constants
*/
@InterfaceAudience.Public
public final class ChaosConstants {
/*Base ZNode for whole Chaos Testing*/
public static final String CHAOS_TEST_ROOT_ZNODE = "/hbase";
/*Just a / used for path separator*/
public static final String ZNODE_PATH_SEPARATOR = "/";
/*ZNode used for ChaosAgents registration.*/
public static final String CHAOS_AGENT_REGISTRATION_EPIMERAL_ZNODE =
CHAOS_TEST_ROOT_ZNODE + ZNODE_PATH_SEPARATOR + "chaosAgents";
/*ZNode used for getting status of tasks assigned*/
public static final String CHAOS_AGENT_STATUS_PERSISTENT_ZNODE =
CHAOS_TEST_ROOT_ZNODE + ZNODE_PATH_SEPARATOR + "chaosAgentTaskStatus";
/*Config property for getting number of retries to execute a command*/
public static final String RETRY_ATTEMPTS_KEY = "hbase.it.clustermanager.retry.attempts";
/*Default value for number of retries*/
public static final int DEFAULT_RETRY_ATTEMPTS = 5;
/*Config property to sleep in between retries*/
public static final String RETRY_SLEEP_INTERVAL_KEY =
"hbase.it.clustermanager.retry.sleep.interval";
/*Default Sleep time between each retry*/
public static final int DEFAULT_RETRY_SLEEP_INTERVAL = 5000;
/*Config property for executing command as specific user*/
public static final String CHAOSAGENT_SHELL_USER = "hbase.it.clustermanager.ssh.user";
/*default user for executing local commands*/
public static final String DEFAULT_SHELL_USER = "";
/*timeout used while creating ZooKeeper connection*/
public static final int SESSION_TIMEOUT_ZK = 60000 * 10;
/*Time given to ChaosAgent to set status*/
public static final int SET_STATUS_SLEEP_TIME = 30 * 1000;
/*Status String when you get an ERROR while executing task*/
public static final String TASK_ERROR_STRING = "error";
/*Status String when your command gets executed correctly*/
public static final String TASK_COMPLETION_STRING = "done";
/*Name of ChoreService to use*/
public static final String CHORE_SERVICE_PREFIX = "ChaosService";
}

View File

@ -0,0 +1,138 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.chaos;
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.Collection;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.AuthUtil;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
import org.apache.hbase.thirdparty.org.apache.commons.cli.GnuParser;
import org.apache.hbase.thirdparty.org.apache.commons.cli.Option;
import org.apache.hbase.thirdparty.org.apache.commons.cli.Options;
/**
* Class used to start/stop Chaos related services (currently chaosagent)
*/
@InterfaceAudience.Private
public class ChaosService {
private static final Logger LOG = LoggerFactory.getLogger(ChaosService.class.getName());
public static void execute(String[] args, Configuration conf) {
LOG.info("arguments : " + Arrays.toString(args));
try {
CommandLine cmdline = new GnuParser().parse(getOptions(), args);
if (cmdline.hasOption(ChaosServiceName.CHAOSAGENT.toString().toLowerCase())) {
String actionStr = cmdline.getOptionValue(ChaosServiceName.CHAOSAGENT.toString().toLowerCase());
try {
ExecutorAction action = ExecutorAction.valueOf(actionStr.toUpperCase());
if (action == ExecutorAction.START) {
ChaosServiceStart(conf, ChaosServiceName.CHAOSAGENT);
} else if (action == ExecutorAction.STOP) {
ChaosServiceStop();
}
} catch (IllegalArgumentException e) {
LOG.error("action passed: {} Unexpected action. Please provide only start/stop.",
actionStr, e);
throw new RuntimeException(e);
}
} else {
LOG.error("Invalid Options");
}
} catch (Exception e) {
LOG.error("Error while starting ChaosService : ", e);
}
}
private static void ChaosServiceStart(Configuration conf, ChaosServiceName serviceName) {
switch (serviceName) {
case CHAOSAGENT:
ChaosAgent.stopChaosAgent.set(false);
try {
Thread t = new Thread(new ChaosAgent(conf,
ChaosUtils.getZKQuorum(conf), ChaosUtils.getHostName()));
t.start();
t.join();
} catch (InterruptedException | UnknownHostException e) {
LOG.error("Failed while executing next task execution of ChaosAgent on : {}",
serviceName, e);
}
break;
default:
LOG.error("Service Name not known : " + serviceName.toString());
}
}
private static void ChaosServiceStop() {
ChaosAgent.stopChaosAgent.set(true);
}
private static Options getOptions() {
Options options = new Options();
options.addOption(new Option("c", ChaosServiceName.CHAOSAGENT.toString().toLowerCase(),
true, "expecting a start/stop argument"));
options.addOption(new Option("D", ChaosServiceName.GENERIC.toString(),
true, "generic D param"));
LOG.info(Arrays.toString(new Collection[] { options.getOptions() }));
return options;
}
public static void main(String[] args) throws Exception {
Configuration conf = HBaseConfiguration.create();
new GenericOptionsParser(conf, args);
ChoreService choreChaosService = null;
ScheduledChore authChore = AuthUtil.getAuthChore(conf);
try {
if (authChore != null) {
choreChaosService = new ChoreService(ChaosConstants.CHORE_SERVICE_PREFIX);
choreChaosService.scheduleChore(authChore);
}
execute(args, conf);
} finally {
if (authChore != null)
choreChaosService.shutdown();
}
}
enum ChaosServiceName {
CHAOSAGENT,
GENERIC
}
enum ExecutorAction {
START,
STOP
}
}

View File

@ -0,0 +1,49 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.chaos;
import java.net.InetAddress;
import java.net.UnknownHostException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.yetus.audience.InterfaceAudience;
/**
* ChaosUtils holds a bunch of useful functions like getting hostname and getting ZooKeeper quorum.
*/
@InterfaceAudience.Private
public class ChaosUtils {
public static String getHostName() throws UnknownHostException {
return InetAddress.getLocalHost().getHostName();
}
public static String getZKQuorum(Configuration conf) {
String port =
Integer.toString(conf.getInt(HConstants.ZOOKEEPER_CLIENT_PORT, 2181));
String[] serverHosts = conf.getStrings(HConstants.ZOOKEEPER_QUORUM, "localhost");
for (int i = 0; i < serverHosts.length; i++) {
serverHosts[i] = serverHosts[i] + ":" + port;
}
return String.join(",", serverHosts);
}
}

View File

@ -0,0 +1,332 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase;
import java.io.IOException;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@InterfaceAudience.Private
public class ChaosZKClient {
private static final Logger LOG = LoggerFactory.getLogger(ChaosZKClient.class.getName());
private static final String CHAOS_AGENT_PARENT_ZNODE = "/hbase/chaosAgents";
private static final String CHAOS_AGENT_STATUS_ZNODE = "/hbase/chaosAgentTaskStatus";
private static final String ZNODE_PATH_SEPARATOR = "/";
private static final String TASK_PREFIX = "task_";
private static final String TASK_ERROR_STRING = "error";
private static final String TASK_COMPLETION_STRING = "done";
private static final String TASK_BOOLEAN_TRUE = "true";
private static final String TASK_BOOLEAN_FALSE = "false";
private static final String CONNECTION_LOSS = "ConnectionLoss";
private static final int SESSION_TIMEOUT_ZK = 10 * 60 * 1000;
private static final int TASK_EXECUTION_TIMEOUT = 5 * 60 * 1000;
private volatile String taskStatus = null;
private final String quorum;
private ZooKeeper zk;
public ChaosZKClient(String quorum) {
this.quorum = quorum;
try {
this.createNewZKConnection();
} catch (IOException e) {
LOG.error("Error creating ZooKeeper Connection: ", e);
}
}
/**
* Creates connection with ZooKeeper
* @throws IOException when not able to create connection properly
*/
public void createNewZKConnection() throws IOException {
Watcher watcher = new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
LOG.info("Created ZooKeeper Connection For executing task");
}
};
this.zk = new ZooKeeper(quorum, SESSION_TIMEOUT_ZK, watcher);
}
/**
* Checks if ChaosAgent is running or not on target host by checking its ZNode.
* @param hostname hostname to check for chaosagent
* @return true/false whether agent is running or not
*/
private boolean isChaosAgentRunning(String hostname) {
try {
return zk.exists(CHAOS_AGENT_PARENT_ZNODE + ZNODE_PATH_SEPARATOR + hostname,
false) != null;
} catch (KeeperException e) {
if (e.toString().contains(CONNECTION_LOSS)) {
recreateZKConnection();
try {
return zk.exists(CHAOS_AGENT_PARENT_ZNODE + ZNODE_PATH_SEPARATOR + hostname,
false) != null;
} catch (KeeperException | InterruptedException ie) {
LOG.error("ERROR ", ie);
}
}
} catch (InterruptedException e) {
LOG.error("Error checking for given hostname: {} ERROR: ", hostname, e);
}
return false;
}
/**
* Creates tasks for target hosts by creating ZNodes.
* Waits for a limited amount of time to complete task to execute.
* @param taskObject Object data represents command
* @return returns status
*/
public String submitTask(final TaskObject taskObject) {
if (isChaosAgentRunning(taskObject.getTaskHostname())) {
LOG.info("Creating task node");
zk.create(CHAOS_AGENT_STATUS_ZNODE + ZNODE_PATH_SEPARATOR +
taskObject.getTaskHostname() + ZNODE_PATH_SEPARATOR + TASK_PREFIX,
taskObject.getCommand().getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL,
submitTaskCallback,
taskObject);
long start = System.currentTimeMillis();
while ((System.currentTimeMillis() - start) < TASK_EXECUTION_TIMEOUT) {
if(taskStatus != null) {
return taskStatus;
}
Threads.sleep(500);
}
} else {
LOG.info("EHHHHH! ChaosAgent Not running");
}
return TASK_ERROR_STRING;
}
/**
* To get status of task submitted
* @param path path at which to get status
* @param ctx path context
*/
private void getStatus(String path , Object ctx) {
LOG.info("Getting Status of task: " + path);
zk.getData(path,
false,
getStatusCallback,
ctx);
}
/**
* Set a watch on task submitted
* @param name ZNode name to set a watch
* @param taskObject context for ZNode name
*/
private void setStatusWatch(String name, TaskObject taskObject) {
LOG.info("Checking for ZNode and Setting watch for task : " + name);
zk.exists(name,
setStatusWatcher,
setStatusWatchCallback,
taskObject);
}
/**
* Delete task after getting its status
* @param path path to delete ZNode
*/
private void deleteTask(String path) {
LOG.info("Deleting task: " + path);
zk.delete(path,
-1,
taskDeleteCallback,
null);
}
//WATCHERS:
/**
* Watcher to get notification whenever status of task changes.
*/
Watcher setStatusWatcher = new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
LOG.info("Setting status watch for task: " + watchedEvent.getPath());
if(watchedEvent.getType() == Event.EventType.NodeDataChanged) {
if(!watchedEvent.getPath().contains(TASK_PREFIX)) {
throw new RuntimeException(KeeperException.create(
KeeperException.Code.DATAINCONSISTENCY));
}
getStatus(watchedEvent.getPath(), (Object) watchedEvent.getPath());
}
}
};
//CALLBACKS
AsyncCallback.DataCallback getStatusCallback = (rc, path, ctx, data, stat) -> {
switch (KeeperException.Code.get(rc)) {
case CONNECTIONLOSS:
//Connectionloss while getting status of task, getting again
recreateZKConnection();
getStatus(path, ctx);
break;
case OK:
if (ctx!=null) {
String status = new String(data);
taskStatus = status;
switch (status) {
case TASK_COMPLETION_STRING:
case TASK_BOOLEAN_TRUE:
case TASK_BOOLEAN_FALSE:
LOG.info("Task executed completely : Status --> " + status);
break;
case TASK_ERROR_STRING:
LOG.info("There was error while executing task : Status --> " + status);
break;
default:
LOG.warn("Status of task is undefined!! : Status --> " + status);
}
deleteTask(path);
}
break;
default:
LOG.error("ERROR while getting status of task: " + path + " ERROR: " +
KeeperException.create(KeeperException.Code.get(rc)));
}
};
AsyncCallback.StatCallback setStatusWatchCallback = (rc, path, ctx, stat) -> {
switch (KeeperException.Code.get(rc)) {
case CONNECTIONLOSS:
//ConnectionLoss while setting watch on status ZNode, setting again.
recreateZKConnection();
setStatusWatch(path, (TaskObject) ctx);
break;
case OK:
if(stat != null) {
getStatus(path, null);
}
break;
default:
LOG.error("ERROR while setting watch on task ZNode: " + path + " ERROR: " +
KeeperException.create(KeeperException.Code.get(rc)));
}
};
AsyncCallback.StringCallback submitTaskCallback = (rc, path, ctx, name) -> {
switch (KeeperException.Code.get(rc)) {
case CONNECTIONLOSS:
// Connection to server was lost while submitting task, submitting again.
recreateZKConnection();
submitTask((TaskObject) ctx);
break;
case OK:
LOG.info("Task created : " + name);
setStatusWatch(name, (TaskObject) ctx);
break;
default:
LOG.error("Error submitting task: " + name + " ERROR:" +
KeeperException.create(KeeperException.Code.get(rc)));
}
};
AsyncCallback.VoidCallback taskDeleteCallback = new AsyncCallback.VoidCallback() {
@Override
public void processResult(int rc, String path, Object ctx) {
switch (KeeperException.Code.get(rc)) {
case CONNECTIONLOSS:
//Connectionloss while deleting task, deleting again
recreateZKConnection();
deleteTask(path);
break;
case OK:
LOG.info("Task Deleted successfully!");
LOG.info("Closing ZooKeeper Connection");
try {
zk.close();
} catch (InterruptedException e) {
LOG.error("Error while closing ZooKeeper Connection.");
}
break;
default:
LOG.error("ERROR while deleting task: " + path + " ERROR: " +
KeeperException.create(KeeperException.Code.get(rc)));
}
}
};
private void recreateZKConnection() {
try {
zk.close();
} catch (InterruptedException e) {
LOG.error("Error closing ZK connection : ", e);
} finally {
try {
createNewZKConnection();
} catch (IOException e) {
LOG.error("Error creating new ZK COnnection for agent: ", e);
}
}
}
static class TaskObject {
private final String command;
private final String taskHostname;
public TaskObject(String command, String taskHostname) {
this.command = command;
this.taskHostname = taskHostname;
}
public String getCommand() {
return this.command;
}
public String getTaskHostname() {
return taskHostname;
}
}
}

View File

@ -0,0 +1,120 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase;
import java.io.IOException;
import java.util.Arrays;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configured;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@InterfaceAudience.Private
public class ZNodeClusterManager extends Configured implements ClusterManager {
private static final Logger LOG = LoggerFactory.getLogger(ZNodeClusterManager.class.getName());
private static final String SIGKILL = "SIGKILL";
private static final String SIGSTOP = "SIGSTOP";
private static final String SIGCONT = "SIGCONT";
public ZNodeClusterManager() {
}
private String getZKQuorumServersStringFromHbaseConfig() {
String port =
Integer.toString(getConf().getInt(HConstants.ZOOKEEPER_CLIENT_PORT, 2181));
String[] serverHosts = getConf().getStrings(HConstants.ZOOKEEPER_QUORUM, "localhost");
for (int i = 0; i < serverHosts.length; i++) {
serverHosts[i] = serverHosts[i] + ":" + port;
}
return Arrays.asList(serverHosts).stream().collect(Collectors.joining(","));
}
private String createZNode(String hostname, String cmd) throws IOException{
LOG.info("Zookeeper Mode enabled sending command to zookeeper + " +
cmd + "hostname:" + hostname);
ChaosZKClient chaosZKClient = new ChaosZKClient(getZKQuorumServersStringFromHbaseConfig());
return chaosZKClient.submitTask(new ChaosZKClient.TaskObject(cmd, hostname));
}
protected HBaseClusterManager.CommandProvider getCommandProvider(ServiceType service)
throws IOException {
switch (service) {
case HADOOP_DATANODE:
case HADOOP_NAMENODE:
return new HBaseClusterManager.HadoopShellCommandProvider(getConf());
case ZOOKEEPER_SERVER:
return new HBaseClusterManager.ZookeeperShellCommandProvider(getConf());
default:
return new HBaseClusterManager.HBaseShellCommandProvider(getConf());
}
}
public void signal(ServiceType service, String signal, String hostname) throws IOException {
createZNode(hostname, CmdType.exec.toString() +
getCommandProvider(service).signalCommand(service, signal));
}
private void createOpCommand(String hostname, ServiceType service,
HBaseClusterManager.CommandProvider.Operation op) throws IOException{
createZNode(hostname, CmdType.exec.toString() +
getCommandProvider(service).getCommand(service, op));
}
@Override
public void start(ServiceType service, String hostname, int port) throws IOException {
createOpCommand(hostname, service, HBaseClusterManager.CommandProvider.Operation.START);
}
@Override
public void stop(ServiceType service, String hostname, int port) throws IOException {
createOpCommand(hostname, service, HBaseClusterManager.CommandProvider.Operation.STOP);
}
@Override
public void restart(ServiceType service, String hostname, int port) throws IOException {
createOpCommand(hostname, service, HBaseClusterManager.CommandProvider.Operation.RESTART);
}
@Override
public void kill(ServiceType service, String hostname, int port) throws IOException {
signal(service, SIGKILL, hostname);
}
@Override
public void suspend(ServiceType service, String hostname, int port) throws IOException {
signal(service, SIGSTOP, hostname);
}
@Override
public void resume(ServiceType service, String hostname, int port) throws IOException {
signal(service, SIGCONT, hostname);
}
@Override
public boolean isRunning(ServiceType service, String hostname, int port) throws IOException {
return Boolean.parseBoolean(createZNode(hostname, CmdType.bool.toString() +
getCommandProvider(service).isRunningCommand(service)));
}
enum CmdType {
exec,
bool
}
}