MAPREDUCE-6663. [NNBench] Refactor nnbench as a Tool implementation. Contributed by Brahma Reddy Battula.

Author: Akira Ajisaka
Date:   2016-03-30 11:42:54 +09:00
Commit: 690d8a368d
Parent: 8286270466

2 changed files with 210 additions and 111 deletions

Changed file: NNBench.java

@@ -25,7 +25,6 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.io.PrintStream;
-import java.net.InetAddress;
 import java.text.SimpleDateFormat;
 import java.util.Date;
 import java.util.Iterator;
@@ -33,6 +32,7 @@ import java.util.StringTokenizer;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FSDataInputStream;
@@ -43,6 +43,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.SequenceFile.Writer;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.FileOutputFormat;
@@ -54,6 +55,8 @@ import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reducer;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
 
 /**
  * This program executes a specified operation that applies load to
@@ -74,49 +77,48 @@ import org.apache.hadoop.mapred.SequenceFileInputFormat;
  * must be run before running the other operations.
  */
-public class NNBench {
+public class NNBench extends Configured implements Tool {
   private static final Log LOG = LogFactory.getLog(
           "org.apache.hadoop.hdfs.NNBench");
 
-  protected static String CONTROL_DIR_NAME = "control";
-  protected static String OUTPUT_DIR_NAME = "output";
-  protected static String DATA_DIR_NAME = "data";
-  protected static final String DEFAULT_RES_FILE_NAME = "NNBench_results.log";
-  protected static final String NNBENCH_VERSION = "NameNode Benchmark 0.4";
+  private static String CONTROL_DIR_NAME = "control";
+  private static String OUTPUT_DIR_NAME = "output";
+  private static String DATA_DIR_NAME = "data";
+  static final String DEFAULT_RES_FILE_NAME = "NNBench_results.log";
+  private static final String NNBENCH_VERSION = "NameNode Benchmark 0.4";
 
-  public static String operation = "none";
-  public static long numberOfMaps = 1l; // default is 1
-  public static long numberOfReduces = 1l; // default is 1
-  public static long startTime =
+  private String operation = "none";
+  private long numberOfMaps = 1l; // default is 1
+  private long numberOfReduces = 1l; // default is 1
+  private long startTime =
           System.currentTimeMillis() + (120 * 1000); // default is 'now' + 2min
-  public static long blockSize = 1l; // default is 1
-  public static int bytesToWrite = 0; // default is 0
-  public static long bytesPerChecksum = 1l; // default is 1
-  public static long numberOfFiles = 1l; // default is 1
-  public static short replicationFactorPerFile = 1; // default is 1
-  public static String baseDir = "/benchmarks/NNBench"; // default
-  public static boolean readFileAfterOpen = false; // default is to not read
+  private long blockSize = 1l; // default is 1
+  private int bytesToWrite = 0; // default is 0
+  private long bytesPerChecksum = 1l; // default is 1
+  private long numberOfFiles = 1l; // default is 1
+  private short replicationFactorPerFile = 1; // default is 1
+  private String baseDir = "/benchmarks/NNBench"; // default
+  private boolean readFileAfterOpen = false; // default is to not read
+  private boolean isHelpMessage = false;
 
   // Supported operations
   private static final String OP_CREATE_WRITE = "create_write";
   private static final String OP_OPEN_READ = "open_read";
   private static final String OP_RENAME = "rename";
   private static final String OP_DELETE = "delete";
+  private static final int MAX_OPERATION_EXCEPTIONS = 1000;
 
   // To display in the format that matches the NN and DN log format
   // Example: 2007-10-26 00:01:19,853
   static SimpleDateFormat sdf =
           new SimpleDateFormat("yyyy-MM-dd' 'HH:mm:ss','S");
 
-  private static Configuration config = new Configuration();
-
   /**
    * Clean up the files before a test run
    *
    * @throws IOException on error
    */
-  private static void cleanupBeforeTestrun() throws IOException {
-    FileSystem tempFS = FileSystem.get(config);
+  private void cleanupBeforeTestrun() throws IOException {
+    FileSystem tempFS = FileSystem.get(getConf());
 
     // Delete the data directory only if it is the create/write operation
     if (operation.equals(OP_CREATE_WRITE)) {
@@ -133,8 +135,7 @@ public class NNBench {
    *
    * @throws IOException on error
    */
-  private static void createControlFiles() throws IOException {
-    FileSystem tempFS = FileSystem.get(config);
+  private void createControlFiles() throws IOException {
     LOG.info("Creating " + numberOfMaps + " control files");
 
     for (int i = 0; i < numberOfMaps; i++) {
@@ -144,8 +145,9 @@ public class NNBench {
       SequenceFile.Writer writer = null;
       try {
-        writer = SequenceFile.createWriter(tempFS, config, filePath, Text.class,
-                LongWritable.class, CompressionType.NONE);
+        writer = SequenceFile.createWriter(getConf(), Writer.file(filePath),
+            Writer.keyClass(Text.class), Writer.valueClass(LongWritable.class),
+            Writer.compression(CompressionType.NONE));
         writer.append(new Text(strFileName), new LongWritable(i));
       } finally {
         if (writer != null) {
@@ -208,23 +210,23 @@ public class NNBench {
    * line's arguments
    * @param length total number of arguments
    */
-  public static void checkArgs(final int index, final int length) {
+  private static void checkArgs(final int index, final int length) {
     if (index == length) {
       displayUsage();
-      System.exit(-1);
+      throw new HadoopIllegalArgumentException("Not enough arguments");
     }
   }
 
   /**
    * Parse input arguments
+   *
    * @param args array of command line's parameters to be parsed
-   *
    */
-  public static void parseInputs(final String[] args) {
+  private void parseInputs(final String[] args) {
     // If there are no command line arguments, exit
     if (args.length == 0) {
       displayUsage();
-      System.exit(-1);
+      throw new HadoopIllegalArgumentException("Give valid inputs");
    }
 
     // Parse command line args
@@ -263,7 +265,7 @@ public class NNBench {
         readFileAfterOpen = Boolean.parseBoolean(args[++i]);
       } else if (args[i].equals("-help")) {
         displayUsage();
-        System.exit(-1);
+        isHelpMessage = true;
       }
     }
 
@@ -281,31 +283,30 @@ public class NNBench {
     LOG.info(" Read file after open: " + readFileAfterOpen);
 
     // Set user-defined parameters, so the map method can access the values
-    config.set("test.nnbench.operation", operation);
-    config.setLong("test.nnbench.maps", numberOfMaps);
-    config.setLong("test.nnbench.reduces", numberOfReduces);
-    config.setLong("test.nnbench.starttime", startTime);
-    config.setLong("test.nnbench.blocksize", blockSize);
-    config.setInt("test.nnbench.bytestowrite", bytesToWrite);
-    config.setLong("test.nnbench.bytesperchecksum", bytesPerChecksum);
-    config.setLong("test.nnbench.numberoffiles", numberOfFiles);
-    config.setInt("test.nnbench.replicationfactor",
+    getConf().set("test.nnbench.operation", operation);
+    getConf().setLong("test.nnbench.maps", numberOfMaps);
+    getConf().setLong("test.nnbench.reduces", numberOfReduces);
+    getConf().setLong("test.nnbench.starttime", startTime);
+    getConf().setLong("test.nnbench.blocksize", blockSize);
+    getConf().setInt("test.nnbench.bytestowrite", bytesToWrite);
+    getConf().setLong("test.nnbench.bytesperchecksum", bytesPerChecksum);
+    getConf().setLong("test.nnbench.numberoffiles", numberOfFiles);
+    getConf().setInt("test.nnbench.replicationfactor",
         (int) replicationFactorPerFile);
-    config.set("test.nnbench.basedir", baseDir);
-    config.setBoolean("test.nnbench.readFileAfterOpen", readFileAfterOpen);
-    config.set("test.nnbench.datadir.name", DATA_DIR_NAME);
-    config.set("test.nnbench.outputdir.name", OUTPUT_DIR_NAME);
-    config.set("test.nnbench.controldir.name", CONTROL_DIR_NAME);
+    getConf().set("test.nnbench.basedir", baseDir);
+    getConf().setBoolean("test.nnbench.readFileAfterOpen", readFileAfterOpen);
+    getConf().set("test.nnbench.datadir.name", DATA_DIR_NAME);
+    getConf().set("test.nnbench.outputdir.name", OUTPUT_DIR_NAME);
+    getConf().set("test.nnbench.controldir.name", CONTROL_DIR_NAME);
   }
 
   /**
    * Analyze the results
-   *
    * @throws IOException on error
    */
-  private static void analyzeResults() throws IOException {
-    final FileSystem fs = FileSystem.get(config);
+  private int analyzeResults() throws IOException {
+    final FileSystem fs = FileSystem.get(getConf());
     Path reduceDir = new Path(baseDir, OUTPUT_DIR_NAME);
 
     long totalTimeAL1 = 0l;
@@ -322,11 +323,9 @@ public class NNBench {
     for (FileStatus status : fss) {
 
       Path reduceFile = status.getPath();
-      DataInputStream in;
-      in = new DataInputStream(fs.open(reduceFile));
-
-      BufferedReader lines;
-      lines = new BufferedReader(new InputStreamReader(in));
+      try (DataInputStream in = new DataInputStream(fs.open(reduceFile));
+          BufferedReader lines =
+              new BufferedReader(new InputStreamReader(in))) {
 
       String line;
       while ((line = lines.readLine()) != null) {
@@ -351,6 +350,7 @@ public class NNBench {
         }
       }
     }
+    }
 
     // Average latency is the average time to perform 'n' number of
     // operations, n being the number of files
@@ -444,25 +444,29 @@ public class NNBench {
        " RAW DATA: # of exceptions: " + numOfExceptions,
        "" };
 
-    PrintStream res = new PrintStream(new FileOutputStream(
-            new File(DEFAULT_RES_FILE_NAME), true));
-
-    // Write to a file and also dump to log
-    for(int i = 0; i < resultLines.length; i++) {
-      LOG.info(resultLines[i]);
-      res.println(resultLines[i]);
+    try (PrintStream res = new PrintStream(
+        new FileOutputStream(new File(DEFAULT_RES_FILE_NAME), true))) {
+      // Write to a file and also dump to log
+      for (String resultLine : resultLines) {
+        LOG.info(resultLine);
+        res.println(resultLine);
+      }
     }
+
+    if (numOfExceptions >= MAX_OPERATION_EXCEPTIONS) {
+      return -1;
+    }
+    return 0;
   }
 
   /**
    * Run the test
    *
    * @throws IOException on error
    */
-  public static void runTests() throws IOException {
-    config.setLong("io.bytes.per.checksum", bytesPerChecksum);
+  private void runTests() throws IOException {
+    getConf().setLong("io.bytes.per.checksum", bytesPerChecksum);
 
-    JobConf job = new JobConf(config, NNBench.class);
+    JobConf job = new JobConf(getConf(), NNBench.class);
 
     job.setJobName("NNBench-" + operation);
     FileInputFormat.setInputPaths(job, new Path(baseDir, CONTROL_DIR_NAME));
@@ -487,7 +491,7 @@ public class NNBench {
   /**
    * Validate the inputs
    */
-  public static void validateInputs() {
+  private void validateInputs() {
     // If it is not one of the four operations, then fail
     if (!operation.equals(OP_CREATE_WRITE) &&
         !operation.equals(OP_OPEN_READ) &&
@@ -495,7 +499,8 @@ public class NNBench {
         !operation.equals(OP_DELETE)) {
       System.err.println("Error: Unknown operation: " + operation);
       displayUsage();
-      System.exit(-1);
+      throw new HadoopIllegalArgumentException(
+          "Error: Unknown operation: " + operation);
     }
 
     // If number of maps is a negative number, then fail
@@ -503,57 +508,66 @@ public class NNBench {
     if (numberOfMaps < 0) {
       System.err.println("Error: Number of maps must be a positive number");
       displayUsage();
-      System.exit(-1);
+      throw new HadoopIllegalArgumentException(
+          "Error: Number of maps must be a positive number");
     }
 
     // If number of reduces is a negative number or 0, then fail
     if (numberOfReduces <= 0) {
       System.err.println("Error: Number of reduces must be a positive number");
       displayUsage();
-      System.exit(-1);
+      throw new HadoopIllegalArgumentException(
+          "Error: Number of reduces must be a positive number");
     }
 
     // If blocksize is a negative number or 0, then fail
     if (blockSize <= 0) {
       System.err.println("Error: Block size must be a positive number");
       displayUsage();
-      System.exit(-1);
+      throw new HadoopIllegalArgumentException(
          "Error: Block size must be a positive number");
     }
 
     // If bytes to write is a negative number, then fail
     if (bytesToWrite < 0) {
       System.err.println("Error: Bytes to write must be a positive number");
       displayUsage();
-      System.exit(-1);
+      throw new HadoopIllegalArgumentException(
+          "Error: Bytes to write must be a positive number");
    }
 
     // If bytes per checksum is a negative number, then fail
     if (bytesPerChecksum < 0) {
       System.err.println("Error: Bytes per checksum must be a positive number");
       displayUsage();
-      System.exit(-1);
+      throw new HadoopIllegalArgumentException(
+          "Error: Bytes per checksum must be a positive number");
     }
 
     // If number of files is a negative number, then fail
     if (numberOfFiles < 0) {
       System.err.println("Error: Number of files must be a positive number");
       displayUsage();
-      System.exit(-1);
+      throw new HadoopIllegalArgumentException(
+          "Error: Number of files must be a positive number");
     }
 
     // If replication factor is a negative number, then fail
     if (replicationFactorPerFile < 0) {
       System.err.println("Error: Replication factor must be a positive number");
       displayUsage();
-      System.exit(-1);
+      throw new HadoopIllegalArgumentException(
          "Error: Replication factor must be a positive number");
    }
 
     // If block size is not a multiple of bytesperchecksum, fail
     if (blockSize % bytesPerChecksum != 0) {
-      System.err.println("Error: Block Size in bytes must be a multiple of " +
-          "bytes per checksum: ");
+      System.err.println("Error: Block Size in bytes must be a multiple of "
+          + "bytes per checksum: ");
       displayUsage();
-      System.exit(-1);
+      throw new HadoopIllegalArgumentException(
+          "Error: Block Size in bytes must be a multiple of "
+          + "bytes per checksum:");
    }
  }
 
@@ -562,12 +576,21 @@ public class NNBench {
   * @param args array of command line arguments
   * @throws IOException indicates a problem with test startup
   */
-  public static void main(String[] args) throws IOException {
+  public static void main(String[] args) throws Exception {
+    int res = ToolRunner.run(new NNBench(), args);
+    System.exit(res);
+  }
+
+  @Override
+  public int run(String[] args) throws Exception {
     // Display the application version string
     displayVersion();
 
     // Parse the inputs
     parseInputs(args);
+    if (isHelpMessage) {
+      return 0;
+    }
 
     // Validate inputs
     validateInputs();
@@ -582,7 +605,7 @@ public class NNBench {
     runTests();
 
     // Analyze results
-    analyzeResults();
+    return analyzeResults();
   }
 
@@ -592,7 +615,6 @@ public class NNBench {
   static class NNBenchMapper extends Configured
       implements Mapper<Text, LongWritable, Text, Text> {
     FileSystem filesystem = null;
-    private String hostName = null;
 
     long numberOfFiles = 1l;
     long blkSize = 1l;
@@ -602,7 +624,6 @@ public class NNBench {
     String dataDirName = null;
     String op = null;
     boolean readFile = false;
-    final int MAX_OPERATION_EXCEPTIONS = 1000;
 
     // Data to collect from the operation
     int numOfExceptions = 0;
@@ -628,12 +649,6 @@ public class NNBench {
       } catch(Exception e) {
         throw new RuntimeException("Cannot get file system.", e);
       }
-
-      try {
-        hostName = InetAddress.getLocalHost().getHostName();
-      } catch(Exception e) {
-        throw new RuntimeException("Error getting hostname", e);
-      }
     }
 
     /**
@@ -678,7 +693,7 @@ public class NNBench {
                     LongWritable value,
                     OutputCollector<Text, Text> output,
                     Reporter reporter) throws IOException {
-      Configuration conf = filesystem.getConf();
+      Configuration conf = getConf();
       numberOfFiles = conf.getLong("test.nnbench.numberoffiles", 1l);
       blkSize = conf.getLong("test.nnbench.blocksize", 1l);

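The net effect of the changes above: NNBench now extends Configured and implements Tool, its configuration is injected through setConf()/getConf() instead of a private static Configuration, and the fatal System.exit(-1) calls are replaced by HadoopIllegalArgumentException plus a meaningful return code from run(). A minimal driver sketch of what this enables (the class name NNBenchDriver and the chosen options are illustrative, not part of the commit):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.NNBench;
import org.apache.hadoop.util.ToolRunner;

public class NNBenchDriver {
  public static void main(String[] args) throws Exception {
    // Any Configuration can be injected now; before this change NNBench
    // always read settings from its own private static Configuration.
    Configuration conf = new Configuration();

    // ToolRunner handles the generic Hadoop options (-D, -fs, ...) and then
    // invokes NNBench.run(), which returns 0 on success and -1 once the
    // exception count reaches MAX_OPERATION_EXCEPTIONS.
    int exitCode = ToolRunner.run(conf, new NNBench(), new String[] {
        "-operation", "create_write", "-baseDir", "/benchmarks/NNBench"});
    System.exit(exitCode);
  }
}

This is exactly the pattern the new TestNNBench below relies on: the test injects its own JobConf and asserts on the returned exit code, which was impossible while NNBench configured itself statically and exited the JVM on errors.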
New file: TestNNBench.java

@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.HadoopTestCase;
+import org.apache.hadoop.util.Time;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.After;
+import org.junit.Test;
+
+public class TestNNBench extends HadoopTestCase {
+  private static final String BASE_DIR =
+      new File(System.getProperty("test.build.data", "build/test/data"),
+          "NNBench").getAbsolutePath();
+
+  public TestNNBench() throws IOException {
+    super(LOCAL_MR, LOCAL_FS, 1, 1);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    getFileSystem().delete(new Path(BASE_DIR), true);
+    getFileSystem().delete(new Path(NNBench.DEFAULT_RES_FILE_NAME), true);
+    super.tearDown();
+  }
+
+  @Test(timeout = 30000)
+  public void testNNBenchCreateReadAndDelete() throws Exception {
+    runNNBench(createJobConf(), "create_write");
+    Path path = new Path(BASE_DIR + "/data/file_0_0");
+    assertTrue("create_write should create the file",
+        getFileSystem().exists(path));
+    runNNBench(createJobConf(), "open_read");
+    runNNBench(createJobConf(), "delete");
+    assertFalse("Delete operation should delete the file",
+        getFileSystem().exists(path));
+  }
+
+  @Test(timeout = 30000)
+  public void testNNBenchCreateAndRename() throws Exception {
+    runNNBench(createJobConf(), "create_write");
+    Path path = new Path(BASE_DIR + "/data/file_0_0");
+    assertTrue("create_write should create the file",
+        getFileSystem().exists(path));
+    runNNBench(createJobConf(), "rename");
+    Path renamedPath = new Path(BASE_DIR + "/data/file_0_r_0");
+    assertFalse("Rename should rename the file", getFileSystem().exists(path));
+    assertTrue("Rename should rename the file",
+        getFileSystem().exists(renamedPath));
+  }
+
+  private void runNNBench(Configuration conf, String operation)
+      throws Exception {
+    String[] genArgs = {"-operation", operation, "-baseDir", BASE_DIR,
+        "-startTime", "" + (Time.now() / 1000 + 3)};
+    assertEquals(0, ToolRunner.run(conf, new NNBench(), genArgs));
+  }
+}
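Since argument errors now surface as HadoopIllegalArgumentException rather than killing the JVM, they also become directly assertable. A hypothetical follow-up test (not part of this commit; it assumes org.apache.hadoop.HadoopIllegalArgumentException and org.junit.Assert.fail are added to the imports above) could verify the rejection of an unknown operation:

  @Test(timeout = 30000)
  public void testNNBenchInvalidOperation() throws Exception {
    try {
      // validateInputs() accepts only create_write, open_read, rename
      // and delete; anything else should be rejected.
      runNNBench(createJobConf(), "unknown_op");
      fail("An unknown operation should raise an exception");
    } catch (HadoopIllegalArgumentException expected) {
      // Expected: thrown in place of the former System.exit(-1).
    }
  }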