HADOOP-9992. Modify the NN loadGenerator to optionally run as a MapReduce job. Contributed by Akshay Radia

(cherry picked from commit 09d31bc630)
This commit is contained in:
Brandon Li 2015-01-09 17:24:22 -08:00
parent 9a7c763b3f
commit d847363821
4 changed files with 720 additions and 123 deletions

View File

@ -109,6 +109,9 @@ Release 2.7.0 - UNRELEASED
HADOOP-11464. Reinstate support for launching Hadoop processes on Windows HADOOP-11464. Reinstate support for launching Hadoop processes on Windows
using Cygwin. (cnauroth) using Cygwin. (cnauroth)
HADOOP-9992. Modify the NN loadGenerator to optionally run as a MapReduce job
(Akshay Radia via brandonli)
OPTIMIZATIONS OPTIMIZATIONS
HADOOP-11323. WritableComparator#compare keeps reference to byte array. HADOOP-11323. WritableComparator#compare keeps reference to byte array.

View File

@ -19,10 +19,12 @@
package org.apache.hadoop.fs.loadGenerator; package org.apache.hadoop.fs.loadGenerator;
import java.io.BufferedReader; import java.io.BufferedReader;
import java.io.File; import java.io.DataInputStream;
import java.io.FileReader; import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.PrintStream;
import java.net.InetAddress; import java.net.InetAddress;
import java.net.UnknownHostException; import java.net.UnknownHostException;
import java.util.ArrayList; import java.util.ArrayList;
@ -36,10 +38,11 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured; import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.CreateFlag; import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Options.CreateOpts; import org.apache.hadoop.fs.Options.CreateOpts;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Time; import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.Tool;
@ -48,8 +51,11 @@ import org.apache.hadoop.util.ToolRunner;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
/** The load generator is a tool for testing NameNode behavior under /** The load generator is a tool for testing NameNode behavior under
* different client loads. * different client loads. Note there is a subclass of this clas that lets
* It allows the user to generate different mixes of read, write, * you run a the load generator as a MapReduce job (see LoadGeneratorMR in the
* MapReduce project.
*
* The loadGenerator allows the user to generate different mixes of read, write,
* and list requests by specifying the probabilities of read and * and list requests by specifying the probabilities of read and
* write. The user controls the intensity of the load by * write. The user controls the intensity of the load by
* adjusting parameters for the number of worker threads and the delay * adjusting parameters for the number of worker threads and the delay
@ -59,14 +65,23 @@ import com.google.common.base.Preconditions;
* execution time of each kind of operations and the NameNode * execution time of each kind of operations and the NameNode
* throughput. * throughput.
* *
* The program can run in one of two forms. As a regular single process command
* that runs multiple threads to generate load on the NN or as a Map Reduce
* program that runs multiple (multi-threaded) map tasks that generate load
* on the NN; the results summary is generated by a single reduce task.
*
*
* The user may either specify constant duration, read and write * The user may either specify constant duration, read and write
* probabilities via the command line, or may specify a text file * probabilities via the command line, or may specify a text file
* that acts as a script of which read and write probabilities to * that acts as a script of which read and write probabilities to
* use for specified durations. * use for specified durations. If no duration is specified the program
* runs till killed (duration required if run as MapReduce).
* *
* The script takes the form of lines of duration in seconds, read * The script takes the form of lines of duration in seconds, read
* probability and write probability, each separated by white space. * probability and write probability, each separated by white space.
* Blank lines and lines starting with # (comments) are ignored. * Blank lines and lines starting with # (comments) are ignored. If load
* generator is run as a MapReduce program then the script file needs to be
* accessible on the the Map task as a HDFS file.
* *
* After command line argument parsing and data initialization, * After command line argument parsing and data initialization,
* the load generator spawns the number of worker threads * the load generator spawns the number of worker threads
@ -116,31 +131,43 @@ import com.google.common.base.Preconditions;
public class LoadGenerator extends Configured implements Tool { public class LoadGenerator extends Configured implements Tool {
public static final Log LOG = LogFactory.getLog(LoadGenerator.class); public static final Log LOG = LogFactory.getLog(LoadGenerator.class);
private volatile boolean shouldRun = true; private volatile static boolean shouldRun = true;
private Path root = DataGenerator.DEFAULT_ROOT; protected static Path root = DataGenerator.DEFAULT_ROOT;
private FileContext fc; private static FileContext fc;
private int maxDelayBetweenOps = 0; protected static int maxDelayBetweenOps = 0;
private int numOfThreads = 200; protected static int numOfThreads = 200;
private long [] durations = {0}; protected static long [] durations = {0};
private double [] readProbs = {0.3333}; protected static double [] readProbs = {0.3333};
private double [] writeProbs = {0.3333}; protected static double [] writeProbs = {0.3333};
private volatile int currentIndex = 0; private static volatile int currentIndex = 0;
long totalTime = 0; protected static long totalTime = 0;
private long startTime = Time.now()+10000; protected static long startTime = Time.now()+10000;
final static private int BLOCK_SIZE = 10; final static private int BLOCK_SIZE = 10;
private ArrayList<String> files = new ArrayList<String>(); // a table of file names private static ArrayList<String> files = new ArrayList<String>(); // a table of file names
private ArrayList<String> dirs = new ArrayList<String>(); // a table of directory names private static ArrayList<String> dirs = new ArrayList<String>(); // a table of directory names
private Random r = null; protected static Random r = null;
final private static String USAGE = "java LoadGenerator\n" + protected static long seed = 0;
"-readProbability <read probability>\n" + protected static String scriptFile = null;
"-writeProbability <write probability>\n" + protected static final String FLAGFILE_DEFAULT = "/tmp/flagFile";
"-root <root>\n" + protected static Path flagFile = new Path(FLAGFILE_DEFAULT);
"-maxDelayBetweenOps <maxDelayBetweenOpsInMillis>\n" + protected String hostname;
"-numOfThreads <numOfThreads>\n" + final private static String USAGE_CMD = "java LoadGenerator\n";
"-elapsedTime <elapsedTimeInSecs>\n" + final protected static String USAGE_ARGS =
"-startTime <startTimeInMillis>\n" + "-readProbability <read probability>\n" +
"-scriptFile <filename>"; "-writeProbability <write probability>\n" +
final private String hostname; "-root <root>\n" +
"-maxDelayBetweenOps <maxDelayBetweenOpsInMillis>\n" +
"-numOfThreads <numOfThreads>\n" +
"-elapsedTime <elapsedTimeInSecs>\n" +
"-startTime <startTimeInMillis>\n" +
"-scriptFile <filename>\n" +
"-flagFile <filename>";
final private static String USAGE = USAGE_CMD + USAGE_ARGS;
private final byte[] WRITE_CONTENTS = new byte[4096]; private final byte[] WRITE_CONTENTS = new byte[4096];
private static final int ERR_TEST_FAILED = 2; private static final int ERR_TEST_FAILED = 2;
@ -152,14 +179,20 @@ public class LoadGenerator extends Configured implements Tool {
Arrays.fill(WRITE_CONTENTS, (byte) 'a'); Arrays.fill(WRITE_CONTENTS, (byte) 'a');
} }
private final static int OPEN = 0; public LoadGenerator(Configuration conf) throws IOException, UnknownHostException {
private final static int LIST = 1; this();
private final static int CREATE = 2; setConf(conf);
private final static int WRITE_CLOSE = 3; }
private final static int DELETE = 4;
private final static int TOTAL_OP_TYPES =5; protected final static int OPEN = 0;
private long [] executionTime = new long[TOTAL_OP_TYPES]; protected final static int LIST = 1;
private long [] totalNumOfOps = new long[TOTAL_OP_TYPES]; protected final static int CREATE = 2;
protected final static int WRITE_CLOSE = 3;
protected final static int DELETE = 4;
protected final static int TOTAL_OP_TYPES =5;
protected static long [] executionTime = new long[TOTAL_OP_TYPES];
protected static long [] numOfOps = new long[TOTAL_OP_TYPES];
protected static long totalOps = 0; // across all of types
/** A thread sends a stream of requests to the NameNode. /** A thread sends a stream of requests to the NameNode.
* At each iteration, it first decides if it is going to read a file, * At each iteration, it first decides if it is going to read a file,
@ -192,7 +225,7 @@ public class LoadGenerator extends Configured implements Tool {
this.id = id; this.id = id;
} }
/** Main loop /** Main loop for each thread
* Each iteration decides what's the next operation and then pauses. * Each iteration decides what's the next operation and then pauses.
*/ */
@Override @Override
@ -295,7 +328,7 @@ public class LoadGenerator extends Configured implements Tool {
CreateOpts.createParent(), CreateOpts.bufferSize(4096), CreateOpts.createParent(), CreateOpts.bufferSize(4096),
CreateOpts.repFac((short) 3)); CreateOpts.repFac((short) 3));
executionTime[CREATE] += (Time.now() - startTime); executionTime[CREATE] += (Time.now() - startTime);
totalNumOfOps[CREATE]++; numOfOps[CREATE]++;
long i = fileSize; long i = fileSize;
while (i > 0) { while (i > 0) {
@ -306,28 +339,67 @@ public class LoadGenerator extends Configured implements Tool {
startTime = Time.now(); startTime = Time.now();
executionTime[WRITE_CLOSE] += (Time.now() - startTime); executionTime[WRITE_CLOSE] += (Time.now() - startTime);
totalNumOfOps[WRITE_CLOSE]++; numOfOps[WRITE_CLOSE]++;
} finally { } finally {
IOUtils.cleanup(LOG, out); IOUtils.cleanup(LOG, out);
} }
} }
} }
/** Main function: /** Main function called by tool runner.
* It first initializes data by parsing the command line arguments. * It first initializes data by parsing the command line arguments.
* It then starts the number of DFSClient threads as specified by * It then calls the loadGenerator
* the user.
* It stops all the threads when the specified elapsed time is passed.
* Before exiting, it prints the average execution for
* each operation and operation throughput.
*/ */
@Override @Override
public int run(String[] args) throws Exception { public int run(String[] args) throws Exception {
int exitCode = init(args); int exitCode = parseArgs(false, args);
if (exitCode != 0) { if (exitCode != 0) {
return exitCode; return exitCode;
} }
System.out.println("Running LoadGenerator against fileSystem: " +
FileContext.getFileContext().getDefaultFileSystem().getUri());
exitCode = generateLoadOnNN();
printResults(System.out);
return exitCode;
}
boolean stopFileCreated() {
try {
fc.getFileStatus(flagFile);
} catch (FileNotFoundException e) {
return false;
} catch (IOException e) {
LOG.error("Got error when checking if file exists:" + flagFile, e);
}
LOG.info("Flag file was created. Stopping the test.");
return true;
}
/**
* This is the main function - run threads to generate load on NN
* It starts the number of DFSClient threads as specified by
* the user.
* It stops all the threads when the specified elapsed time is passed.
*/
protected int generateLoadOnNN() throws InterruptedException {
int hostHashCode = hostname.hashCode();
if (seed == 0) {
r = new Random(System.currentTimeMillis()+hostHashCode);
} else {
r = new Random(seed+hostHashCode);
}
try {
fc = FileContext.getFileContext(getConf());
} catch (IOException ioe) {
System.err.println("Can not initialize the file system: " +
ioe.getLocalizedMessage());
return -1;
}
int status = initFileDirTables();
if (status != 0) {
return status;
}
barrier(); barrier();
DFSClientThread[] threads = new DFSClientThread[numOfThreads]; DFSClientThread[] threads = new DFSClientThread[numOfThreads];
@ -337,20 +409,31 @@ public class LoadGenerator extends Configured implements Tool {
} }
if (durations[0] > 0) { if (durations[0] > 0) {
while(shouldRun) { if (durations.length == 1) {// There is a fixed run time
Thread.sleep(durations[currentIndex] * 1000); while (shouldRun) {
totalTime += durations[currentIndex]; Thread.sleep(2000);
totalTime += 2;
// Are we on the final line of the script? if (totalTime >= durations[0] || stopFileCreated()) {
if( (currentIndex + 1) == durations.length) { shouldRun = false;
shouldRun = false; }
} else { }
if(LOG.isDebugEnabled()) { } else {
LOG.debug("Moving to index " + currentIndex + ": r = " // script run
+ readProbs[currentIndex] + ", w = " + writeProbs
+ " for duration " + durations[currentIndex]); while (shouldRun) {
Thread.sleep(durations[currentIndex] * 1000);
totalTime += durations[currentIndex];
// Are we on the final line of the script?
if ((currentIndex + 1) == durations.length || stopFileCreated()) {
shouldRun = false;
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Moving to index " + currentIndex + ": r = "
+ readProbs[currentIndex] + ", w = " + writeProbs
+ " for duration " + durations[currentIndex]);
}
currentIndex++;
} }
currentIndex++;
} }
} }
} }
@ -364,64 +447,61 @@ public class LoadGenerator extends Configured implements Tool {
thread.join(); thread.join();
for (int i=0; i<TOTAL_OP_TYPES; i++) { for (int i=0; i<TOTAL_OP_TYPES; i++) {
executionTime[i] += thread.executionTime[i]; executionTime[i] += thread.executionTime[i];
totalNumOfOps[i] += thread.totalNumOfOps[i]; numOfOps[i] += thread.totalNumOfOps[i];
} }
failed = failed || thread.failed; failed = failed || thread.failed;
} }
int exitCode = 0;
if (failed) { if (failed) {
exitCode = -ERR_TEST_FAILED; exitCode = -ERR_TEST_FAILED;
} }
long totalOps = 0; totalOps = 0;
for (int i=0; i<TOTAL_OP_TYPES; i++) { for (int i=0; i<TOTAL_OP_TYPES; i++) {
totalOps += totalNumOfOps[i]; totalOps += numOfOps[i];
} }
if (totalNumOfOps[OPEN] != 0) {
System.out.println("Average open execution time: " +
(double)executionTime[OPEN]/totalNumOfOps[OPEN] + "ms");
}
if (totalNumOfOps[LIST] != 0) {
System.out.println("Average list execution time: " +
(double)executionTime[LIST]/totalNumOfOps[LIST] + "ms");
}
if (totalNumOfOps[DELETE] != 0) {
System.out.println("Average deletion execution time: " +
(double)executionTime[DELETE]/totalNumOfOps[DELETE] + "ms");
System.out.println("Average create execution time: " +
(double)executionTime[CREATE]/totalNumOfOps[CREATE] + "ms");
System.out.println("Average write_close execution time: " +
(double)executionTime[WRITE_CLOSE]/totalNumOfOps[WRITE_CLOSE] + "ms");
}
if (durations[0] != 0) {
System.out.println("Average operations per second: " +
(double)totalOps/totalTime +"ops/s");
}
System.out.println();
return exitCode; return exitCode;
} }
/** Parse the command line arguments and initialize the data */ protected static void printResults(PrintStream out) throws UnsupportedFileSystemException {
private int init(String[] args) throws IOException { out.println("Result of running LoadGenerator against fileSystem: " +
try { FileContext.getFileContext().getDefaultFileSystem().getUri());
fc = FileContext.getFileContext(getConf()); if (numOfOps[OPEN] != 0) {
} catch (IOException ioe) { out.println("Average open execution time: " +
System.err.println("Can not initialize the file system: " + (double)executionTime[OPEN]/numOfOps[OPEN] + "ms");
ioe.getLocalizedMessage());
return -1;
} }
int hostHashCode = hostname.hashCode(); if (numOfOps[LIST] != 0) {
boolean scriptSpecified = false; out.println("Average list execution time: " +
(double)executionTime[LIST]/numOfOps[LIST] + "ms");
}
if (numOfOps[DELETE] != 0) {
out.println("Average deletion execution time: " +
(double)executionTime[DELETE]/numOfOps[DELETE] + "ms");
out.println("Average create execution time: " +
(double)executionTime[CREATE]/numOfOps[CREATE] + "ms");
out.println("Average write_close execution time: " +
(double)executionTime[WRITE_CLOSE]/numOfOps[WRITE_CLOSE] + "ms");
}
if (totalTime != 0) {
out.println("Average operations per second: " +
(double)totalOps/totalTime +"ops/s");
}
out.println();
}
try {
/** Parse the command line arguments and initialize the data */
protected int parseArgs(boolean runAsMapReduce, String[] args) throws IOException {
try {
for (int i = 0; i < args.length; i++) { // parse command line for (int i = 0; i < args.length; i++) { // parse command line
if (args[i].equals("-scriptFile")) { if (args[i].equals("-scriptFile")) {
if(loadScriptFile(args[++i]) == -1) scriptFile = args[++i];
if (durations[0] > 0) {
System.err.println("Can't specify elapsedTime and use script.");
return -1; return -1;
scriptSpecified = true; }
} else if (args[i].equals("-readProbability")) { } else if (args[i].equals("-readProbability")) {
if(scriptSpecified) { if (scriptFile != null) {
System.err.println("Can't specify probabilities and use script."); System.err.println("Can't specify probabilities and use script.");
return -1; return -1;
} }
@ -432,7 +512,7 @@ public class LoadGenerator extends Configured implements Tool {
return -1; return -1;
} }
} else if (args[i].equals("-writeProbability")) { } else if (args[i].equals("-writeProbability")) {
if(scriptSpecified) { if (scriptFile != null) {
System.err.println("Can't specify probabilities and use script."); System.err.println("Can't specify probabilities and use script.");
return -1; return -1;
} }
@ -456,14 +536,18 @@ public class LoadGenerator extends Configured implements Tool {
} else if (args[i].equals("-startTime")) { } else if (args[i].equals("-startTime")) {
startTime = Long.parseLong(args[++i]); startTime = Long.parseLong(args[++i]);
} else if (args[i].equals("-elapsedTime")) { } else if (args[i].equals("-elapsedTime")) {
if(scriptSpecified) { if (scriptFile != null) {
System.err.println("Can't specify elapsedTime and use script."); System.err.println("Can't specify elapsedTime and use script.");
return -1; return -1;
} }
durations[0] = Long.parseLong(args[++i]); durations[0] = Long.parseLong(args[++i]);
} else if (args[i].equals("-seed")) { } else if (args[i].equals("-seed")) {
r = new Random(Long.parseLong(args[++i])+hostHashCode); seed = Long.parseLong(args[++i]);
} else { r = new Random(seed);
} else if (args[i].equals("-flagFile")) {
LOG.info("got flagFile:" + flagFile);
flagFile = new Path(args[++i]);
}else {
System.err.println(USAGE); System.err.println(USAGE);
ToolRunner.printGenericCommandUsage(System.err); ToolRunner.printGenericCommandUsage(System.err);
return -1; return -1;
@ -475,6 +559,12 @@ public class LoadGenerator extends Configured implements Tool {
return -1; return -1;
} }
// Load Script File if not MR; for MR scriptFile is loaded by Mapper
if (!runAsMapReduce && scriptFile != null) {
if(loadScriptFile(scriptFile, true) == -1)
return -1;
}
for(int i = 0; i < readProbs.length; i++) { for(int i = 0; i < readProbs.length; i++) {
if (readProbs[i] + writeProbs[i] <0 || readProbs[i]+ writeProbs[i] > 1) { if (readProbs[i] + writeProbs[i] <0 || readProbs[i]+ writeProbs[i] > 1) {
System.err.println( System.err.println(
@ -483,12 +573,7 @@ public class LoadGenerator extends Configured implements Tool {
return -1; return -1;
} }
} }
return 0;
if (r==null) {
r = new Random(Time.now()+hostHashCode);
}
return initFileDirTables();
} }
private static void parseScriptLine(String line, ArrayList<Long> duration, private static void parseScriptLine(String line, ArrayList<Long> duration,
@ -527,9 +612,25 @@ public class LoadGenerator extends Configured implements Tool {
* @return 0 if successful, -1 if not * @return 0 if successful, -1 if not
* @throws IOException if errors with file IO * @throws IOException if errors with file IO
*/ */
private int loadScriptFile(String filename) throws IOException { protected static int loadScriptFile(String filename, boolean readLocally) throws IOException {
FileReader fr = new FileReader(new File(filename));
BufferedReader br = new BufferedReader(fr); FileContext fc;
if (readLocally) { // read locally - program is run without MR
fc = FileContext.getLocalFSFileContext();
} else {
fc = FileContext.getFileContext(); // use default file system
}
DataInputStream in = null;
try {
in = fc.open(new Path(filename));
} catch (IOException e) {
System.err.println("Unable to open scriptFile: " + filename);
System.exit(-1);
}
InputStreamReader inr = new InputStreamReader(in);
BufferedReader br = new BufferedReader(inr);
ArrayList<Long> duration = new ArrayList<Long>(); ArrayList<Long> duration = new ArrayList<Long>();
ArrayList<Double> readProb = new ArrayList<Double>(); ArrayList<Double> readProb = new ArrayList<Double>();
ArrayList<Double> writeProb = new ArrayList<Double>(); ArrayList<Double> writeProb = new ArrayList<Double>();
@ -619,7 +720,7 @@ public class LoadGenerator extends Configured implements Tool {
* This allows multiple instances of this program, running on clock * This allows multiple instances of this program, running on clock
* synchronized nodes, to start at roughly the same time. * synchronized nodes, to start at roughly the same time.
*/ */
private void barrier() { private static void barrier() {
long sleepTime; long sleepTime;
while ((sleepTime = startTime - Time.now()) > 0) { while ((sleepTime = startTime - Time.now()) > 0) {
try { try {
@ -635,9 +736,7 @@ public class LoadGenerator extends Configured implements Tool {
* @throws Exception * @throws Exception
*/ */
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new Configuration(), int res = ToolRunner.run(new Configuration(), new LoadGenerator(), args);
new LoadGenerator(), args);
System.exit(res); System.exit(res);
} }
} }

View File

@ -0,0 +1,483 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.loadGenerator;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.PrintStream;
import java.net.UnknownHostException;
import java.util.EnumSet;
import java.util.Iterator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.ToolRunner;
/** The load generator is a tool for testing NameNode behavior under
* different client loads.
* The main code is in HadoopCommon, @LoadGenerator. This class, LoadGeneratorMR
* lets you run that LoadGenerator as a MapReduce job.
*
* The synopsis of the command is
* java LoadGeneratorMR
* -mr <numMapJobs> <outputDir> : results in outputDir/Results
* the rest of the args are the same as the original LoadGenerator.
*
*/
public class LoadGeneratorMR extends LoadGenerator {
public static final Log LOG = LogFactory.getLog(LoadGenerator.class);
private static int numMapTasks = 1;
private String mrOutDir;
final private static String USAGE_CMD = "java LoadGeneratorMR\n";
final private static String USAGE = USAGE_CMD
+ "-mr <numMapJobs> <outputDir> [MUST be first 3 args] \n" + USAGE_ARGS ;
// Constant "keys" used to communicate between map and reduce
final private static Text OPEN_EXECTIME = new Text("OpenExecutionTime");
final private static Text NUMOPS_OPEN = new Text("NumOpsOpen");
final private static Text LIST_EXECTIME = new Text("ListExecutionTime");
final private static Text NUMOPS_LIST = new Text("NumOpsList");
final private static Text DELETE_EXECTIME = new Text("DeletionExecutionTime");
final private static Text NUMOPS_DELETE = new Text("NumOpsDelete");
final private static Text CREATE_EXECTIME = new Text("CreateExecutionTime");
final private static Text NUMOPS_CREATE = new Text("NumOpsCreate");
final private static Text WRITE_CLOSE_EXECTIME = new Text("WriteCloseExecutionTime");
final private static Text NUMOPS_WRITE_CLOSE = new Text("NumOpsWriteClose");
final private static Text ELAPSED_TIME = new Text("ElapsedTime");
final private static Text TOTALOPS = new Text("TotalOps");
// Config keys to pass args from Main to the Job
final private static String LG_ROOT = "LG.root";
final private static String LG_SCRIPTFILE = "LG.scriptFile";
final private static String LG_MAXDELAYBETWEENOPS = "LG.maxDelayBetweenOps";
final private static String LG_NUMOFTHREADS = "LG.numOfThreads";
final private static String LG_READPR = "LG.readPr";
final private static String LG_WRITEPR = "LG.writePr";
final private static String LG_SEED = "LG.r";
final private static String LG_NUMMAPTASKS = "LG.numMapTasks";
final private static String LG_ELAPSEDTIME = "LG.elapsedTime";
final private static String LG_STARTTIME = "LG.startTime";
final private static String LG_FLAGFILE = "LG.flagFile";
/** Constructor */
public LoadGeneratorMR() throws IOException, UnknownHostException {
super();
}
public LoadGeneratorMR(Configuration conf) throws IOException, UnknownHostException {
this();
setConf(conf);
}
/** Main function called by tool runner.
* It first initializes data by parsing the command line arguments.
* It then calls the loadGenerator
*/
@Override
public int run(String[] args) throws Exception {
int exitCode = parseArgsMR(args);
if (exitCode != 0) {
return exitCode;
}
System.out.println("Running LoadGeneratorMR against fileSystem: " +
FileContext.getFileContext().getDefaultFileSystem().getUri());
return submitAsMapReduce(); // reducer will print the results
}
/**
* Parse the command line arguments and initialize the data.
* Only parse the first arg: -mr <numMapTasks> <mrOutDir> (MUST be first three Args)
* The rest are parsed by the Parent LoadGenerator
**/
private int parseArgsMR(String[] args) throws IOException {
try {
if (args.length >= 3 && args[0].equals("-mr")) {
numMapTasks = Integer.parseInt(args[1]);
mrOutDir = args[2];
if (mrOutDir.startsWith("-")) {
System.err.println("Missing output file parameter, instead got: "
+ mrOutDir);
System.err.println(USAGE);
return -1;
}
} else {
System.err.println(USAGE);
ToolRunner.printGenericCommandUsage(System.err);
return -1;
}
String[] strippedArgs = new String[args.length - 3];
for (int i = 0; i < strippedArgs.length; i++) {
strippedArgs[i] = args[i + 3];
}
super.parseArgs(true, strippedArgs); // Parse normal LoadGenerator args
} catch (NumberFormatException e) {
System.err.println("Illegal parameter: " + e.getLocalizedMessage());
System.err.println(USAGE);
return -1;
}
return 0;
}
/** Main program
*
* @param args command line arguments
* @throws Exception
*/
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new Configuration(), new LoadGeneratorMR(), args);
System.exit(res);
}
// The following methods are only used when LoadGenerator is run a MR job
/**
* Based on args we submit the LoadGenerator as MR job.
* Number of MapTasks is numMapTasks
* @return exitCode for job submission
*/
private int submitAsMapReduce() {
System.out.println("Running as a MapReduce job with " +
numMapTasks + " mapTasks; Output to file " + mrOutDir);
Configuration conf = new Configuration(getConf());
// First set all the args of LoadGenerator as Conf vars to pass to MR tasks
conf.set(LG_ROOT , root.toString());
conf.setInt(LG_MAXDELAYBETWEENOPS, maxDelayBetweenOps);
conf.setInt(LG_NUMOFTHREADS, numOfThreads);
conf.set(LG_READPR, readProbs[0]+""); //Pass Double as string
conf.set(LG_WRITEPR, writeProbs[0]+""); //Pass Double as string
conf.setLong(LG_SEED, seed); //No idea what this is
conf.setInt(LG_NUMMAPTASKS, numMapTasks);
if (scriptFile == null && durations[0] <=0) {
System.err.println("When run as a MapReduce job, elapsed Time or ScriptFile must be specified");
System.exit(-1);
}
conf.setLong(LG_ELAPSEDTIME, durations[0]);
conf.setLong(LG_STARTTIME, startTime);
if (scriptFile != null) {
conf.set(LG_SCRIPTFILE , scriptFile);
}
conf.set(LG_FLAGFILE, flagFile.toString());
// Now set the necessary conf variables that apply to run MR itself.
JobConf jobConf = new JobConf(conf, LoadGenerator.class);
jobConf.setJobName("NNLoadGeneratorViaMR");
jobConf.setNumMapTasks(numMapTasks);
jobConf.setNumReduceTasks(1); // 1 reducer to collect the results
jobConf.setOutputKeyClass(Text.class);
jobConf.setOutputValueClass(IntWritable.class);
jobConf.setMapperClass(MapperThatRunsNNLoadGenerator.class);
jobConf.setReducerClass(ReducerThatCollectsLGdata.class);
jobConf.setInputFormat(DummyInputFormat.class);
jobConf.setOutputFormat(TextOutputFormat.class);
// Explicitly set number of max map attempts to 1.
jobConf.setMaxMapAttempts(1);
// Explicitly turn off speculative execution
jobConf.setSpeculativeExecution(false);
// This mapReduce job has no input but has output
FileOutputFormat.setOutputPath(jobConf, new Path(mrOutDir));
try {
JobClient.runJob(jobConf);
} catch (IOException e) {
System.err.println("Failed to run job: " + e.getMessage());
return -1;
}
return 0;
}
// Each split is empty
public static class EmptySplit implements InputSplit {
public void write(DataOutput out) throws IOException {}
public void readFields(DataInput in) throws IOException {}
public long getLength() {return 0L;}
public String[] getLocations() {return new String[0];}
}
// Dummy Input format to send 1 record - number of spits is numMapTasks
public static class DummyInputFormat extends Configured implements
InputFormat<LongWritable, Text> {
public InputSplit[] getSplits(JobConf conf, int numSplits) {
numSplits = conf.getInt("LG.numMapTasks", 1);
InputSplit[] ret = new InputSplit[numSplits];
for (int i = 0; i < numSplits; ++i) {
ret[i] = new EmptySplit();
}
return ret;
}
public RecordReader<LongWritable, Text> getRecordReader(
InputSplit ignored, JobConf conf, Reporter reporter) throws IOException {
return new RecordReader<LongWritable, Text>() {
boolean sentOneRecord = false;
public boolean next(LongWritable key, Text value)
throws IOException {
key.set(1);
value.set("dummy");
if (sentOneRecord == false) { // first call
sentOneRecord = true;
return true;
}
return false; // we have sent one record - we are done
}
public LongWritable createKey() {
return new LongWritable();
}
public Text createValue() {
return new Text();
}
public long getPos() throws IOException {
return 1;
}
public void close() throws IOException {
}
public float getProgress() throws IOException {
return 1;
}
};
}
}
public static class MapperThatRunsNNLoadGenerator extends MapReduceBase
implements Mapper<LongWritable, Text, Text, IntWritable> {
private JobConf jobConf;
@Override
public void configure(JobConf job) {
this.jobConf = job;
getArgsFromConfiguration(jobConf);
}
private class ProgressThread extends Thread {
boolean keepGoing; // while this is true, thread runs.
private Reporter reporter;
public ProgressThread(final Reporter r) {
this.reporter = r;
this.keepGoing = true;
}
public void run() {
while (keepGoing) {
if (!ProgressThread.interrupted()) {
try {
sleep(30 * 1000);
} catch (InterruptedException e) {
}
}
reporter.progress();
}
}
}
public void map(LongWritable key, Text value,
OutputCollector<Text, IntWritable> output, Reporter reporter)
throws IOException {
ProgressThread progressThread = new ProgressThread(reporter);
progressThread.start();
try {
new LoadGenerator(jobConf).generateLoadOnNN();
System.out
.println("Finished generating load on NN, sending results to the reducer");
printResults(System.out);
progressThread.keepGoing = false;
progressThread.join();
// Send results to Reducer
output.collect(OPEN_EXECTIME,
new IntWritable((int) executionTime[OPEN]));
output.collect(NUMOPS_OPEN, new IntWritable((int) numOfOps[OPEN]));
output.collect(LIST_EXECTIME,
new IntWritable((int) executionTime[LIST]));
output.collect(NUMOPS_LIST, new IntWritable((int) numOfOps[LIST]));
output.collect(DELETE_EXECTIME, new IntWritable(
(int) executionTime[DELETE]));
output.collect(NUMOPS_DELETE, new IntWritable((int) numOfOps[DELETE]));
output.collect(CREATE_EXECTIME, new IntWritable(
(int) executionTime[CREATE]));
output.collect(NUMOPS_CREATE, new IntWritable((int) numOfOps[CREATE]));
output.collect(WRITE_CLOSE_EXECTIME, new IntWritable(
(int) executionTime[WRITE_CLOSE]));
output.collect(NUMOPS_WRITE_CLOSE, new IntWritable(
(int) numOfOps[WRITE_CLOSE]));
output.collect(TOTALOPS, new IntWritable((int) totalOps));
output.collect(ELAPSED_TIME, new IntWritable((int) totalTime));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public void getArgsFromConfiguration(Configuration conf) {
maxDelayBetweenOps = conf.getInt(LG_MAXDELAYBETWEENOPS,
maxDelayBetweenOps);
numOfThreads = conf.getInt(LG_NUMOFTHREADS, numOfThreads);
readProbs[0] = Double.parseDouble(conf.get(LG_READPR, readProbs[0] + ""));
writeProbs[0] = Double.parseDouble(conf.get(LG_WRITEPR, writeProbs[0]
+ ""));
seed = conf.getLong(LG_SEED, seed);
numMapTasks = conf.getInt(LG_NUMMAPTASKS, numMapTasks);
root = new Path(conf.get(LG_ROOT, root.toString()));
durations[0] = conf.getLong(LG_ELAPSEDTIME, 0);
startTime = conf.getLong(LG_STARTTIME, 0);
scriptFile = conf.get(LG_SCRIPTFILE, null);
flagFile = new Path(conf.get(LG_FLAGFILE, FLAGFILE_DEFAULT));
if (durations[0] > 0 && scriptFile != null) {
System.err.println("Cannot specify both ElapsedTime and ScriptFile, exiting");
System.exit(-1);
}
try {
if (scriptFile != null && loadScriptFile(scriptFile, false) < 0) {
System.err.println("Error in scriptFile, exiting");
System.exit(-1);
}
} catch (IOException e) {
System.err.println("Error loading script file " + scriptFile);
e.printStackTrace();
}
if (durations[0] <= 0) {
System.err.println("A duration of zero or less is not allowed when running via MapReduce.");
System.exit(-1);
}
}
}
public static class ReducerThatCollectsLGdata extends MapReduceBase implements
Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
private JobConf jobConf;
@Override
public void configure(JobConf job) {
this.jobConf = job;
}
@Override
public void reduce(Text key, Iterator<IntWritable> values,
OutputCollector<Text, IntWritable> output, Reporter reporter)
throws IOException {
int sum = 0;
while (values.hasNext()) {
sum += values.next().get();
}
if (key.equals(OPEN_EXECTIME)){
executionTime[OPEN] = sum;
} else if (key.equals(NUMOPS_OPEN)){
numOfOps[OPEN] = sum;
} else if (key.equals(LIST_EXECTIME)){
executionTime[LIST] = sum;
} else if (key.equals(NUMOPS_LIST)){
numOfOps[LIST] = sum;
} else if (key.equals(DELETE_EXECTIME)){
executionTime[DELETE] = sum;
} else if (key.equals(NUMOPS_DELETE)){
numOfOps[DELETE] = sum;
} else if (key.equals(CREATE_EXECTIME)){
executionTime[CREATE] = sum;
} else if (key.equals(NUMOPS_CREATE)){
numOfOps[CREATE] = sum;
} else if (key.equals(WRITE_CLOSE_EXECTIME)){
System.out.println(WRITE_CLOSE_EXECTIME + " = " + sum);
executionTime[WRITE_CLOSE]= sum;
} else if (key.equals(NUMOPS_WRITE_CLOSE)){
numOfOps[WRITE_CLOSE] = sum;
} else if (key.equals(TOTALOPS)){
totalOps = sum;
} else if (key.equals(ELAPSED_TIME)){
totalTime = sum;
}
result.set(sum);
output.collect(key, result);
// System.out.println("Key = " + key + " Sum is =" + sum);
// printResults(System.out);
}
@Override
public void close() throws IOException {
// Output the result to a file Results in the output dir
FileContext fc;
try {
fc = FileContext.getFileContext(jobConf);
} catch (IOException ioe) {
System.err.println("Can not initialize the file system: " +
ioe.getLocalizedMessage());
return;
}
FSDataOutputStream o = fc.create(FileOutputFormat.getTaskOutputPath(jobConf, "Results"),
EnumSet.of(CreateFlag.CREATE));
PrintStream out = new PrintStream(o);
printResults(out);
out.close();
o.close();
}
}
}

View File

@ -41,6 +41,10 @@ import org.apache.hadoop.fs.DFSCIOTest;
import org.apache.hadoop.fs.DistributedFSCheck; import org.apache.hadoop.fs.DistributedFSCheck;
import org.apache.hadoop.io.FileBench; import org.apache.hadoop.io.FileBench;
import org.apache.hadoop.fs.JHLogAnalyzer; import org.apache.hadoop.fs.JHLogAnalyzer;
import org.apache.hadoop.fs.loadGenerator.DataGenerator;
import org.apache.hadoop.fs.loadGenerator.LoadGenerator;
import org.apache.hadoop.fs.loadGenerator.LoadGeneratorMR;
import org.apache.hadoop.fs.loadGenerator.StructureGenerator;
import org.apache.hadoop.fs.slive.SliveTest; import org.apache.hadoop.fs.slive.SliveTest;
/** /**
@ -107,6 +111,14 @@ public class MapredTestDriver {
"Single process HDFS and MR cluster."); "Single process HDFS and MR cluster.");
pgd.addClass("largesorter", LargeSorter.class, pgd.addClass("largesorter", LargeSorter.class,
"Large-Sort tester"); "Large-Sort tester");
pgd.addClass("NNloadGenerator", LoadGenerator.class,
"Generate load on Namenode using NN loadgenerator run WITHOUT MR");
pgd.addClass("NNloadGeneratorMR", LoadGeneratorMR.class,
"Generate load on Namenode using NN loadgenerator run as MR job");
pgd.addClass("NNstructureGenerator", StructureGenerator.class,
"Generate the structure to be used by NNdataGenerator");
pgd.addClass("NNdataGenerator", DataGenerator.class,
"Generate the data to be used by NNloadGenerator");
} catch(Throwable e) { } catch(Throwable e) {
e.printStackTrace(); e.printStackTrace();
} }