HADOOP-6772. Utilities for system tests specific. Contributed by Vinay Thota.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@948237 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Konstantin Boudnik 2010-05-25 22:54:40 +00:00
parent 1739f7388f
commit 5392319a24
2 changed files with 186 additions and 0 deletions

View File

@ -336,6 +336,8 @@ Release 0.21.0 - Unreleased
IMPROVEMENTS
HADOOP-6772. Utilities for system tests specific. (Vinay Thota via cos)
HADOOP-6771. Herriot's artifact id for Maven deployment should be set to
hadoop-core-instrumented (cos)

View File

@ -19,10 +19,14 @@
package org.apache.hadoop.test.system;
import java.io.IOException;
import java.io.File;
import java.io.FileOutputStream;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Enumeration;
import java.util.Hashtable;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@ -41,6 +45,10 @@ public abstract class AbstractDaemonCluster {
protected ClusterProcessManager clusterManager;
private Map<Enum<?>, List<AbstractDaemonClient>> daemons =
new LinkedHashMap<Enum<?>, List<AbstractDaemonClient>>();
private String newConfDir = null;
private static final String CONF_HADOOP_LOCAL_DIR =
"test.system.hdrc.hadoop.local.confdir";
private final static Object waitLock = new Object();
/**
* Constructor to create a cluster client.<br/>
@ -288,5 +296,181 @@ public abstract class AbstractDaemonCluster {
}
}
}
/**
* It's a local folder where the config file stores temporarily
* while serializing the object.
* @return String temporary local folder path for configuration.
*/
private String getHadoopLocalConfDir() {
String hadoopLocalConfDir = conf.get(CONF_HADOOP_LOCAL_DIR);
if (hadoopLocalConfDir == null || hadoopLocalConfDir.isEmpty()) {
LOG.error("No configuration "
+ "for the CONF_HADOOP_LOCAL_DIR passed");
throw new IllegalArgumentException(
"No Configuration passed for hadoop conf local directory");
}
return hadoopLocalConfDir;
}
/**
* It uses to restart the cluster with new configuration at runtime.<br/>
* @param props attributes for new configuration.
* @param configFile configuration file.
* @throws IOException if an I/O error occurs.
*/
public void restartClusterWithNewConfig(Hashtable<String,Long> props,
String configFile) throws IOException {
String mapredConf = null;
String localDirPath = null;
File localFolderObj = null;
File xmlFileObj = null;
String confXMLFile = null;
Configuration initConf = new Configuration(getConf());
Enumeration<String> e = props.keys();
while (e.hasMoreElements()) {
String propKey = e.nextElement();
Long propValue = props.get(propKey);
initConf.setLong(propKey,propValue.longValue());
}
localDirPath = getHadoopLocalConfDir();
localFolderObj = new File(localDirPath);
if (!localFolderObj.exists()) {
localFolderObj.mkdir();
}
confXMLFile = localDirPath + File.separator + configFile;
xmlFileObj = new File(confXMLFile);
initConf.writeXml(new FileOutputStream(xmlFileObj));
newConfDir = clusterManager.pushConfig(localDirPath);
stop();
waitForClusterToStop();
clusterManager.start(newConfDir);
waitForClusterToStart();
localFolderObj.delete();
}
/**
* It uses to restart the cluster with default configuration.<br/>
* @throws IOException if an I/O error occurs.
*/
public void restart() throws
IOException {
stop();
waitForClusterToStop();
start();
waitForClusterToStart();
}
/**
* It uses to wait until the cluster is stopped.<br/>
* @throws IOException if an I/O error occurs.
*/
public void waitForClusterToStop() throws
IOException {
List<Thread> chkDaemonStop = new ArrayList<Thread>();
for (List<AbstractDaemonClient> set : daemons.values()) {
for (AbstractDaemonClient daemon : set) {
DaemonStopThread dmStop = new DaemonStopThread(daemon);
chkDaemonStop.add(dmStop);
dmStop.start();
}
}
for (Thread daemonThread : chkDaemonStop){
try {
daemonThread.join();
} catch(InterruptedException intExp) {
LOG.warn("Interrupted while thread is joining." + intExp.getMessage());
}
}
}
/**
* It uses to wait until the cluster is started.<br/>
* @throws IOException if an I/O error occurs.
*/
public void waitForClusterToStart() throws
IOException {
List<Thread> chkDaemonStart = new ArrayList<Thread>();
for (List<AbstractDaemonClient> set : daemons.values()) {
for (AbstractDaemonClient daemon : set) {
DaemonStartThread dmStart = new DaemonStartThread(daemon);
chkDaemonStart.add(dmStart);;
dmStart.start();
}
}
for (Thread daemonThread : chkDaemonStart){
try {
daemonThread.join();
} catch(InterruptedException intExp) {
LOG.warn("Interrupted while thread is joining" + intExp.getMessage());
}
}
}
/**
* It waits for specified amount of time.
* @param duration time in milliseconds.
* @throws InterruptedException if any thread interrupted the current
* thread while it is waiting for a notification.
*/
public void waitFor(long duration) {
try {
synchronized (waitLock) {
waitLock.wait(duration);
}
} catch (InterruptedException intExp) {
LOG.warn("Interrrupeted while thread is waiting" + intExp.getMessage());
}
}
class DaemonStartThread extends Thread {
private AbstractDaemonClient daemon;
public DaemonStartThread(AbstractDaemonClient daemon) {
this.daemon = daemon;
}
public void run(){
LOG.info("Waiting for Daemon " + daemon.getHostName()
+ " to come up.....");
while (true) {
try {
daemon.ping();
LOG.info("Daemon is : " + daemon.getHostName() + " pinging...");
break;
} catch (Exception exp) {
LOG.debug(daemon.getHostName() + " is waiting to come up.");
waitFor(60000);
}
}
}
}
class DaemonStopThread extends Thread {
private AbstractDaemonClient daemon;
public DaemonStopThread(AbstractDaemonClient daemon) {
this.daemon = daemon;
}
public void run() {
LOG.info("Waiting for Daemon " + daemon.getHostName()
+ " to stop.....");
while (true) {
try {
daemon.ping();
LOG.debug(daemon.getHostName() +" is waiting state to stop.");
waitFor(60000);
} catch (Exception exp) {
LOG.info("Daemon is : " + daemon.getHostName() + " stopped...");
break;
}
}
}
}
}