diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index 16f801147a9..71283a56f93 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -109,6 +109,9 @@ Release 2.7.0 - UNRELEASED HADOOP-11464. Reinstate support for launching Hadoop processes on Windows using Cygwin. (cnauroth) + HADOOP-9992. Modify the NN loadGenerator to optionally run as a MapReduce job + (Akshay Radia via brandonli) + OPTIMIZATIONS HADOOP-11323. WritableComparator#compare keeps reference to byte array. diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/loadGenerator/LoadGenerator.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/loadGenerator/LoadGenerator.java index 994b9b2506d..ca017022177 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/loadGenerator/LoadGenerator.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/loadGenerator/LoadGenerator.java @@ -19,10 +19,12 @@ package org.apache.hadoop.fs.loadGenerator; import java.io.BufferedReader; -import java.io.File; -import java.io.FileReader; +import java.io.DataInputStream; +import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.PrintStream; import java.net.InetAddress; import java.net.UnknownHostException; import java.util.ArrayList; @@ -36,10 +38,11 @@ import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.CreateFlag; import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileContext; -import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Options.CreateOpts; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.UnsupportedFileSystemException; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.util.Time; import org.apache.hadoop.util.Tool; @@ -48,8 +51,11 @@ import com.google.common.base.Preconditions; /** The load generator is a tool for testing NameNode behavior under - * different client loads. - * It allows the user to generate different mixes of read, write, + * different client loads. Note there is a subclass of this class that lets + * you run the load generator as a MapReduce job (see LoadGeneratorMR in the + * MapReduce project). + * + * The load generator allows the user to generate different mixes of read, write, * and list requests by specifying the probabilities of read and * write. The user controls the intensity of the load by * adjusting parameters for the number of worker threads and the delay @@ -58,15 +64,24 @@ * generator exits, it print some NameNode statistics like the average * execution time of each kind of operations and the NameNode * throughput. + * + * The program can run in one of two forms: as a regular single-process command + * that runs multiple threads to generate load on the NN, or as a MapReduce + * program that runs multiple (multi-threaded) map tasks that generate load + * on the NN; the results summary is generated by a single reduce task. + * * * The user may either specify constant duration, read and write * probabilities via the command line, or may specify a text file * that acts as a script of which read and write probabilities to - * use for specified durations. + * use for specified durations.
If no duration is specified, the program + * runs until killed (a duration is required if run as MapReduce). * * The script takes the form of lines of duration in seconds, read * probability and write probability, each separated by white space. - * Blank lines and lines starting with # (comments) are ignored. + * Blank lines and lines starting with # (comments) are ignored. If the load + * generator is run as a MapReduce program then the script file needs to be + * accessible to the Map tasks as an HDFS file. * * After command line argument parsing and data initialization, * the load generator spawns the number of worker threads @@ -116,31 +131,43 @@ public class LoadGenerator extends Configured implements Tool { public static final Log LOG = LogFactory.getLog(LoadGenerator.class); - private volatile boolean shouldRun = true; - private Path root = DataGenerator.DEFAULT_ROOT; - private FileContext fc; - private int maxDelayBetweenOps = 0; - private int numOfThreads = 200; - private long [] durations = {0}; - private double [] readProbs = {0.3333}; - private double [] writeProbs = {0.3333}; - private volatile int currentIndex = 0; - long totalTime = 0; - private long startTime = Time.now()+10000; + private volatile static boolean shouldRun = true; + protected static Path root = DataGenerator.DEFAULT_ROOT; + private static FileContext fc; + protected static int maxDelayBetweenOps = 0; + protected static int numOfThreads = 200; + protected static long [] durations = {0}; + protected static double [] readProbs = {0.3333}; + protected static double [] writeProbs = {0.3333}; + private static volatile int currentIndex = 0; + protected static long totalTime = 0; + protected static long startTime = Time.now()+10000; final static private int BLOCK_SIZE = 10; - private ArrayList<String> files = new ArrayList<String>(); // a table of file names - private ArrayList<String> dirs = new ArrayList<String>(); // a table of directory names - private Random r = null; - final private static String USAGE = "java LoadGenerator\n" + - "-readProbability <read probability>\n" + - "-writeProbability <write probability>\n" + - "-root <root>\n" + - "-maxDelayBetweenOps <maxDelayBetweenOpsInMillis>\n" + - "-numOfThreads <numOfThreads>\n" + - "-elapsedTime <elapsedTimeInSecs>\n" + - "-startTime <startTimeInMillis>\n" + - "-scriptFile <filename>"; - final private String hostname; + private static ArrayList<String> files = new ArrayList<String>(); // a table of file names + private static ArrayList<String> dirs = new ArrayList<String>(); // a table of directory names + protected static Random r = null; + protected static long seed = 0; + protected static String scriptFile = null; + protected static final String FLAGFILE_DEFAULT = "/tmp/flagFile"; + protected static Path flagFile = new Path(FLAGFILE_DEFAULT); + protected String hostname; final private static String USAGE_CMD = "java LoadGenerator\n"; final protected static String USAGE_ARGS = + "-readProbability <read probability>\n" + + "-writeProbability <write probability>\n" + + "-root <root>\n" + + "-maxDelayBetweenOps <maxDelayBetweenOpsInMillis>\n" + + "-numOfThreads <numOfThreads>\n" + + "-elapsedTime <elapsedTimeInSecs>\n" + + "-startTime <startTimeInMillis>\n" + + "-scriptFile <filename>\n" + + "-flagFile <filename>"; + final private static String USAGE = USAGE_CMD + USAGE_ARGS; + + + + + private final byte[] WRITE_CONTENTS = new byte[4096]; private static final int ERR_TEST_FAILED = 2; @@ -151,15 +178,21 @@ public LoadGenerator() throws IOException, UnknownHostException { hostname = addr.getHostName(); Arrays.fill(WRITE_CONTENTS, (byte) 'a'); } + + public LoadGenerator(Configuration conf) throws IOException, UnknownHostException { + this(); + setConf(conf); + } - private final static int OPEN = 0; - private final static int LIST = 1; - private final static int CREATE = 2; - private final static int
WRITE_CLOSE = 3; - private final static int DELETE = 4; - private final static int TOTAL_OP_TYPES =5; - private long [] executionTime = new long[TOTAL_OP_TYPES]; - private long [] totalNumOfOps = new long[TOTAL_OP_TYPES]; + protected final static int OPEN = 0; + protected final static int LIST = 1; + protected final static int CREATE = 2; + protected final static int WRITE_CLOSE = 3; + protected final static int DELETE = 4; + protected final static int TOTAL_OP_TYPES =5; + protected static long [] executionTime = new long[TOTAL_OP_TYPES]; + protected static long [] numOfOps = new long[TOTAL_OP_TYPES]; + protected static long totalOps = 0; // across all op types /** A thread sends a stream of requests to the NameNode. * At each iteration, it first decides if it is going to read a file, @@ -192,7 +225,7 @@ private DFSClientThread(int id) { this.id = id; } - /** Main loop + /** Main loop for each thread * Each iteration decides what's the next operation and then pauses. */ @Override @@ -295,7 +328,7 @@ private void genFile(Path file, long fileSize) throws IOException { CreateOpts.createParent(), CreateOpts.bufferSize(4096), CreateOpts.repFac((short) 3)); executionTime[CREATE] += (Time.now() - startTime); - totalNumOfOps[CREATE]++; + numOfOps[CREATE]++; long i = fileSize; while (i > 0) { @@ -306,28 +339,67 @@ startTime = Time.now(); executionTime[WRITE_CLOSE] += (Time.now() - startTime); - totalNumOfOps[WRITE_CLOSE]++; + numOfOps[WRITE_CLOSE]++; } finally { IOUtils.cleanup(LOG, out); } } } - /** Main function: + /** Main function called by tool runner. * It first initializes data by parsing the command line arguments. - * It then starts the number of DFSClient threads as specified by - * the user. - * It stops all the threads when the specified elapsed time is passed. - * Before exiting, it prints the average execution for - * each operation and operation throughput. + * It then calls the load generator. */ @Override public int run(String[] args) throws Exception { - int exitCode = init(args); + int exitCode = parseArgs(false, args); if (exitCode != 0) { return exitCode; } + System.out.println("Running LoadGenerator against fileSystem: " + + FileContext.getFileContext().getDefaultFileSystem().getUri()); + exitCode = generateLoadOnNN(); + printResults(System.out); + return exitCode; + } + boolean stopFileCreated() { + try { + fc.getFileStatus(flagFile); + } catch (FileNotFoundException e) { + return false; + } catch (IOException e) { + LOG.error("Got error when checking if file exists:" + flagFile, e); + } + LOG.info("Flag file was created. Stopping the test."); + return true; + } + + /** + * This is the main function: run threads to generate load on the NN. + * It starts the number of DFSClient threads as specified by + * the user. + * It stops all the threads when the specified elapsed time has passed.
+ */ + protected int generateLoadOnNN() throws InterruptedException { + int hostHashCode = hostname.hashCode(); + if (seed == 0) { + r = new Random(System.currentTimeMillis()+hostHashCode); + } else { + r = new Random(seed+hostHashCode); + } + try { + fc = FileContext.getFileContext(getConf()); + } catch (IOException ioe) { + System.err.println("Can not initialize the file system: " + + ioe.getLocalizedMessage()); + return -1; + } + + int status = initFileDirTables(); + if (status != 0) { + return status; + } barrier(); DFSClientThread[] threads = new DFSClientThread[numOfThreads]; @@ -337,91 +409,99 @@ public int run(String[] args) throws Exception { } if (durations[0] > 0) { - while(shouldRun) { - Thread.sleep(durations[currentIndex] * 1000); - totalTime += durations[currentIndex]; - - // Are we on the final line of the script? - if( (currentIndex + 1) == durations.length) { - shouldRun = false; - } else { - if(LOG.isDebugEnabled()) { - LOG.debug("Moving to index " + currentIndex + ": r = " - + readProbs[currentIndex] + ", w = " + writeProbs - + " for duration " + durations[currentIndex]); + if (durations.length == 1) {// There is a fixed run time + while (shouldRun) { + Thread.sleep(2000); + totalTime += 2; + if (totalTime >= durations[0] || stopFileCreated()) { + shouldRun = false; + } + } + } else { + // script run + + while (shouldRun) { + Thread.sleep(durations[currentIndex] * 1000); + totalTime += durations[currentIndex]; + // Are we on the final line of the script? + if ((currentIndex + 1) == durations.length || stopFileCreated()) { + shouldRun = false; + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Moving to index " + currentIndex + ": r = " + + readProbs[currentIndex] + ", w = " + writeProbs + + " for duration " + durations[currentIndex]); + } + currentIndex++; } - currentIndex++; } } - } + } if(LOG.isDebugEnabled()) { LOG.debug("Done with testing. 
Waiting for threads to finish."); } - + boolean failed = false; for (DFSClientThread thread : threads) { thread.join(); for (int i=0; i 0) { + System.err.println("Can't specify elapsedTime and use script."); return -1; - scriptSpecified = true; + } } else if (args[i].equals("-readProbability")) { - if(scriptSpecified) { + if (scriptFile != null) { System.err.println("Can't specify probabilities and use script."); return -1; } @@ -432,7 +512,7 @@ private int init(String[] args) throws IOException { return -1; } } else if (args[i].equals("-writeProbability")) { - if(scriptSpecified) { + if (scriptFile != null) { System.err.println("Can't specify probabilities and use script."); return -1; } @@ -456,14 +536,18 @@ } else if (args[i].equals("-startTime")) { startTime = Long.parseLong(args[++i]); } else if (args[i].equals("-elapsedTime")) { - if(scriptSpecified) { + if (scriptFile != null) { System.err.println("Can't specify elapsedTime and use script."); return -1; } durations[0] = Long.parseLong(args[++i]); } else if (args[i].equals("-seed")) { - r = new Random(Long.parseLong(args[++i])+hostHashCode); - } else { + seed = Long.parseLong(args[++i]); + r = new Random(seed); + } else if (args[i].equals("-flagFile")) { + LOG.info("got flagFile:" + flagFile); + flagFile = new Path(args[++i]); + } else { System.err.println(USAGE); ToolRunner.printGenericCommandUsage(System.err); return -1; @@ -475,6 +559,12 @@ return -1; } + // Load the script file if not MR; for MR, the scriptFile is loaded by the Mapper + if (!runAsMapReduce && scriptFile != null) { + if(loadScriptFile(scriptFile, true) == -1) + return -1; + } + for(int i = 0; i < readProbs.length; i++) { if (readProbs[i] + writeProbs[i] <0 || readProbs[i]+ writeProbs[i] > 1) { System.err.println( @@ -483,12 +573,7 @@ private int init(String[] args) throws IOException { return -1; } } - - if (r==null) { - r = new Random(Time.now()+hostHashCode); - } - - return initFileDirTables(); + return 0; } private static void parseScriptLine(String line, ArrayList<Long> duration, @@ -527,9 +612,25 @@ private static void parseScriptLine(String line, ArrayList<Long> duration, * @return 0 if successful, -1 if not * @throws IOException if errors with file IO */ - private int loadScriptFile(String filename) throws IOException { - FileReader fr = new FileReader(new File(filename)); - BufferedReader br = new BufferedReader(fr); + protected static int loadScriptFile(String filename, boolean readLocally) throws IOException { + + FileContext fc; + if (readLocally) { // read locally - program is run without MR + fc = FileContext.getLocalFSFileContext(); + } else { + fc = FileContext.getFileContext(); // use default file system + } + DataInputStream in = null; + try { + in = fc.open(new Path(filename)); + } catch (IOException e) { + System.err.println("Unable to open scriptFile: " + filename); + + System.exit(-1); + } + InputStreamReader inr = new InputStreamReader(in); + + BufferedReader br = new BufferedReader(inr); ArrayList<Long> duration = new ArrayList<Long>(); ArrayList<Double> readProb = new ArrayList<Double>(); ArrayList<Double> writeProb = new ArrayList<Double>(); @@ -619,7 +720,7 @@ private void initFileDirTables(Path path) throws IOException { * This allows multiple instances of this program, running on clock * synchronized nodes, to start at roughly the same time.
*/ - private void barrier() { + private static void barrier() { long sleepTime; while ((sleepTime = startTime - Time.now()) > 0) { try { @@ -635,9 +736,7 @@ private void barrier() { * @throws Exception */ public static void main(String[] args) throws Exception { - int res = ToolRunner.run(new Configuration(), - new LoadGenerator(), args); + int res = ToolRunner.run(new Configuration(), new LoadGenerator(), args); System.exit(res); } - } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/loadGenerator/LoadGeneratorMR.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/loadGenerator/LoadGeneratorMR.java new file mode 100644 index 00000000000..c47d9713460 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/loadGenerator/LoadGeneratorMR.java @@ -0,0 +1,483 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.loadGenerator; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.io.PrintStream; +import java.net.UnknownHostException; +import java.util.EnumSet; +import java.util.Iterator; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.fs.CreateFlag; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileContext; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.FileOutputFormat; +import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobClient; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.MapReduceBase; +import org.apache.hadoop.mapred.Mapper; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.mapred.Reducer; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapred.TextOutputFormat; +import org.apache.hadoop.util.ToolRunner; + +/** The load generator is a tool for testing NameNode behavior under + * different client loads. + * The main code is in HadoopCommon, @LoadGenerator. This class, LoadGeneratorMR, + * lets you run that LoadGenerator as a MapReduce job. + * + * The synopsis of the command is + * java LoadGeneratorMR + * -mr <numMapTasks> <outputDir> : results in outputDir/Results + * the rest of the args are the same as the original LoadGenerator.
+ * + */ +public class LoadGeneratorMR extends LoadGenerator { + public static final Log LOG = LogFactory.getLog(LoadGenerator.class); + private static int numMapTasks = 1; + private String mrOutDir; + + final private static String USAGE_CMD = "java LoadGeneratorMR\n"; + final private static String USAGE = USAGE_CMD + + "-mr <numMapTasks> <outputDir> [MUST be first 3 args] \n" + USAGE_ARGS ; + + // Constant "keys" used to communicate between map and reduce + final private static Text OPEN_EXECTIME = new Text("OpenExecutionTime"); + final private static Text NUMOPS_OPEN = new Text("NumOpsOpen"); + final private static Text LIST_EXECTIME = new Text("ListExecutionTime"); + final private static Text NUMOPS_LIST = new Text("NumOpsList"); + final private static Text DELETE_EXECTIME = new Text("DeletionExecutionTime"); + final private static Text NUMOPS_DELETE = new Text("NumOpsDelete"); + final private static Text CREATE_EXECTIME = new Text("CreateExecutionTime"); + final private static Text NUMOPS_CREATE = new Text("NumOpsCreate"); + final private static Text WRITE_CLOSE_EXECTIME = new Text("WriteCloseExecutionTime"); + final private static Text NUMOPS_WRITE_CLOSE = new Text("NumOpsWriteClose"); + final private static Text ELAPSED_TIME = new Text("ElapsedTime"); + final private static Text TOTALOPS = new Text("TotalOps"); + + // Config keys to pass args from Main to the Job + final private static String LG_ROOT = "LG.root"; + final private static String LG_SCRIPTFILE = "LG.scriptFile"; + final private static String LG_MAXDELAYBETWEENOPS = "LG.maxDelayBetweenOps"; + final private static String LG_NUMOFTHREADS = "LG.numOfThreads"; + final private static String LG_READPR = "LG.readPr"; + final private static String LG_WRITEPR = "LG.writePr"; + final private static String LG_SEED = "LG.r"; + final private static String LG_NUMMAPTASKS = "LG.numMapTasks"; + final private static String LG_ELAPSEDTIME = "LG.elapsedTime"; + final private static String LG_STARTTIME = "LG.startTime"; + final private static String LG_FLAGFILE = "LG.flagFile"; + + + /** Constructor */ + public LoadGeneratorMR() throws IOException, UnknownHostException { + super(); + } + + public LoadGeneratorMR(Configuration conf) throws IOException, UnknownHostException { + this(); + setConf(conf); + } + + /** Main function called by tool runner. + * It first initializes data by parsing the command line arguments. + * It then submits the load generator as a MapReduce job. + */ + @Override + public int run(String[] args) throws Exception { + int exitCode = parseArgsMR(args); + if (exitCode != 0) { + return exitCode; + } + System.out.println("Running LoadGeneratorMR against fileSystem: " + + FileContext.getFileContext().getDefaultFileSystem().getUri()); + + return submitAsMapReduce(); // reducer will print the results + } + + + /** + * Parse the command line arguments and initialize the data.
+ * Only parses the first option, -mr, which MUST occupy the first three args; + * the rest are parsed by the parent LoadGenerator. + **/ + + private int parseArgsMR(String[] args) throws IOException { + try { + if (args.length >= 3 && args[0].equals("-mr")) { + numMapTasks = Integer.parseInt(args[1]); + mrOutDir = args[2]; + if (mrOutDir.startsWith("-")) { + System.err.println("Missing output file parameter, instead got: " + + mrOutDir); + System.err.println(USAGE); + return -1; + } + } else { + System.err.println(USAGE); + ToolRunner.printGenericCommandUsage(System.err); + return -1; + } + String[] strippedArgs = new String[args.length - 3]; + for (int i = 0; i < strippedArgs.length; i++) { + strippedArgs[i] = args[i + 3]; + } + super.parseArgs(true, strippedArgs); // Parse normal LoadGenerator args + } catch (NumberFormatException e) { + System.err.println("Illegal parameter: " + e.getLocalizedMessage()); + System.err.println(USAGE); + return -1; + } + return 0; + } + + /** Main program + * + * @param args command line arguments + * @throws Exception + */ + public static void main(String[] args) throws Exception { + int res = ToolRunner.run(new Configuration(), new LoadGeneratorMR(), args); + System.exit(res); + } + + + // The following methods are only used when LoadGenerator is run as an MR job + /** + * Based on the args, we submit the LoadGenerator as an MR job. + * Number of MapTasks is numMapTasks + * @return exitCode for job submission + */ + private int submitAsMapReduce() { + + System.out.println("Running as a MapReduce job with " + + numMapTasks + " mapTasks; Output to file " + mrOutDir); + + + Configuration conf = new Configuration(getConf()); + + // First set all the args of LoadGenerator as Conf vars to pass to MR tasks + + conf.set(LG_ROOT , root.toString()); + conf.setInt(LG_MAXDELAYBETWEENOPS, maxDelayBetweenOps); + conf.setInt(LG_NUMOFTHREADS, numOfThreads); + conf.set(LG_READPR, readProbs[0]+""); //Pass Double as string + conf.set(LG_WRITEPR, writeProbs[0]+""); //Pass Double as string + conf.setLong(LG_SEED, seed); // random seed, passed through to the map tasks + conf.setInt(LG_NUMMAPTASKS, numMapTasks); + if (scriptFile == null && durations[0] <=0) { + System.err.println("When run as a MapReduce job, elapsedTime or scriptFile must be specified"); + System.exit(-1); + } + conf.setLong(LG_ELAPSEDTIME, durations[0]); + conf.setLong(LG_STARTTIME, startTime); + if (scriptFile != null) { + conf.set(LG_SCRIPTFILE , scriptFile); + } + conf.set(LG_FLAGFILE, flagFile.toString()); + + // Now set the necessary conf variables that apply to run MR itself. + JobConf jobConf = new JobConf(conf, LoadGenerator.class); + jobConf.setJobName("NNLoadGeneratorViaMR"); + jobConf.setNumMapTasks(numMapTasks); + jobConf.setNumReduceTasks(1); // 1 reducer to collect the results + + jobConf.setOutputKeyClass(Text.class); + jobConf.setOutputValueClass(IntWritable.class); + + jobConf.setMapperClass(MapperThatRunsNNLoadGenerator.class); + jobConf.setReducerClass(ReducerThatCollectsLGdata.class); + + jobConf.setInputFormat(DummyInputFormat.class); + jobConf.setOutputFormat(TextOutputFormat.class); + + // Explicitly set number of max map attempts to 1.
+ jobConf.setMaxMapAttempts(1); + // Explicitly turn off speculative execution + jobConf.setSpeculativeExecution(false); + + // This mapReduce job has no input but has output + FileOutputFormat.setOutputPath(jobConf, new Path(mrOutDir)); + + try { + JobClient.runJob(jobConf); + } catch (IOException e) { + System.err.println("Failed to run job: " + e.getMessage()); + return -1; + } + return 0; + + } + + + // Each split is empty + public static class EmptySplit implements InputSplit { + public void write(DataOutput out) throws IOException {} + public void readFields(DataInput in) throws IOException {} + public long getLength() {return 0L;} + public String[] getLocations() {return new String[0];} + } + + // Dummy input format to send 1 record - number of splits is numMapTasks + public static class DummyInputFormat extends Configured implements + InputFormat<LongWritable, Text> { + + public InputSplit[] getSplits(JobConf conf, int numSplits) { + numSplits = conf.getInt("LG.numMapTasks", 1); + InputSplit[] ret = new InputSplit[numSplits]; + for (int i = 0; i < numSplits; ++i) { + ret[i] = new EmptySplit(); + } + return ret; + } + + public RecordReader<LongWritable, Text> getRecordReader( + InputSplit ignored, JobConf conf, Reporter reporter) throws IOException { + + return new RecordReader<LongWritable, Text>() { + + boolean sentOneRecord = false; + + public boolean next(LongWritable key, Text value) + throws IOException { + key.set(1); + value.set("dummy"); + if (sentOneRecord == false) { // first call + sentOneRecord = true; + return true; + } + return false; // we have sent one record - we are done + } + + public LongWritable createKey() { + return new LongWritable(); + } + public Text createValue() { + return new Text(); + } + public long getPos() throws IOException { + return 1; + } + public void close() throws IOException { + } + public float getProgress() throws IOException { + return 1; + } + }; + } + } + + public static class MapperThatRunsNNLoadGenerator extends MapReduceBase + implements Mapper<LongWritable, Text, Text, IntWritable> { + + private JobConf jobConf; + + @Override + public void configure(JobConf job) { + this.jobConf = job; + getArgsFromConfiguration(jobConf); + } + + private class ProgressThread extends Thread { + + boolean keepGoing; // while this is true, thread runs.
+ private Reporter reporter; + + public ProgressThread(final Reporter r) { + this.reporter = r; + this.keepGoing = true; + } + + public void run() { + while (keepGoing) { + if (!ProgressThread.interrupted()) { + try { + sleep(30 * 1000); + } catch (InterruptedException e) { + } + } + reporter.progress(); + } + } + } + + public void map(LongWritable key, Text value, + OutputCollector<Text, IntWritable> output, Reporter reporter) + throws IOException { + ProgressThread progressThread = new ProgressThread(reporter); + progressThread.start(); + try { + new LoadGenerator(jobConf).generateLoadOnNN(); + System.out + .println("Finished generating load on NN, sending results to the reducer"); + printResults(System.out); + progressThread.keepGoing = false; + progressThread.join(); + + // Send results to Reducer + output.collect(OPEN_EXECTIME, + new IntWritable((int) executionTime[OPEN])); + output.collect(NUMOPS_OPEN, new IntWritable((int) numOfOps[OPEN])); + + output.collect(LIST_EXECTIME, + new IntWritable((int) executionTime[LIST])); + output.collect(NUMOPS_LIST, new IntWritable((int) numOfOps[LIST])); + + output.collect(DELETE_EXECTIME, new IntWritable( + (int) executionTime[DELETE])); + output.collect(NUMOPS_DELETE, new IntWritable((int) numOfOps[DELETE])); + + output.collect(CREATE_EXECTIME, new IntWritable( + (int) executionTime[CREATE])); + output.collect(NUMOPS_CREATE, new IntWritable((int) numOfOps[CREATE])); + + output.collect(WRITE_CLOSE_EXECTIME, new IntWritable( + (int) executionTime[WRITE_CLOSE])); + output.collect(NUMOPS_WRITE_CLOSE, new IntWritable( + (int) numOfOps[WRITE_CLOSE])); + + output.collect(TOTALOPS, new IntWritable((int) totalOps)); + output.collect(ELAPSED_TIME, new IntWritable((int) totalTime)); + + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + public void getArgsFromConfiguration(Configuration conf) { + + maxDelayBetweenOps = conf.getInt(LG_MAXDELAYBETWEENOPS, + maxDelayBetweenOps); + numOfThreads = conf.getInt(LG_NUMOFTHREADS, numOfThreads); + readProbs[0] = Double.parseDouble(conf.get(LG_READPR, readProbs[0] + "")); + writeProbs[0] = Double.parseDouble(conf.get(LG_WRITEPR, writeProbs[0] + + "")); + seed = conf.getLong(LG_SEED, seed); + numMapTasks = conf.getInt(LG_NUMMAPTASKS, numMapTasks); + root = new Path(conf.get(LG_ROOT, root.toString())); + durations[0] = conf.getLong(LG_ELAPSEDTIME, 0); + startTime = conf.getLong(LG_STARTTIME, 0); + scriptFile = conf.get(LG_SCRIPTFILE, null); + flagFile = new Path(conf.get(LG_FLAGFILE, FLAGFILE_DEFAULT)); + if (durations[0] > 0 && scriptFile != null) { + System.err.println("Cannot specify both ElapsedTime and ScriptFile, exiting"); + System.exit(-1); + } + + try { + if (scriptFile != null && loadScriptFile(scriptFile, false) < 0) { + System.err.println("Error in scriptFile, exiting"); + System.exit(-1); + } + } catch (IOException e) { + System.err.println("Error loading script file " + scriptFile); + e.printStackTrace(); + } + if (durations[0] <= 0) { + System.err.println("A duration of zero or less is not allowed when running via MapReduce."); + System.exit(-1); + } + } + } + + public static class ReducerThatCollectsLGdata extends MapReduceBase implements + Reducer<Text, IntWritable, Text, IntWritable> { + private IntWritable result = new IntWritable(); + private JobConf jobConf; + + @Override + public void configure(JobConf job) { + this.jobConf = job; + } + + @Override + public void reduce(Text key, Iterator<IntWritable> values, + OutputCollector<Text, IntWritable> output, Reporter reporter) + throws IOException { + int sum = 0; + while (values.hasNext()) { + sum += values.next().get();
+ } + if (key.equals(OPEN_EXECTIME)){ + executionTime[OPEN] = sum; + } else if (key.equals(NUMOPS_OPEN)){ + numOfOps[OPEN] = sum; + } else if (key.equals(LIST_EXECTIME)){ + executionTime[LIST] = sum; + } else if (key.equals(NUMOPS_LIST)){ + numOfOps[LIST] = sum; + } else if (key.equals(DELETE_EXECTIME)){ + executionTime[DELETE] = sum; + } else if (key.equals(NUMOPS_DELETE)){ + numOfOps[DELETE] = sum; + } else if (key.equals(CREATE_EXECTIME)){ + executionTime[CREATE] = sum; + } else if (key.equals(NUMOPS_CREATE)){ + numOfOps[CREATE] = sum; + } else if (key.equals(WRITE_CLOSE_EXECTIME)){ + System.out.println(WRITE_CLOSE_EXECTIME + " = " + sum); + executionTime[WRITE_CLOSE]= sum; + } else if (key.equals(NUMOPS_WRITE_CLOSE)){ + numOfOps[WRITE_CLOSE] = sum; + } else if (key.equals(TOTALOPS)){ + totalOps = sum; + } else if (key.equals(ELAPSED_TIME)){ + totalTime = sum; + } + result.set(sum); + output.collect(key, result); + // System.out.println("Key = " + key + " Sum is =" + sum); + // printResults(System.out); + } + + @Override + public void close() throws IOException { + // Output the result to a file Results in the output dir + FileContext fc; + try { + fc = FileContext.getFileContext(jobConf); + } catch (IOException ioe) { + System.err.println("Can not initialize the file system: " + + ioe.getLocalizedMessage()); + return; + } + FSDataOutputStream o = fc.create(FileOutputFormat.getTaskOutputPath(jobConf, "Results"), + EnumSet.of(CreateFlag.CREATE)); + + PrintStream out = new PrintStream(o); + printResults(out); + out.close(); + o.close(); + } + } +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/test/MapredTestDriver.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/test/MapredTestDriver.java index f2cd53cdfa2..b1dfe56dbf8 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/test/MapredTestDriver.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/test/MapredTestDriver.java @@ -41,6 +41,10 @@ import org.apache.hadoop.fs.DistributedFSCheck; import org.apache.hadoop.io.FileBench; import org.apache.hadoop.fs.JHLogAnalyzer; +import org.apache.hadoop.fs.loadGenerator.DataGenerator; +import org.apache.hadoop.fs.loadGenerator.LoadGenerator; +import org.apache.hadoop.fs.loadGenerator.LoadGeneratorMR; +import org.apache.hadoop.fs.loadGenerator.StructureGenerator; import org.apache.hadoop.fs.slive.SliveTest; /** @@ -107,6 +111,14 @@ public MapredTestDriver(ProgramDriver pgd) { "Single process HDFS and MR cluster."); pgd.addClass("largesorter", LargeSorter.class, "Large-Sort tester"); + pgd.addClass("NNloadGenerator", LoadGenerator.class, + "Generate load on Namenode using NN loadgenerator run WITHOUT MR"); + pgd.addClass("NNloadGeneratorMR", LoadGeneratorMR.class, + "Generate load on Namenode using NN loadgenerator run as MR job"); + pgd.addClass("NNstructureGenerator", StructureGenerator.class, + "Generate the structure to be used by NNdataGenerator"); + pgd.addClass("NNdataGenerator", DataGenerator.class, + "Generate the data to be used by NNloadGenerator"); } catch(Throwable e) { e.printStackTrace(); }
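For reference, the script-file format described in the LoadGenerator javadoc above is one line per phase: duration in seconds, read probability, and write probability, separated by whitespace; blank lines and lines starting with # are ignored. A small illustrative script might look like the following (all values are made up); when used with the MapReduce form it must be readable by the map tasks as an HDFS file and passed via -scriptFile:

    # duration(secs)  readProb  writeProb
    60   0.5  0.2
    120  0.3  0.3
    # final write-heavy phase
    30   0.0  0.8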
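A minimal sketch of driving the MapReduce form programmatically, mirroring the main() added in LoadGeneratorMR; the class name RunLoadGeneratorMR and all argument values are hypothetical. Per parseArgsMR, "-mr <numMapTasks> <outputDir>" must be the first three arguments, and either -elapsedTime or -scriptFile is required when running as MapReduce:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.loadGenerator.LoadGeneratorMR;
    import org.apache.hadoop.util.ToolRunner;

    public class RunLoadGeneratorMR {
      public static void main(String[] args) throws Exception {
        String[] lgArgs = {
            "-mr", "4", "/tmp/lg-out",     // 4 map tasks; reducer writes summary under /tmp/lg-out
            "-root", "/test/loadgen",      // tree prepared by NNstructureGenerator/NNdataGenerator
            "-numOfThreads", "100",        // worker threads per map task
            "-readProbability", "0.4",
            "-writeProbability", "0.2",
            "-elapsedTime", "300"          // seconds; required in the MapReduce form
        };
        System.exit(ToolRunner.run(new Configuration(), new LoadGeneratorMR(), lgArgs));
      }
    }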
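Similarly, a hypothetical early-stop helper built on the flag-file mechanism added above: while a duration or script is running, generateLoadOnNN() polls stopFileCreated(), so creating an empty file at the -flagFile path (default /tmp/flagFile, resolved against the configured default file system) ends the run on the next poll:

    import java.util.EnumSet;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.CreateFlag;
    import org.apache.hadoop.fs.FileContext;
    import org.apache.hadoop.fs.Path;

    public class StopLoadGeneratorRun {
      public static void main(String[] args) throws Exception {
        FileContext fc = FileContext.getFileContext(new Configuration());
        // An empty file is enough; the workers only check for its existence.
        fc.create(new Path("/tmp/flagFile"), EnumSet.of(CreateFlag.CREATE)).close();
      }
    }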