HADOOP-9992. Modify the NN loadGenerator to optionally run as a MapReduce job. Contributed by Akshay Radia
This commit is contained in:
parent
8c3e888dcd
commit
09d31bc630
|
@ -469,6 +469,9 @@ Release 2.7.0 - UNRELEASED
|
|||
HADOOP-11464. Reinstate support for launching Hadoop processes on Windows
|
||||
using Cygwin. (cnauroth)
|
||||
|
||||
HADOOP-9992. Modify the NN loadGenerator to optionally run as a MapReduce job
|
||||
(Akshay Radia via brandonli)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
HADOOP-11323. WritableComparator#compare keeps reference to byte array.
|
||||
|
|
|
@ -19,10 +19,12 @@
|
|||
package org.apache.hadoop.fs.loadGenerator;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.File;
|
||||
import java.io.FileReader;
|
||||
import java.io.DataInputStream;
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.InputStreamReader;
|
||||
import java.io.PrintStream;
|
||||
import java.net.InetAddress;
|
||||
import java.net.UnknownHostException;
|
||||
import java.util.ArrayList;
|
||||
|
@ -36,10 +38,11 @@ import org.apache.hadoop.conf.Configuration;
|
|||
import org.apache.hadoop.conf.Configured;
|
||||
import org.apache.hadoop.fs.CreateFlag;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileContext;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.Options.CreateOpts;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.UnsupportedFileSystemException;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.util.Time;
|
||||
import org.apache.hadoop.util.Tool;
|
||||
|
@ -48,8 +51,11 @@ import org.apache.hadoop.util.ToolRunner;
|
|||
import com.google.common.base.Preconditions;
|
||||
|
||||
/** The load generator is a tool for testing NameNode behavior under
|
||||
* different client loads.
|
||||
* It allows the user to generate different mixes of read, write,
|
||||
* different client loads. Note there is a subclass of this clas that lets
|
||||
* you run a the load generator as a MapReduce job (see LoadGeneratorMR in the
|
||||
* MapReduce project.
|
||||
*
|
||||
* The loadGenerator allows the user to generate different mixes of read, write,
|
||||
* and list requests by specifying the probabilities of read and
|
||||
* write. The user controls the intensity of the load by
|
||||
* adjusting parameters for the number of worker threads and the delay
|
||||
|
@ -59,14 +65,23 @@ import com.google.common.base.Preconditions;
|
|||
* execution time of each kind of operations and the NameNode
|
||||
* throughput.
|
||||
*
|
||||
* The program can run in one of two forms. As a regular single process command
|
||||
* that runs multiple threads to generate load on the NN or as a Map Reduce
|
||||
* program that runs multiple (multi-threaded) map tasks that generate load
|
||||
* on the NN; the results summary is generated by a single reduce task.
|
||||
*
|
||||
*
|
||||
* The user may either specify constant duration, read and write
|
||||
* probabilities via the command line, or may specify a text file
|
||||
* that acts as a script of which read and write probabilities to
|
||||
* use for specified durations.
|
||||
* use for specified durations. If no duration is specified the program
|
||||
* runs till killed (duration required if run as MapReduce).
|
||||
*
|
||||
* The script takes the form of lines of duration in seconds, read
|
||||
* probability and write probability, each separated by white space.
|
||||
* Blank lines and lines starting with # (comments) are ignored.
|
||||
* Blank lines and lines starting with # (comments) are ignored. If load
|
||||
* generator is run as a MapReduce program then the script file needs to be
|
||||
* accessible on the the Map task as a HDFS file.
|
||||
*
|
||||
* After command line argument parsing and data initialization,
|
||||
* the load generator spawns the number of worker threads
|
||||
|
@ -116,22 +131,28 @@ import com.google.common.base.Preconditions;
|
|||
public class LoadGenerator extends Configured implements Tool {
|
||||
public static final Log LOG = LogFactory.getLog(LoadGenerator.class);
|
||||
|
||||
private volatile boolean shouldRun = true;
|
||||
private Path root = DataGenerator.DEFAULT_ROOT;
|
||||
private FileContext fc;
|
||||
private int maxDelayBetweenOps = 0;
|
||||
private int numOfThreads = 200;
|
||||
private long [] durations = {0};
|
||||
private double [] readProbs = {0.3333};
|
||||
private double [] writeProbs = {0.3333};
|
||||
private volatile int currentIndex = 0;
|
||||
long totalTime = 0;
|
||||
private long startTime = Time.now()+10000;
|
||||
private volatile static boolean shouldRun = true;
|
||||
protected static Path root = DataGenerator.DEFAULT_ROOT;
|
||||
private static FileContext fc;
|
||||
protected static int maxDelayBetweenOps = 0;
|
||||
protected static int numOfThreads = 200;
|
||||
protected static long [] durations = {0};
|
||||
protected static double [] readProbs = {0.3333};
|
||||
protected static double [] writeProbs = {0.3333};
|
||||
private static volatile int currentIndex = 0;
|
||||
protected static long totalTime = 0;
|
||||
protected static long startTime = Time.now()+10000;
|
||||
final static private int BLOCK_SIZE = 10;
|
||||
private ArrayList<String> files = new ArrayList<String>(); // a table of file names
|
||||
private ArrayList<String> dirs = new ArrayList<String>(); // a table of directory names
|
||||
private Random r = null;
|
||||
final private static String USAGE = "java LoadGenerator\n" +
|
||||
private static ArrayList<String> files = new ArrayList<String>(); // a table of file names
|
||||
private static ArrayList<String> dirs = new ArrayList<String>(); // a table of directory names
|
||||
protected static Random r = null;
|
||||
protected static long seed = 0;
|
||||
protected static String scriptFile = null;
|
||||
protected static final String FLAGFILE_DEFAULT = "/tmp/flagFile";
|
||||
protected static Path flagFile = new Path(FLAGFILE_DEFAULT);
|
||||
protected String hostname;
|
||||
final private static String USAGE_CMD = "java LoadGenerator\n";
|
||||
final protected static String USAGE_ARGS =
|
||||
"-readProbability <read probability>\n" +
|
||||
"-writeProbability <write probability>\n" +
|
||||
"-root <root>\n" +
|
||||
|
@ -139,8 +160,14 @@ public class LoadGenerator extends Configured implements Tool {
|
|||
"-numOfThreads <numOfThreads>\n" +
|
||||
"-elapsedTime <elapsedTimeInSecs>\n" +
|
||||
"-startTime <startTimeInMillis>\n" +
|
||||
"-scriptFile <filename>";
|
||||
final private String hostname;
|
||||
"-scriptFile <filename>\n" +
|
||||
"-flagFile <filename>";
|
||||
final private static String USAGE = USAGE_CMD + USAGE_ARGS;
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
private final byte[] WRITE_CONTENTS = new byte[4096];
|
||||
|
||||
private static final int ERR_TEST_FAILED = 2;
|
||||
|
@ -152,14 +179,20 @@ public class LoadGenerator extends Configured implements Tool {
|
|||
Arrays.fill(WRITE_CONTENTS, (byte) 'a');
|
||||
}
|
||||
|
||||
private final static int OPEN = 0;
|
||||
private final static int LIST = 1;
|
||||
private final static int CREATE = 2;
|
||||
private final static int WRITE_CLOSE = 3;
|
||||
private final static int DELETE = 4;
|
||||
private final static int TOTAL_OP_TYPES =5;
|
||||
private long [] executionTime = new long[TOTAL_OP_TYPES];
|
||||
private long [] totalNumOfOps = new long[TOTAL_OP_TYPES];
|
||||
public LoadGenerator(Configuration conf) throws IOException, UnknownHostException {
|
||||
this();
|
||||
setConf(conf);
|
||||
}
|
||||
|
||||
protected final static int OPEN = 0;
|
||||
protected final static int LIST = 1;
|
||||
protected final static int CREATE = 2;
|
||||
protected final static int WRITE_CLOSE = 3;
|
||||
protected final static int DELETE = 4;
|
||||
protected final static int TOTAL_OP_TYPES =5;
|
||||
protected static long [] executionTime = new long[TOTAL_OP_TYPES];
|
||||
protected static long [] numOfOps = new long[TOTAL_OP_TYPES];
|
||||
protected static long totalOps = 0; // across all of types
|
||||
|
||||
/** A thread sends a stream of requests to the NameNode.
|
||||
* At each iteration, it first decides if it is going to read a file,
|
||||
|
@ -192,7 +225,7 @@ public class LoadGenerator extends Configured implements Tool {
|
|||
this.id = id;
|
||||
}
|
||||
|
||||
/** Main loop
|
||||
/** Main loop for each thread
|
||||
* Each iteration decides what's the next operation and then pauses.
|
||||
*/
|
||||
@Override
|
||||
|
@ -295,7 +328,7 @@ public class LoadGenerator extends Configured implements Tool {
|
|||
CreateOpts.createParent(), CreateOpts.bufferSize(4096),
|
||||
CreateOpts.repFac((short) 3));
|
||||
executionTime[CREATE] += (Time.now() - startTime);
|
||||
totalNumOfOps[CREATE]++;
|
||||
numOfOps[CREATE]++;
|
||||
|
||||
long i = fileSize;
|
||||
while (i > 0) {
|
||||
|
@ -306,28 +339,67 @@ public class LoadGenerator extends Configured implements Tool {
|
|||
|
||||
startTime = Time.now();
|
||||
executionTime[WRITE_CLOSE] += (Time.now() - startTime);
|
||||
totalNumOfOps[WRITE_CLOSE]++;
|
||||
numOfOps[WRITE_CLOSE]++;
|
||||
} finally {
|
||||
IOUtils.cleanup(LOG, out);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/** Main function:
|
||||
/** Main function called by tool runner.
|
||||
* It first initializes data by parsing the command line arguments.
|
||||
* It then starts the number of DFSClient threads as specified by
|
||||
* the user.
|
||||
* It stops all the threads when the specified elapsed time is passed.
|
||||
* Before exiting, it prints the average execution for
|
||||
* each operation and operation throughput.
|
||||
* It then calls the loadGenerator
|
||||
*/
|
||||
@Override
|
||||
public int run(String[] args) throws Exception {
|
||||
int exitCode = init(args);
|
||||
int exitCode = parseArgs(false, args);
|
||||
if (exitCode != 0) {
|
||||
return exitCode;
|
||||
}
|
||||
System.out.println("Running LoadGenerator against fileSystem: " +
|
||||
FileContext.getFileContext().getDefaultFileSystem().getUri());
|
||||
exitCode = generateLoadOnNN();
|
||||
printResults(System.out);
|
||||
return exitCode;
|
||||
}
|
||||
|
||||
boolean stopFileCreated() {
|
||||
try {
|
||||
fc.getFileStatus(flagFile);
|
||||
} catch (FileNotFoundException e) {
|
||||
return false;
|
||||
} catch (IOException e) {
|
||||
LOG.error("Got error when checking if file exists:" + flagFile, e);
|
||||
}
|
||||
LOG.info("Flag file was created. Stopping the test.");
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* This is the main function - run threads to generate load on NN
|
||||
* It starts the number of DFSClient threads as specified by
|
||||
* the user.
|
||||
* It stops all the threads when the specified elapsed time is passed.
|
||||
*/
|
||||
protected int generateLoadOnNN() throws InterruptedException {
|
||||
int hostHashCode = hostname.hashCode();
|
||||
if (seed == 0) {
|
||||
r = new Random(System.currentTimeMillis()+hostHashCode);
|
||||
} else {
|
||||
r = new Random(seed+hostHashCode);
|
||||
}
|
||||
try {
|
||||
fc = FileContext.getFileContext(getConf());
|
||||
} catch (IOException ioe) {
|
||||
System.err.println("Can not initialize the file system: " +
|
||||
ioe.getLocalizedMessage());
|
||||
return -1;
|
||||
}
|
||||
|
||||
int status = initFileDirTables();
|
||||
if (status != 0) {
|
||||
return status;
|
||||
}
|
||||
barrier();
|
||||
|
||||
DFSClientThread[] threads = new DFSClientThread[numOfThreads];
|
||||
|
@ -337,15 +409,25 @@ public class LoadGenerator extends Configured implements Tool {
|
|||
}
|
||||
|
||||
if (durations[0] > 0) {
|
||||
while(shouldRun) {
|
||||
if (durations.length == 1) {// There is a fixed run time
|
||||
while (shouldRun) {
|
||||
Thread.sleep(2000);
|
||||
totalTime += 2;
|
||||
if (totalTime >= durations[0] || stopFileCreated()) {
|
||||
shouldRun = false;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// script run
|
||||
|
||||
while (shouldRun) {
|
||||
Thread.sleep(durations[currentIndex] * 1000);
|
||||
totalTime += durations[currentIndex];
|
||||
|
||||
// Are we on the final line of the script?
|
||||
if( (currentIndex + 1) == durations.length) {
|
||||
if ((currentIndex + 1) == durations.length || stopFileCreated()) {
|
||||
shouldRun = false;
|
||||
} else {
|
||||
if(LOG.isDebugEnabled()) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Moving to index " + currentIndex + ": r = "
|
||||
+ readProbs[currentIndex] + ", w = " + writeProbs
|
||||
+ " for duration " + durations[currentIndex]);
|
||||
|
@ -354,6 +436,7 @@ public class LoadGenerator extends Configured implements Tool {
|
|||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if(LOG.isDebugEnabled()) {
|
||||
LOG.debug("Done with testing. Waiting for threads to finish.");
|
||||
|
@ -364,64 +447,61 @@ public class LoadGenerator extends Configured implements Tool {
|
|||
thread.join();
|
||||
for (int i=0; i<TOTAL_OP_TYPES; i++) {
|
||||
executionTime[i] += thread.executionTime[i];
|
||||
totalNumOfOps[i] += thread.totalNumOfOps[i];
|
||||
numOfOps[i] += thread.totalNumOfOps[i];
|
||||
}
|
||||
failed = failed || thread.failed;
|
||||
}
|
||||
|
||||
int exitCode = 0;
|
||||
if (failed) {
|
||||
exitCode = -ERR_TEST_FAILED;
|
||||
}
|
||||
|
||||
long totalOps = 0;
|
||||
totalOps = 0;
|
||||
for (int i=0; i<TOTAL_OP_TYPES; i++) {
|
||||
totalOps += totalNumOfOps[i];
|
||||
totalOps += numOfOps[i];
|
||||
}
|
||||
|
||||
if (totalNumOfOps[OPEN] != 0) {
|
||||
System.out.println("Average open execution time: " +
|
||||
(double)executionTime[OPEN]/totalNumOfOps[OPEN] + "ms");
|
||||
}
|
||||
if (totalNumOfOps[LIST] != 0) {
|
||||
System.out.println("Average list execution time: " +
|
||||
(double)executionTime[LIST]/totalNumOfOps[LIST] + "ms");
|
||||
}
|
||||
if (totalNumOfOps[DELETE] != 0) {
|
||||
System.out.println("Average deletion execution time: " +
|
||||
(double)executionTime[DELETE]/totalNumOfOps[DELETE] + "ms");
|
||||
System.out.println("Average create execution time: " +
|
||||
(double)executionTime[CREATE]/totalNumOfOps[CREATE] + "ms");
|
||||
System.out.println("Average write_close execution time: " +
|
||||
(double)executionTime[WRITE_CLOSE]/totalNumOfOps[WRITE_CLOSE] + "ms");
|
||||
}
|
||||
if (durations[0] != 0) {
|
||||
System.out.println("Average operations per second: " +
|
||||
(double)totalOps/totalTime +"ops/s");
|
||||
}
|
||||
System.out.println();
|
||||
return exitCode;
|
||||
}
|
||||
|
||||
/** Parse the command line arguments and initialize the data */
|
||||
private int init(String[] args) throws IOException {
|
||||
try {
|
||||
fc = FileContext.getFileContext(getConf());
|
||||
} catch (IOException ioe) {
|
||||
System.err.println("Can not initialize the file system: " +
|
||||
ioe.getLocalizedMessage());
|
||||
return -1;
|
||||
protected static void printResults(PrintStream out) throws UnsupportedFileSystemException {
|
||||
out.println("Result of running LoadGenerator against fileSystem: " +
|
||||
FileContext.getFileContext().getDefaultFileSystem().getUri());
|
||||
if (numOfOps[OPEN] != 0) {
|
||||
out.println("Average open execution time: " +
|
||||
(double)executionTime[OPEN]/numOfOps[OPEN] + "ms");
|
||||
}
|
||||
if (numOfOps[LIST] != 0) {
|
||||
out.println("Average list execution time: " +
|
||||
(double)executionTime[LIST]/numOfOps[LIST] + "ms");
|
||||
}
|
||||
if (numOfOps[DELETE] != 0) {
|
||||
out.println("Average deletion execution time: " +
|
||||
(double)executionTime[DELETE]/numOfOps[DELETE] + "ms");
|
||||
out.println("Average create execution time: " +
|
||||
(double)executionTime[CREATE]/numOfOps[CREATE] + "ms");
|
||||
out.println("Average write_close execution time: " +
|
||||
(double)executionTime[WRITE_CLOSE]/numOfOps[WRITE_CLOSE] + "ms");
|
||||
}
|
||||
if (totalTime != 0) {
|
||||
out.println("Average operations per second: " +
|
||||
(double)totalOps/totalTime +"ops/s");
|
||||
}
|
||||
out.println();
|
||||
}
|
||||
int hostHashCode = hostname.hashCode();
|
||||
boolean scriptSpecified = false;
|
||||
|
||||
|
||||
/** Parse the command line arguments and initialize the data */
|
||||
protected int parseArgs(boolean runAsMapReduce, String[] args) throws IOException {
|
||||
try {
|
||||
for (int i = 0; i < args.length; i++) { // parse command line
|
||||
if (args[i].equals("-scriptFile")) {
|
||||
if(loadScriptFile(args[++i]) == -1)
|
||||
scriptFile = args[++i];
|
||||
if (durations[0] > 0) {
|
||||
System.err.println("Can't specify elapsedTime and use script.");
|
||||
return -1;
|
||||
scriptSpecified = true;
|
||||
}
|
||||
} else if (args[i].equals("-readProbability")) {
|
||||
if(scriptSpecified) {
|
||||
if (scriptFile != null) {
|
||||
System.err.println("Can't specify probabilities and use script.");
|
||||
return -1;
|
||||
}
|
||||
|
@ -432,7 +512,7 @@ public class LoadGenerator extends Configured implements Tool {
|
|||
return -1;
|
||||
}
|
||||
} else if (args[i].equals("-writeProbability")) {
|
||||
if(scriptSpecified) {
|
||||
if (scriptFile != null) {
|
||||
System.err.println("Can't specify probabilities and use script.");
|
||||
return -1;
|
||||
}
|
||||
|
@ -456,14 +536,18 @@ public class LoadGenerator extends Configured implements Tool {
|
|||
} else if (args[i].equals("-startTime")) {
|
||||
startTime = Long.parseLong(args[++i]);
|
||||
} else if (args[i].equals("-elapsedTime")) {
|
||||
if(scriptSpecified) {
|
||||
if (scriptFile != null) {
|
||||
System.err.println("Can't specify elapsedTime and use script.");
|
||||
return -1;
|
||||
}
|
||||
durations[0] = Long.parseLong(args[++i]);
|
||||
} else if (args[i].equals("-seed")) {
|
||||
r = new Random(Long.parseLong(args[++i])+hostHashCode);
|
||||
} else {
|
||||
seed = Long.parseLong(args[++i]);
|
||||
r = new Random(seed);
|
||||
} else if (args[i].equals("-flagFile")) {
|
||||
LOG.info("got flagFile:" + flagFile);
|
||||
flagFile = new Path(args[++i]);
|
||||
}else {
|
||||
System.err.println(USAGE);
|
||||
ToolRunner.printGenericCommandUsage(System.err);
|
||||
return -1;
|
||||
|
@ -475,6 +559,12 @@ public class LoadGenerator extends Configured implements Tool {
|
|||
return -1;
|
||||
}
|
||||
|
||||
// Load Script File if not MR; for MR scriptFile is loaded by Mapper
|
||||
if (!runAsMapReduce && scriptFile != null) {
|
||||
if(loadScriptFile(scriptFile, true) == -1)
|
||||
return -1;
|
||||
}
|
||||
|
||||
for(int i = 0; i < readProbs.length; i++) {
|
||||
if (readProbs[i] + writeProbs[i] <0 || readProbs[i]+ writeProbs[i] > 1) {
|
||||
System.err.println(
|
||||
|
@ -483,12 +573,7 @@ public class LoadGenerator extends Configured implements Tool {
|
|||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
if (r==null) {
|
||||
r = new Random(Time.now()+hostHashCode);
|
||||
}
|
||||
|
||||
return initFileDirTables();
|
||||
return 0;
|
||||
}
|
||||
|
||||
private static void parseScriptLine(String line, ArrayList<Long> duration,
|
||||
|
@ -527,9 +612,25 @@ public class LoadGenerator extends Configured implements Tool {
|
|||
* @return 0 if successful, -1 if not
|
||||
* @throws IOException if errors with file IO
|
||||
*/
|
||||
private int loadScriptFile(String filename) throws IOException {
|
||||
FileReader fr = new FileReader(new File(filename));
|
||||
BufferedReader br = new BufferedReader(fr);
|
||||
protected static int loadScriptFile(String filename, boolean readLocally) throws IOException {
|
||||
|
||||
FileContext fc;
|
||||
if (readLocally) { // read locally - program is run without MR
|
||||
fc = FileContext.getLocalFSFileContext();
|
||||
} else {
|
||||
fc = FileContext.getFileContext(); // use default file system
|
||||
}
|
||||
DataInputStream in = null;
|
||||
try {
|
||||
in = fc.open(new Path(filename));
|
||||
} catch (IOException e) {
|
||||
System.err.println("Unable to open scriptFile: " + filename);
|
||||
|
||||
System.exit(-1);
|
||||
}
|
||||
InputStreamReader inr = new InputStreamReader(in);
|
||||
|
||||
BufferedReader br = new BufferedReader(inr);
|
||||
ArrayList<Long> duration = new ArrayList<Long>();
|
||||
ArrayList<Double> readProb = new ArrayList<Double>();
|
||||
ArrayList<Double> writeProb = new ArrayList<Double>();
|
||||
|
@ -619,7 +720,7 @@ public class LoadGenerator extends Configured implements Tool {
|
|||
* This allows multiple instances of this program, running on clock
|
||||
* synchronized nodes, to start at roughly the same time.
|
||||
*/
|
||||
private void barrier() {
|
||||
private static void barrier() {
|
||||
long sleepTime;
|
||||
while ((sleepTime = startTime - Time.now()) > 0) {
|
||||
try {
|
||||
|
@ -635,9 +736,7 @@ public class LoadGenerator extends Configured implements Tool {
|
|||
* @throws Exception
|
||||
*/
|
||||
public static void main(String[] args) throws Exception {
|
||||
int res = ToolRunner.run(new Configuration(),
|
||||
new LoadGenerator(), args);
|
||||
int res = ToolRunner.run(new Configuration(), new LoadGenerator(), args);
|
||||
System.exit(res);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -0,0 +1,483 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.loadGenerator;
|
||||
|
||||
import java.io.DataInput;
|
||||
import java.io.DataOutput;
|
||||
import java.io.IOException;
|
||||
import java.io.PrintStream;
|
||||
import java.net.UnknownHostException;
|
||||
import java.util.EnumSet;
|
||||
import java.util.Iterator;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.conf.Configured;
|
||||
import org.apache.hadoop.fs.CreateFlag;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FileContext;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.io.IntWritable;
|
||||
import org.apache.hadoop.io.LongWritable;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.mapred.FileOutputFormat;
|
||||
import org.apache.hadoop.mapred.InputFormat;
|
||||
import org.apache.hadoop.mapred.InputSplit;
|
||||
import org.apache.hadoop.mapred.JobClient;
|
||||
import org.apache.hadoop.mapred.JobConf;
|
||||
import org.apache.hadoop.mapred.MapReduceBase;
|
||||
import org.apache.hadoop.mapred.Mapper;
|
||||
import org.apache.hadoop.mapred.OutputCollector;
|
||||
import org.apache.hadoop.mapred.RecordReader;
|
||||
import org.apache.hadoop.mapred.Reducer;
|
||||
import org.apache.hadoop.mapred.Reporter;
|
||||
import org.apache.hadoop.mapred.TextOutputFormat;
|
||||
import org.apache.hadoop.util.ToolRunner;
|
||||
|
||||
/** The load generator is a tool for testing NameNode behavior under
|
||||
* different client loads.
|
||||
* The main code is in HadoopCommon, @LoadGenerator. This class, LoadGeneratorMR
|
||||
* lets you run that LoadGenerator as a MapReduce job.
|
||||
*
|
||||
* The synopsis of the command is
|
||||
* java LoadGeneratorMR
|
||||
* -mr <numMapJobs> <outputDir> : results in outputDir/Results
|
||||
* the rest of the args are the same as the original LoadGenerator.
|
||||
*
|
||||
*/
|
||||
public class LoadGeneratorMR extends LoadGenerator {
|
||||
public static final Log LOG = LogFactory.getLog(LoadGenerator.class);
|
||||
private static int numMapTasks = 1;
|
||||
private String mrOutDir;
|
||||
|
||||
final private static String USAGE_CMD = "java LoadGeneratorMR\n";
|
||||
final private static String USAGE = USAGE_CMD
|
||||
+ "-mr <numMapJobs> <outputDir> [MUST be first 3 args] \n" + USAGE_ARGS ;
|
||||
|
||||
// Constant "keys" used to communicate between map and reduce
|
||||
final private static Text OPEN_EXECTIME = new Text("OpenExecutionTime");
|
||||
final private static Text NUMOPS_OPEN = new Text("NumOpsOpen");
|
||||
final private static Text LIST_EXECTIME = new Text("ListExecutionTime");
|
||||
final private static Text NUMOPS_LIST = new Text("NumOpsList");
|
||||
final private static Text DELETE_EXECTIME = new Text("DeletionExecutionTime");
|
||||
final private static Text NUMOPS_DELETE = new Text("NumOpsDelete");
|
||||
final private static Text CREATE_EXECTIME = new Text("CreateExecutionTime");
|
||||
final private static Text NUMOPS_CREATE = new Text("NumOpsCreate");
|
||||
final private static Text WRITE_CLOSE_EXECTIME = new Text("WriteCloseExecutionTime");
|
||||
final private static Text NUMOPS_WRITE_CLOSE = new Text("NumOpsWriteClose");
|
||||
final private static Text ELAPSED_TIME = new Text("ElapsedTime");
|
||||
final private static Text TOTALOPS = new Text("TotalOps");
|
||||
|
||||
// Config keys to pass args from Main to the Job
|
||||
final private static String LG_ROOT = "LG.root";
|
||||
final private static String LG_SCRIPTFILE = "LG.scriptFile";
|
||||
final private static String LG_MAXDELAYBETWEENOPS = "LG.maxDelayBetweenOps";
|
||||
final private static String LG_NUMOFTHREADS = "LG.numOfThreads";
|
||||
final private static String LG_READPR = "LG.readPr";
|
||||
final private static String LG_WRITEPR = "LG.writePr";
|
||||
final private static String LG_SEED = "LG.r";
|
||||
final private static String LG_NUMMAPTASKS = "LG.numMapTasks";
|
||||
final private static String LG_ELAPSEDTIME = "LG.elapsedTime";
|
||||
final private static String LG_STARTTIME = "LG.startTime";
|
||||
final private static String LG_FLAGFILE = "LG.flagFile";
|
||||
|
||||
|
||||
/** Constructor */
|
||||
public LoadGeneratorMR() throws IOException, UnknownHostException {
|
||||
super();
|
||||
}
|
||||
|
||||
public LoadGeneratorMR(Configuration conf) throws IOException, UnknownHostException {
|
||||
this();
|
||||
setConf(conf);
|
||||
}
|
||||
|
||||
/** Main function called by tool runner.
|
||||
* It first initializes data by parsing the command line arguments.
|
||||
* It then calls the loadGenerator
|
||||
*/
|
||||
@Override
|
||||
public int run(String[] args) throws Exception {
|
||||
int exitCode = parseArgsMR(args);
|
||||
if (exitCode != 0) {
|
||||
return exitCode;
|
||||
}
|
||||
System.out.println("Running LoadGeneratorMR against fileSystem: " +
|
||||
FileContext.getFileContext().getDefaultFileSystem().getUri());
|
||||
|
||||
return submitAsMapReduce(); // reducer will print the results
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Parse the command line arguments and initialize the data.
|
||||
* Only parse the first arg: -mr <numMapTasks> <mrOutDir> (MUST be first three Args)
|
||||
* The rest are parsed by the Parent LoadGenerator
|
||||
**/
|
||||
|
||||
private int parseArgsMR(String[] args) throws IOException {
|
||||
try {
|
||||
if (args.length >= 3 && args[0].equals("-mr")) {
|
||||
numMapTasks = Integer.parseInt(args[1]);
|
||||
mrOutDir = args[2];
|
||||
if (mrOutDir.startsWith("-")) {
|
||||
System.err.println("Missing output file parameter, instead got: "
|
||||
+ mrOutDir);
|
||||
System.err.println(USAGE);
|
||||
return -1;
|
||||
}
|
||||
} else {
|
||||
System.err.println(USAGE);
|
||||
ToolRunner.printGenericCommandUsage(System.err);
|
||||
return -1;
|
||||
}
|
||||
String[] strippedArgs = new String[args.length - 3];
|
||||
for (int i = 0; i < strippedArgs.length; i++) {
|
||||
strippedArgs[i] = args[i + 3];
|
||||
}
|
||||
super.parseArgs(true, strippedArgs); // Parse normal LoadGenerator args
|
||||
} catch (NumberFormatException e) {
|
||||
System.err.println("Illegal parameter: " + e.getLocalizedMessage());
|
||||
System.err.println(USAGE);
|
||||
return -1;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
/** Main program
|
||||
*
|
||||
* @param args command line arguments
|
||||
* @throws Exception
|
||||
*/
|
||||
public static void main(String[] args) throws Exception {
|
||||
int res = ToolRunner.run(new Configuration(), new LoadGeneratorMR(), args);
|
||||
System.exit(res);
|
||||
}
|
||||
|
||||
|
||||
// The following methods are only used when LoadGenerator is run a MR job
|
||||
/**
|
||||
* Based on args we submit the LoadGenerator as MR job.
|
||||
* Number of MapTasks is numMapTasks
|
||||
* @return exitCode for job submission
|
||||
*/
|
||||
private int submitAsMapReduce() {
|
||||
|
||||
System.out.println("Running as a MapReduce job with " +
|
||||
numMapTasks + " mapTasks; Output to file " + mrOutDir);
|
||||
|
||||
|
||||
Configuration conf = new Configuration(getConf());
|
||||
|
||||
// First set all the args of LoadGenerator as Conf vars to pass to MR tasks
|
||||
|
||||
conf.set(LG_ROOT , root.toString());
|
||||
conf.setInt(LG_MAXDELAYBETWEENOPS, maxDelayBetweenOps);
|
||||
conf.setInt(LG_NUMOFTHREADS, numOfThreads);
|
||||
conf.set(LG_READPR, readProbs[0]+""); //Pass Double as string
|
||||
conf.set(LG_WRITEPR, writeProbs[0]+""); //Pass Double as string
|
||||
conf.setLong(LG_SEED, seed); //No idea what this is
|
||||
conf.setInt(LG_NUMMAPTASKS, numMapTasks);
|
||||
if (scriptFile == null && durations[0] <=0) {
|
||||
System.err.println("When run as a MapReduce job, elapsed Time or ScriptFile must be specified");
|
||||
System.exit(-1);
|
||||
}
|
||||
conf.setLong(LG_ELAPSEDTIME, durations[0]);
|
||||
conf.setLong(LG_STARTTIME, startTime);
|
||||
if (scriptFile != null) {
|
||||
conf.set(LG_SCRIPTFILE , scriptFile);
|
||||
}
|
||||
conf.set(LG_FLAGFILE, flagFile.toString());
|
||||
|
||||
// Now set the necessary conf variables that apply to run MR itself.
|
||||
JobConf jobConf = new JobConf(conf, LoadGenerator.class);
|
||||
jobConf.setJobName("NNLoadGeneratorViaMR");
|
||||
jobConf.setNumMapTasks(numMapTasks);
|
||||
jobConf.setNumReduceTasks(1); // 1 reducer to collect the results
|
||||
|
||||
jobConf.setOutputKeyClass(Text.class);
|
||||
jobConf.setOutputValueClass(IntWritable.class);
|
||||
|
||||
jobConf.setMapperClass(MapperThatRunsNNLoadGenerator.class);
|
||||
jobConf.setReducerClass(ReducerThatCollectsLGdata.class);
|
||||
|
||||
jobConf.setInputFormat(DummyInputFormat.class);
|
||||
jobConf.setOutputFormat(TextOutputFormat.class);
|
||||
|
||||
// Explicitly set number of max map attempts to 1.
|
||||
jobConf.setMaxMapAttempts(1);
|
||||
// Explicitly turn off speculative execution
|
||||
jobConf.setSpeculativeExecution(false);
|
||||
|
||||
// This mapReduce job has no input but has output
|
||||
FileOutputFormat.setOutputPath(jobConf, new Path(mrOutDir));
|
||||
|
||||
try {
|
||||
JobClient.runJob(jobConf);
|
||||
} catch (IOException e) {
|
||||
System.err.println("Failed to run job: " + e.getMessage());
|
||||
return -1;
|
||||
}
|
||||
return 0;
|
||||
|
||||
}
|
||||
|
||||
|
||||
// Each split is empty
|
||||
public static class EmptySplit implements InputSplit {
|
||||
public void write(DataOutput out) throws IOException {}
|
||||
public void readFields(DataInput in) throws IOException {}
|
||||
public long getLength() {return 0L;}
|
||||
public String[] getLocations() {return new String[0];}
|
||||
}
|
||||
|
||||
// Dummy Input format to send 1 record - number of spits is numMapTasks
|
||||
public static class DummyInputFormat extends Configured implements
|
||||
InputFormat<LongWritable, Text> {
|
||||
|
||||
public InputSplit[] getSplits(JobConf conf, int numSplits) {
|
||||
numSplits = conf.getInt("LG.numMapTasks", 1);
|
||||
InputSplit[] ret = new InputSplit[numSplits];
|
||||
for (int i = 0; i < numSplits; ++i) {
|
||||
ret[i] = new EmptySplit();
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
public RecordReader<LongWritable, Text> getRecordReader(
|
||||
InputSplit ignored, JobConf conf, Reporter reporter) throws IOException {
|
||||
|
||||
return new RecordReader<LongWritable, Text>() {
|
||||
|
||||
boolean sentOneRecord = false;
|
||||
|
||||
public boolean next(LongWritable key, Text value)
|
||||
throws IOException {
|
||||
key.set(1);
|
||||
value.set("dummy");
|
||||
if (sentOneRecord == false) { // first call
|
||||
sentOneRecord = true;
|
||||
return true;
|
||||
}
|
||||
return false; // we have sent one record - we are done
|
||||
}
|
||||
|
||||
public LongWritable createKey() {
|
||||
return new LongWritable();
|
||||
}
|
||||
public Text createValue() {
|
||||
return new Text();
|
||||
}
|
||||
public long getPos() throws IOException {
|
||||
return 1;
|
||||
}
|
||||
public void close() throws IOException {
|
||||
}
|
||||
public float getProgress() throws IOException {
|
||||
return 1;
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
public static class MapperThatRunsNNLoadGenerator extends MapReduceBase
|
||||
implements Mapper<LongWritable, Text, Text, IntWritable> {
|
||||
|
||||
private JobConf jobConf;
|
||||
|
||||
@Override
|
||||
public void configure(JobConf job) {
|
||||
this.jobConf = job;
|
||||
getArgsFromConfiguration(jobConf);
|
||||
}
|
||||
|
||||
private class ProgressThread extends Thread {
|
||||
|
||||
boolean keepGoing; // while this is true, thread runs.
|
||||
private Reporter reporter;
|
||||
|
||||
public ProgressThread(final Reporter r) {
|
||||
this.reporter = r;
|
||||
this.keepGoing = true;
|
||||
}
|
||||
|
||||
public void run() {
|
||||
while (keepGoing) {
|
||||
if (!ProgressThread.interrupted()) {
|
||||
try {
|
||||
sleep(30 * 1000);
|
||||
} catch (InterruptedException e) {
|
||||
}
|
||||
}
|
||||
reporter.progress();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void map(LongWritable key, Text value,
|
||||
OutputCollector<Text, IntWritable> output, Reporter reporter)
|
||||
throws IOException {
|
||||
ProgressThread progressThread = new ProgressThread(reporter);
|
||||
progressThread.start();
|
||||
try {
|
||||
new LoadGenerator(jobConf).generateLoadOnNN();
|
||||
System.out
|
||||
.println("Finished generating load on NN, sending results to the reducer");
|
||||
printResults(System.out);
|
||||
progressThread.keepGoing = false;
|
||||
progressThread.join();
|
||||
|
||||
// Send results to Reducer
|
||||
output.collect(OPEN_EXECTIME,
|
||||
new IntWritable((int) executionTime[OPEN]));
|
||||
output.collect(NUMOPS_OPEN, new IntWritable((int) numOfOps[OPEN]));
|
||||
|
||||
output.collect(LIST_EXECTIME,
|
||||
new IntWritable((int) executionTime[LIST]));
|
||||
output.collect(NUMOPS_LIST, new IntWritable((int) numOfOps[LIST]));
|
||||
|
||||
output.collect(DELETE_EXECTIME, new IntWritable(
|
||||
(int) executionTime[DELETE]));
|
||||
output.collect(NUMOPS_DELETE, new IntWritable((int) numOfOps[DELETE]));
|
||||
|
||||
output.collect(CREATE_EXECTIME, new IntWritable(
|
||||
(int) executionTime[CREATE]));
|
||||
output.collect(NUMOPS_CREATE, new IntWritable((int) numOfOps[CREATE]));
|
||||
|
||||
output.collect(WRITE_CLOSE_EXECTIME, new IntWritable(
|
||||
(int) executionTime[WRITE_CLOSE]));
|
||||
output.collect(NUMOPS_WRITE_CLOSE, new IntWritable(
|
||||
(int) numOfOps[WRITE_CLOSE]));
|
||||
|
||||
output.collect(TOTALOPS, new IntWritable((int) totalOps));
|
||||
output.collect(ELAPSED_TIME, new IntWritable((int) totalTime));
|
||||
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
public void getArgsFromConfiguration(Configuration conf) {
|
||||
|
||||
maxDelayBetweenOps = conf.getInt(LG_MAXDELAYBETWEENOPS,
|
||||
maxDelayBetweenOps);
|
||||
numOfThreads = conf.getInt(LG_NUMOFTHREADS, numOfThreads);
|
||||
readProbs[0] = Double.parseDouble(conf.get(LG_READPR, readProbs[0] + ""));
|
||||
writeProbs[0] = Double.parseDouble(conf.get(LG_WRITEPR, writeProbs[0]
|
||||
+ ""));
|
||||
seed = conf.getLong(LG_SEED, seed);
|
||||
numMapTasks = conf.getInt(LG_NUMMAPTASKS, numMapTasks);
|
||||
root = new Path(conf.get(LG_ROOT, root.toString()));
|
||||
durations[0] = conf.getLong(LG_ELAPSEDTIME, 0);
|
||||
startTime = conf.getLong(LG_STARTTIME, 0);
|
||||
scriptFile = conf.get(LG_SCRIPTFILE, null);
|
||||
flagFile = new Path(conf.get(LG_FLAGFILE, FLAGFILE_DEFAULT));
|
||||
if (durations[0] > 0 && scriptFile != null) {
|
||||
System.err.println("Cannot specify both ElapsedTime and ScriptFile, exiting");
|
||||
System.exit(-1);
|
||||
}
|
||||
|
||||
try {
|
||||
if (scriptFile != null && loadScriptFile(scriptFile, false) < 0) {
|
||||
System.err.println("Error in scriptFile, exiting");
|
||||
System.exit(-1);
|
||||
}
|
||||
} catch (IOException e) {
|
||||
System.err.println("Error loading script file " + scriptFile);
|
||||
e.printStackTrace();
|
||||
}
|
||||
if (durations[0] <= 0) {
|
||||
System.err.println("A duration of zero or less is not allowed when running via MapReduce.");
|
||||
System.exit(-1);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static class ReducerThatCollectsLGdata extends MapReduceBase implements
|
||||
Reducer<Text, IntWritable, Text, IntWritable> {
|
||||
private IntWritable result = new IntWritable();
|
||||
private JobConf jobConf;
|
||||
|
||||
@Override
|
||||
public void configure(JobConf job) {
|
||||
this.jobConf = job;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void reduce(Text key, Iterator<IntWritable> values,
|
||||
OutputCollector<Text, IntWritable> output, Reporter reporter)
|
||||
throws IOException {
|
||||
int sum = 0;
|
||||
while (values.hasNext()) {
|
||||
sum += values.next().get();
|
||||
}
|
||||
if (key.equals(OPEN_EXECTIME)){
|
||||
executionTime[OPEN] = sum;
|
||||
} else if (key.equals(NUMOPS_OPEN)){
|
||||
numOfOps[OPEN] = sum;
|
||||
} else if (key.equals(LIST_EXECTIME)){
|
||||
executionTime[LIST] = sum;
|
||||
} else if (key.equals(NUMOPS_LIST)){
|
||||
numOfOps[LIST] = sum;
|
||||
} else if (key.equals(DELETE_EXECTIME)){
|
||||
executionTime[DELETE] = sum;
|
||||
} else if (key.equals(NUMOPS_DELETE)){
|
||||
numOfOps[DELETE] = sum;
|
||||
} else if (key.equals(CREATE_EXECTIME)){
|
||||
executionTime[CREATE] = sum;
|
||||
} else if (key.equals(NUMOPS_CREATE)){
|
||||
numOfOps[CREATE] = sum;
|
||||
} else if (key.equals(WRITE_CLOSE_EXECTIME)){
|
||||
System.out.println(WRITE_CLOSE_EXECTIME + " = " + sum);
|
||||
executionTime[WRITE_CLOSE]= sum;
|
||||
} else if (key.equals(NUMOPS_WRITE_CLOSE)){
|
||||
numOfOps[WRITE_CLOSE] = sum;
|
||||
} else if (key.equals(TOTALOPS)){
|
||||
totalOps = sum;
|
||||
} else if (key.equals(ELAPSED_TIME)){
|
||||
totalTime = sum;
|
||||
}
|
||||
result.set(sum);
|
||||
output.collect(key, result);
|
||||
// System.out.println("Key = " + key + " Sum is =" + sum);
|
||||
// printResults(System.out);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
// Output the result to a file Results in the output dir
|
||||
FileContext fc;
|
||||
try {
|
||||
fc = FileContext.getFileContext(jobConf);
|
||||
} catch (IOException ioe) {
|
||||
System.err.println("Can not initialize the file system: " +
|
||||
ioe.getLocalizedMessage());
|
||||
return;
|
||||
}
|
||||
FSDataOutputStream o = fc.create(FileOutputFormat.getTaskOutputPath(jobConf, "Results"),
|
||||
EnumSet.of(CreateFlag.CREATE));
|
||||
|
||||
PrintStream out = new PrintStream(o);
|
||||
printResults(out);
|
||||
out.close();
|
||||
o.close();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -41,6 +41,10 @@ import org.apache.hadoop.fs.DFSCIOTest;
|
|||
import org.apache.hadoop.fs.DistributedFSCheck;
|
||||
import org.apache.hadoop.io.FileBench;
|
||||
import org.apache.hadoop.fs.JHLogAnalyzer;
|
||||
import org.apache.hadoop.fs.loadGenerator.DataGenerator;
|
||||
import org.apache.hadoop.fs.loadGenerator.LoadGenerator;
|
||||
import org.apache.hadoop.fs.loadGenerator.LoadGeneratorMR;
|
||||
import org.apache.hadoop.fs.loadGenerator.StructureGenerator;
|
||||
import org.apache.hadoop.fs.slive.SliveTest;
|
||||
|
||||
/**
|
||||
|
@ -107,6 +111,14 @@ public class MapredTestDriver {
|
|||
"Single process HDFS and MR cluster.");
|
||||
pgd.addClass("largesorter", LargeSorter.class,
|
||||
"Large-Sort tester");
|
||||
pgd.addClass("NNloadGenerator", LoadGenerator.class,
|
||||
"Generate load on Namenode using NN loadgenerator run WITHOUT MR");
|
||||
pgd.addClass("NNloadGeneratorMR", LoadGeneratorMR.class,
|
||||
"Generate load on Namenode using NN loadgenerator run as MR job");
|
||||
pgd.addClass("NNstructureGenerator", StructureGenerator.class,
|
||||
"Generate the structure to be used by NNdataGenerator");
|
||||
pgd.addClass("NNdataGenerator", DataGenerator.class,
|
||||
"Generate the data to be used by NNloadGenerator");
|
||||
} catch(Throwable e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue