MAPREDUCE-4651. Benchmarking random reads with DFSIO. Contributed by Konstantin Shvachko.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1390161 13f79535-47bb-0310-9956-ffa450edef68
Konstantin Shvachko 2012-09-25 21:36:30 +00:00
parent ac92399440
commit 1aba4503cf
3 changed files with 421 additions and 183 deletions
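For context (not part of the patch): the new read modes are driven through the existing Tool interface, so they can be launched from code as well as from the command line. A minimal driver sketch, assuming TestDFSIO is on the test classpath; the file count and size are illustrative, while the -write, -read, -random, -nrFiles and -size flags are the ones introduced or kept by this change:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.TestDFSIO;
import org.apache.hadoop.util.ToolRunner;

public class RandomReadBenchDriver {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Write the test files first, then read them back at random offsets.
    // Equivalent command lines:
    //   TestDFSIO -write -nrFiles 4 -size 16MB
    //   TestDFSIO -read -random -nrFiles 4 -size 16MB
    int rc = ToolRunner.run(conf, new TestDFSIO(),
        new String[] {"-write", "-nrFiles", "4", "-size", "16MB"});
    if (rc == 0) {
      rc = ToolRunner.run(conf, new TestDFSIO(),
          new String[] {"-read", "-random", "-nrFiles", "4", "-size", "16MB"});
    }
    System.exit(rc);
  }
}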

hadoop-mapreduce-project/CHANGES.txt

@@ -49,9 +49,6 @@ Release 2.0.2-alpha - 2012-09-07
     MAPREDUCE-4408. allow jobs to set a JAR that is in the distributed cached
     (rkanter via tucu)
-    MAPREDUCE-2786. Add compression option for TestDFSIO.
-    (Plamen Jeliazkov via shv)
     MAPREDUCE-4440. Changed SchedulerApp and SchedulerNode to be a minimal
     interface to allow schedulers to maintain their own. (acmurthy)
@@ -409,9 +406,14 @@ Release 0.23.4 - UNRELEASED
   IMPROVEMENTS
+    MAPREDUCE-2786. Add compression option for TestDFSIO.
+    (Plamen Jeliazkov via shv)
     MAPREDUCE-4645. Provide a random seed to Slive to make the sequence
     of file names deterministic. (Ravi Prakash via shv)
+    MAPREDUCE-4651. Benchmarking random reads with DFSIO. (shv)
   OPTIMIZATIONS
   BUG FIXES

org/apache/hadoop/fs/IOMapperBase.java

@@ -17,14 +17,13 @@
  */
 package org.apache.hadoop.fs;
+import java.io.Closeable;
 import java.io.IOException;
 import java.net.InetAddress;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.mapred.*;
-import org.apache.hadoop.util.ReflectionUtils;
 /**
  * Base mapper class for IO operations.
@@ -35,7 +34,6 @@
  * statistics data to be collected by subsequent reducers.
  *
  */
-@SuppressWarnings("deprecation")
 public abstract class IOMapperBase<T> extends Configured
     implements Mapper<Text, LongWritable, Text, Text> {
@@ -43,7 +41,7 @@ public abstract class IOMapperBase<T> extends Configured
   protected int bufferSize;
   protected FileSystem fs;
   protected String hostName;
-  protected CompressionCodec compressionCodec;
+  protected Closeable stream;
   public IOMapperBase() {
   }
@@ -62,22 +60,6 @@ public void configure(JobConf conf) {
     } catch(Exception e) {
       hostName = "localhost";
     }
-    //grab compression
-    String compression = getConf().get("test.io.compression.class", null);
-    Class<? extends CompressionCodec> codec;
-    //try to initialize codec
-    try {
-      codec = (compression == null) ? null :
-        Class.forName(compression).asSubclass(CompressionCodec.class);
-    } catch(Exception e) {
-      throw new RuntimeException("Compression codec not found: ", e);
-    }
-    if(codec != null) {
-      compressionCodec = (CompressionCodec) ReflectionUtils.newInstance(codec, getConf());
-    }
   }
   public void close() throws IOException {
@@ -97,6 +79,18 @@ abstract T doIO(Reporter reporter,
                   String name,
                   long value) throws IOException;
+  /**
+   * Create an input or output stream based on the specified file.
+   * Subclasses should override this method to provide an actual stream.
+   *
+   * @param name file name
+   * @return the stream
+   * @throws IOException
+   */
+  public Closeable getIOStream(String name) throws IOException {
+    return null;
+  }
   /**
    * Collect stat data to be combined by a subsequent reducer.
    *
@@ -132,9 +126,15 @@ public void map(Text key,
     long longValue = value.get();
     reporter.setStatus("starting " + name + " ::host = " + hostName);
+    this.stream = getIOStream(name);
+    T statValue = null;
     long tStart = System.currentTimeMillis();
-    T statValue = doIO(reporter, name, longValue);
+    try {
+      statValue = doIO(reporter, name, longValue);
+    } finally {
+      if(stream != null) stream.close();
+    }
     long tEnd = System.currentTimeMillis();
     long execTime = tEnd - tStart;
     collectStats(output, name, execTime, statValue);
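The effect of this change is that stream creation moves out of the timed doIO() call: map() asks the subclass for a stream via getIOStream(), stores it in the protected stream field, and closes it in a finally block. A hypothetical subclass sketch (not part of the patch) illustrating the intended contract; the class name, path, and written byte are made up for the example:

package org.apache.hadoop.fs;   // assumed: doIO/collectStats are package-private

import java.io.Closeable;
import java.io.IOException;
import java.io.OutputStream;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

// Hypothetical subclass: getIOStream() only opens the stream, the base map()
// stores it in this.stream and closes it, and doIO() does the timed work.
public class TouchMapper extends IOMapperBase<Long> {

  @Override // IOMapperBase
  public Closeable getIOStream(String name) throws IOException {
    return fs.create(new Path(name), true, bufferSize);
  }

  @Override // IOMapperBase
  Long doIO(Reporter reporter, String name, long value) throws IOException {
    ((OutputStream) stream).write('x');      // no open/close here
    return Long.valueOf(1L);
  }

  @Override // IOMapperBase
  void collectStats(OutputCollector<Text, Text> output, String name,
                    long execTime, Long result) throws IOException {
    output.collect(new Text("l:tasks"), new Text("1"));
  }
}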

org/apache/hadoop/fs/TestDFSIO.java

@@ -19,6 +19,7 @@
 package org.apache.hadoop.fs;
 import java.io.BufferedReader;
+import java.io.Closeable;
 import java.io.DataInputStream;
 import java.io.File;
 import java.io.FileOutputStream;
@@ -28,10 +29,9 @@
 import java.io.OutputStream;
 import java.io.PrintStream;
 import java.util.Date;
+import java.util.Random;
 import java.util.StringTokenizer;
-import junit.framework.TestCase;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -39,18 +39,30 @@
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
-import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
 /**
  * Distributed i/o benchmark.
  * <p>
  * This test writes into or reads from a specified number of files.
- * File size is specified as a parameter to the test.
+ * Number of bytes to write or read is specified as a parameter to the test.
  * Each file is accessed in a separate map task.
  * <p>
  * The reducer collects the following statistics:
@@ -73,24 +85,24 @@
  * <li>standard deviation of i/o rate </li>
  * </ul>
  */
-public class TestDFSIO extends TestCase implements Tool {
+public class TestDFSIO implements Tool {
   // Constants
   private static final Log LOG = LogFactory.getLog(TestDFSIO.class);
-  private static final int TEST_TYPE_READ = 0;
-  private static final int TEST_TYPE_WRITE = 1;
-  private static final int TEST_TYPE_CLEANUP = 2;
-  private static final int TEST_TYPE_APPEND = 3;
   private static final int DEFAULT_BUFFER_SIZE = 1000000;
   private static final String BASE_FILE_NAME = "test_io_";
   private static final String DEFAULT_RES_FILE_NAME = "TestDFSIO_results.log";
   private static final long MEGA = ByteMultiple.MB.value();
+  private static final int DEFAULT_NR_BYTES = 1;
+  private static final int DEFAULT_NR_FILES = 4;
   private static final String USAGE =
                     "Usage: " + TestDFSIO.class.getSimpleName() +
                     " [genericOptions]" +
-                    " -read | -write | -append | -clean [-nrFiles N]" +
-                    " [-fileSize Size[B|KB|MB|GB|TB]]" +
-                    " [-resFile resultFileName] [-bufferSize Bytes]" +
-                    " [-rootDir]";
+                    " -read [-random | -backward | -skip [-skipSize Size]] |" +
+                    " -write | -append | -clean" +
+                    " [-nrFiles N]" +
+                    " [-size Size[B|KB|MB|GB|TB]]" +
+                    " [-resFile resultFileName] [-bufferSize Bytes]" +
+                    " [-rootDir]";
   private Configuration config;
@@ -101,6 +113,27 @@ public class TestDFSIO extends TestCase implements Tool {
     Configuration.addDefaultResource("mapred-site.xml");
   }
+  private static enum TestType {
+    TEST_TYPE_READ("read"),
+    TEST_TYPE_WRITE("write"),
+    TEST_TYPE_CLEANUP("cleanup"),
+    TEST_TYPE_APPEND("append"),
+    TEST_TYPE_READ_RANDOM("random read"),
+    TEST_TYPE_READ_BACKWARD("backward read"),
+    TEST_TYPE_READ_SKIP("skip read");
+    private String type;
+    private TestType(String t) {
+      type = t;
+    }
+    @Override // String
+    public String toString() {
+      return type;
+    }
+  }
   static enum ByteMultiple {
     B(1L),
     KB(0x400L),
@@ -155,62 +188,100 @@ private static Path getReadDir(Configuration conf) {
   private static Path getAppendDir(Configuration conf) {
     return new Path(getBaseDir(conf), "io_append");
   }
+  private static Path getRandomReadDir(Configuration conf) {
+    return new Path(getBaseDir(conf), "io_random_read");
+  }
   private static Path getDataDir(Configuration conf) {
     return new Path(getBaseDir(conf), "io_data");
   }
-  /**
-   * Run the test with default parameters.
-   *
-   * @throws Exception
-   */
-  public void testIOs() throws Exception {
-    TestDFSIO bench = new TestDFSIO();
-    bench.testIOs(1, 4);
-  }
-  /**
-   * Run the test with the specified parameters.
-   *
-   * @param fileSize file size
-   * @param nrFiles number of files
-   * @throws IOException
-   */
-  public void testIOs(int fileSize, int nrFiles)
-    throws IOException {
-    config.setBoolean(DFSConfigKeys.DFS_SUPPORT_APPEND_KEY, true);
-    MiniDFSCluster cluster = null;
-    try {
-      cluster = new MiniDFSCluster(config, 2, true, null);
-      FileSystem fs = cluster.getFileSystem();
-      createControlFile(fs, fileSize, nrFiles);
-      long tStart = System.currentTimeMillis();
-      writeTest(fs);
-      long execTime = System.currentTimeMillis() - tStart;
-      analyzeResult(fs, TEST_TYPE_WRITE, execTime, DEFAULT_RES_FILE_NAME);
-      tStart = System.currentTimeMillis();
-      readTest(fs);
-      execTime = System.currentTimeMillis() - tStart;
-      analyzeResult(fs, TEST_TYPE_READ, execTime, DEFAULT_RES_FILE_NAME);
-      tStart = System.currentTimeMillis();
-      appendTest(fs);
-      execTime = System.currentTimeMillis() - tStart;
-      analyzeResult(fs, TEST_TYPE_APPEND, execTime, DEFAULT_RES_FILE_NAME);
-      cleanup(fs);
-    } finally {
-      if(cluster != null) cluster.shutdown();
-    }
-  }
+  private static MiniDFSCluster cluster;
+  private static TestDFSIO bench;
+  @BeforeClass
+  public static void beforeClass() throws Exception {
+    bench = new TestDFSIO();
+    bench.getConf().setBoolean(DFSConfigKeys.DFS_SUPPORT_APPEND_KEY, true);
+    cluster = new MiniDFSCluster.Builder(bench.getConf())
+                                .numDataNodes(2)
+                                .format(true)
+                                .build();
+    FileSystem fs = cluster.getFileSystem();
+    bench.createControlFile(fs, DEFAULT_NR_BYTES, DEFAULT_NR_FILES);
+  }
+  @AfterClass
+  public static void afterClass() throws Exception {
+    if(cluster == null)
+      return;
+    FileSystem fs = cluster.getFileSystem();
+    bench.cleanup(fs);
+    cluster.shutdown();
+  }
+  @Test
+  public void testWrite() throws Exception {
+    FileSystem fs = cluster.getFileSystem();
+    long tStart = System.currentTimeMillis();
+    bench.writeTest(fs);
+    long execTime = System.currentTimeMillis() - tStart;
+    bench.analyzeResult(fs, TestType.TEST_TYPE_WRITE, execTime);
+  }
+  @Test
+  public void testRead() throws Exception {
+    FileSystem fs = cluster.getFileSystem();
+    long tStart = System.currentTimeMillis();
+    bench.readTest(fs);
+    long execTime = System.currentTimeMillis() - tStart;
+    bench.analyzeResult(fs, TestType.TEST_TYPE_READ, execTime);
+  }
+  @Test
+  public void testReadRandom() throws Exception {
+    FileSystem fs = cluster.getFileSystem();
+    long tStart = System.currentTimeMillis();
+    bench.getConf().setLong("test.io.skip.size", 0);
+    bench.randomReadTest(fs);
+    long execTime = System.currentTimeMillis() - tStart;
+    bench.analyzeResult(fs, TestType.TEST_TYPE_READ_RANDOM, execTime);
+  }
+  @Test
+  public void testReadBackward() throws Exception {
+    FileSystem fs = cluster.getFileSystem();
+    long tStart = System.currentTimeMillis();
+    bench.getConf().setLong("test.io.skip.size", -DEFAULT_BUFFER_SIZE);
+    bench.randomReadTest(fs);
+    long execTime = System.currentTimeMillis() - tStart;
+    bench.analyzeResult(fs, TestType.TEST_TYPE_READ_BACKWARD, execTime);
+  }
+  @Test
+  public void testReadSkip() throws Exception {
+    FileSystem fs = cluster.getFileSystem();
+    long tStart = System.currentTimeMillis();
+    bench.getConf().setLong("test.io.skip.size", 1);
+    bench.randomReadTest(fs);
+    long execTime = System.currentTimeMillis() - tStart;
+    bench.analyzeResult(fs, TestType.TEST_TYPE_READ_SKIP, execTime);
+  }
+  @Test
+  public void testAppend() throws Exception {
+    FileSystem fs = cluster.getFileSystem();
+    long tStart = System.currentTimeMillis();
+    bench.appendTest(fs);
+    long execTime = System.currentTimeMillis() - tStart;
+    bench.analyzeResult(fs, TestType.TEST_TYPE_APPEND, execTime);
+  }
+  @SuppressWarnings("deprecation")
   private void createControlFile(FileSystem fs,
-                                  long fileSize, // in bytes
+                                  long nrBytes, // in bytes
                                   int nrFiles
                                 ) throws IOException {
-    LOG.info("creating control file: "+fileSize+" bytes, "+nrFiles+" files");
+    LOG.info("creating control file: "+nrBytes+" bytes, "+nrFiles+" files");
     Path controlDir = getControlDir(config);
     fs.delete(controlDir, true);
@@ -223,7 +294,7 @@ private void createControlFile(FileSystem fs,
         writer = SequenceFile.createWriter(fs, config, controlFile,
                                            Text.class, LongWritable.class,
                                            CompressionType.NONE);
-        writer.append(new Text(name), new LongWritable(fileSize));
+        writer.append(new Text(name), new LongWritable(nrBytes));
       } catch(Exception e) {
         throw new IOException(e.getLocalizedMessage());
       } finally {
@@ -251,10 +322,35 @@ private static String getFileName(int fIdx) {
    * <li>i/o rate squared</li>
    * </ul>
    */
-  private abstract static class IOStatMapper<T> extends IOMapperBase<T> {
-    IOStatMapper() {
+  private abstract static class IOStatMapper extends IOMapperBase<Long> {
+    protected CompressionCodec compressionCodec;
+    IOStatMapper() {
     }
+    @Override // Mapper
+    public void configure(JobConf conf) {
+      super.configure(conf);
+      // grab compression
+      String compression = getConf().get("test.io.compression.class", null);
+      Class<? extends CompressionCodec> codec;
+      // try to initialize codec
+      try {
+        codec = (compression == null) ? null :
+          Class.forName(compression).asSubclass(CompressionCodec.class);
+      } catch(Exception e) {
+        throw new RuntimeException("Compression codec not found: ", e);
+      }
+      if(codec != null) {
+        compressionCodec = (CompressionCodec)
+          ReflectionUtils.newInstance(codec, getConf());
+      }
+    }
+    @Override // IOMapperBase
     void collectStats(OutputCollector<Text, Text> output,
                       String name,
                       long execTime,
@@ -281,36 +377,38 @@ void collectStats(OutputCollector<Text, Text> output,
   /**
    * Write mapper class.
    */
-  public static class WriteMapper extends IOStatMapper<Long> {
+  public static class WriteMapper extends IOStatMapper {
     public WriteMapper() {
       for(int i=0; i < bufferSize; i++)
        buffer[i] = (byte)('0' + i % 50);
     }
-    @Override
+    @Override // IOMapperBase
+    public Closeable getIOStream(String name) throws IOException {
+      // create file
+      OutputStream out =
+          fs.create(new Path(getDataDir(getConf()), name), true, bufferSize);
+      if(compressionCodec != null)
+        out = compressionCodec.createOutputStream(out);
+      LOG.info("out = " + out.getClass().getName());
+      return out;
+    }
+    @Override // IOMapperBase
     public Long doIO(Reporter reporter,
                      String name,
                      long totalSize // in bytes
                    ) throws IOException {
-      // create file
-      OutputStream out;
-      out = fs.create(new Path(getDataDir(getConf()), name), true, bufferSize);
-      if(compressionCodec != null) out = compressionCodec.createOutputStream(out);
-      try {
-        // write to the file
-        long nrRemaining;
-        for (nrRemaining = totalSize; nrRemaining > 0; nrRemaining -= bufferSize) {
-          int curSize = (bufferSize < nrRemaining) ? bufferSize : (int)nrRemaining;
-          out.write(buffer, 0, curSize);
-          reporter.setStatus("writing " + name + "@" +
-                             (totalSize - nrRemaining) + "/" + totalSize
-                             + " ::host = " + hostName);
-        }
-      } finally {
-        out.close();
-      }
+      OutputStream out = (OutputStream)this.stream;
+      // write to the file
+      long nrRemaining;
+      for (nrRemaining = totalSize; nrRemaining > 0; nrRemaining -= bufferSize) {
+        int curSize = (bufferSize < nrRemaining) ? bufferSize : (int)nrRemaining;
+        out.write(buffer, 0, curSize);
+        reporter.setStatus("writing " + name + "@" +
+                           (totalSize - nrRemaining) + "/" + totalSize
+                           + " ::host = " + hostName);
+      }
       return Long.valueOf(totalSize);
     }
@@ -324,7 +422,6 @@ private void writeTest(FileSystem fs) throws IOException {
     runIOTest(WriteMapper.class, writeDir);
   }
-  @SuppressWarnings("deprecation")
   private void runIOTest(
                   Class<? extends Mapper<Text, LongWritable, Text, Text>> mapperClass,
                   Path outputDir) throws IOException {
@@ -346,35 +443,38 @@ private void runIOTest(
   /**
    * Append mapper class.
    */
-  public static class AppendMapper extends IOStatMapper<Long> {
+  public static class AppendMapper extends IOStatMapper {
     public AppendMapper() {
       for(int i=0; i < bufferSize; i++)
        buffer[i] = (byte)('0' + i % 50);
     }
+    @Override // IOMapperBase
+    public Closeable getIOStream(String name) throws IOException {
+      // open file for append
+      OutputStream out =
+          fs.append(new Path(getDataDir(getConf()), name), bufferSize);
+      if(compressionCodec != null)
+        out = compressionCodec.createOutputStream(out);
+      LOG.info("out = " + out.getClass().getName());
+      return out;
+    }
+    @Override // IOMapperBase
     public Long doIO(Reporter reporter,
                      String name,
                      long totalSize // in bytes
                    ) throws IOException {
-      // create file
-      OutputStream out;
-      out = fs.append(new Path(getDataDir(getConf()), name), bufferSize);
-      if(compressionCodec != null) out = compressionCodec.createOutputStream(out);
-      try {
-        // write to the file
-        long nrRemaining;
-        for (nrRemaining = totalSize; nrRemaining > 0; nrRemaining -= bufferSize) {
-          int curSize = (bufferSize < nrRemaining) ? bufferSize : (int)nrRemaining;
-          out.write(buffer, 0, curSize);
-          reporter.setStatus("writing " + name + "@" +
-                             (totalSize - nrRemaining) + "/" + totalSize
-                             + " ::host = " + hostName);
-        }
-      } finally {
-        out.close();
-      }
+      OutputStream out = (OutputStream)this.stream;
+      // write to the file
+      long nrRemaining;
+      for (nrRemaining = totalSize; nrRemaining > 0; nrRemaining -= bufferSize) {
+        int curSize = (bufferSize < nrRemaining) ? bufferSize : (int)nrRemaining;
+        out.write(buffer, 0, curSize);
+        reporter.setStatus("writing " + name + "@" +
+                           (totalSize - nrRemaining) + "/" + totalSize
+                           + " ::host = " + hostName);
+      }
       return Long.valueOf(totalSize);
     }
@@ -389,32 +489,35 @@ private void appendTest(FileSystem fs) throws IOException {
   /**
    * Read mapper class.
    */
-  public static class ReadMapper extends IOStatMapper<Long> {
+  public static class ReadMapper extends IOStatMapper {
     public ReadMapper() {
     }
+    @Override // IOMapperBase
+    public Closeable getIOStream(String name) throws IOException {
+      // open file
+      InputStream in = fs.open(new Path(getDataDir(getConf()), name));
+      if(compressionCodec != null)
+        in = compressionCodec.createInputStream(in);
+      LOG.info("in = " + in.getClass().getName());
+      return in;
+    }
+    @Override // IOMapperBase
     public Long doIO(Reporter reporter,
                      String name,
                      long totalSize // in bytes
                    ) throws IOException {
-      // open file
-      InputStream in = fs.open(new Path(getDataDir(getConf()), name));
-      if(compressionCodec != null) in = compressionCodec.createInputStream(in);
+      InputStream in = (InputStream)this.stream;
       long actualSize = 0;
-      try {
-        while (actualSize < totalSize) {
-          int curSize = in.read(buffer, 0, bufferSize);
-          if(curSize < 0) break;
-          actualSize += curSize;
-          reporter.setStatus("reading " + name + "@" +
-                             actualSize + "/" + totalSize
-                             + " ::host = " + hostName);
-        }
-      } finally {
-        in.close();
-      }
+      while (actualSize < totalSize) {
+        int curSize = in.read(buffer, 0, bufferSize);
+        if(curSize < 0) break;
+        actualSize += curSize;
+        reporter.setStatus("reading " + name + "@" +
+                           actualSize + "/" + totalSize
+                           + " ::host = " + hostName);
+      }
       return Long.valueOf(actualSize);
     }
@@ -426,20 +529,111 @@ private void readTest(FileSystem fs) throws IOException {
     runIOTest(ReadMapper.class, readDir);
   }
+  /**
+   * Mapper class for random reads.
+   * The mapper chooses a position in the file and reads bufferSize
+   * bytes starting at the chosen position.
+   * It stops after reading the totalSize bytes, specified by -size.
+   *
+   * There are three type of reads.
+   * 1) Random read always chooses a random position to read from: skipSize = 0
+   * 2) Backward read reads file in reverse order: skipSize < 0
+   * 3) Skip-read skips skipSize bytes after every read: skipSize > 0
+   */
+  public static class RandomReadMapper extends IOStatMapper {
+    private Random rnd;
+    private long fileSize;
+    private long skipSize;
+    @Override // Mapper
+    public void configure(JobConf conf) {
+      super.configure(conf);
+      skipSize = conf.getLong("test.io.skip.size", 0);
+    }
+    public RandomReadMapper() {
+      rnd = new Random();
+    }
+    @Override // IOMapperBase
+    public Closeable getIOStream(String name) throws IOException {
+      Path filePath = new Path(getDataDir(getConf()), name);
+      this.fileSize = fs.getFileStatus(filePath).getLen();
+      InputStream in = fs.open(filePath);
+      if(compressionCodec != null)
+        in = new FSDataInputStream(compressionCodec.createInputStream(in));
+      LOG.info("in = " + in.getClass().getName());
+      LOG.info("skipSize = " + skipSize);
+      return in;
+    }
+    @Override // IOMapperBase
+    public Long doIO(Reporter reporter,
+                     String name,
+                     long totalSize // in bytes
+                   ) throws IOException {
+      PositionedReadable in = (PositionedReadable)this.stream;
+      long actualSize = 0;
+      for(long pos = nextOffset(-1);
+          actualSize < totalSize; pos = nextOffset(pos)) {
+        int curSize = in.read(pos, buffer, 0, bufferSize);
+        if(curSize < 0) break;
+        actualSize += curSize;
+        reporter.setStatus("reading " + name + "@" +
+                           actualSize + "/" + totalSize
+                           + " ::host = " + hostName);
+      }
+      return Long.valueOf(actualSize);
+    }
+    /**
+     * Get next offset for reading.
+     * If current < 0 then choose initial offset according to the read type.
+     *
+     * @param current offset
+     * @return
+     */
+    private long nextOffset(long current) {
+      if(skipSize == 0)
+        return rnd.nextInt((int)(fileSize));
+      if(skipSize > 0)
+        return (current < 0) ? 0 : (current + bufferSize + skipSize);
+      // skipSize < 0
+      return (current < 0) ? Math.max(0, fileSize - bufferSize) :
+                             Math.max(0, current + skipSize);
+    }
+  }
+  private void randomReadTest(FileSystem fs) throws IOException {
+    Path readDir = getRandomReadDir(config);
+    fs.delete(readDir, true);
+    runIOTest(RandomReadMapper.class, readDir);
+  }
   private void sequentialTest(FileSystem fs,
-                              int testType,
+                              TestType testType,
                               long fileSize, // in bytes
                               int nrFiles
                              ) throws IOException {
-    IOStatMapper<Long> ioer = null;
-    if (testType == TEST_TYPE_READ)
+    IOStatMapper ioer = null;
+    switch(testType) {
+    case TEST_TYPE_READ:
       ioer = new ReadMapper();
-    else if (testType == TEST_TYPE_WRITE)
+      break;
+    case TEST_TYPE_WRITE:
       ioer = new WriteMapper();
-    else if (testType == TEST_TYPE_APPEND)
+      break;
+    case TEST_TYPE_APPEND:
       ioer = new AppendMapper();
-    else
+      break;
+    case TEST_TYPE_READ_RANDOM:
+    case TEST_TYPE_READ_BACKWARD:
+    case TEST_TYPE_READ_SKIP:
+      ioer = new RandomReadMapper();
+      break;
+    default:
       return;
+    }
     for(int i=0; i < nrFiles; i++)
       ioer.doIO(Reporter.NULL,
                 BASE_FILE_NAME+Integer.toString(i),
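A rough worked example of the offset sequences nextOffset() above produces, assuming a 10 MB file and the default 1 MB buffer (sizes chosen only for illustration; the helper below mirrors the skip and backward branches of nextOffset and is not part of the patch):

// Illustration only: mirrors RandomReadMapper.nextOffset() for assumed sizes.
public class NextOffsetDemo {
  static final long MB = 0x100000L;
  static final long fileSize = 10 * MB;
  static final long bufferSize = MB;

  // Same branching as nextOffset(), with the random case omitted.
  static long next(long current, long skipSize) {
    if (skipSize > 0)                         // skip read
      return (current < 0) ? 0 : current + bufferSize + skipSize;
    return (current < 0) ? Math.max(0, fileSize - bufferSize)   // backward read
                         : Math.max(0, current + skipSize);
  }

  public static void main(String[] args) {
    // -skip -skipSize 1MB : 0, 2MB, 4MB, 6MB, 8MB, ...
    // -backward           : 9MB, 8MB, ..., 1MB, 0, 0, ...
    // -random (skipSize 0): uniformly random offsets in [0, fileSize)
    for (long skip : new long[] {MB, -bufferSize}) {
      StringBuilder sb = new StringBuilder();
      for (long pos = next(-1, skip), i = 0; i < 10; pos = next(pos, skip), i++)
        sb.append(pos / MB).append("MB ");
      System.out.println("skipSize=" + skip + " -> " + sb);
    }
  }
}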
@@ -462,14 +656,15 @@ public static void main(String[] args) {
   @Override // Tool
   public int run(String[] args) throws IOException {
-    int testType = TEST_TYPE_READ;
+    TestType testType = null;
     int bufferSize = DEFAULT_BUFFER_SIZE;
-    long fileSize = 1*MEGA;
+    long nrBytes = 1*MEGA;
     int nrFiles = 1;
+    long skipSize = 0;
     String resFileName = DEFAULT_RES_FILE_NAME;
     String compressionClass = null;
     boolean isSequential = false;
-    String version = TestDFSIO.class.getSimpleName() + ".0.0.6";
+    String version = TestDFSIO.class.getSimpleName() + ".1.7";
     LOG.info(version);
     if (args.length == 0) {
@@ -479,21 +674,32 @@ public int run(String[] args) throws IOException {
     for (int i = 0; i < args.length; i++) {       // parse command line
       if (args[i].startsWith("-read")) {
-        testType = TEST_TYPE_READ;
+        testType = TestType.TEST_TYPE_READ;
       } else if (args[i].equals("-write")) {
-        testType = TEST_TYPE_WRITE;
+        testType = TestType.TEST_TYPE_WRITE;
       } else if (args[i].equals("-append")) {
-        testType = TEST_TYPE_APPEND;
+        testType = TestType.TEST_TYPE_APPEND;
+      } else if (args[i].equals("-random")) {
+        if(testType != TestType.TEST_TYPE_READ) return -1;
+        testType = TestType.TEST_TYPE_READ_RANDOM;
+      } else if (args[i].equals("-backward")) {
+        if(testType != TestType.TEST_TYPE_READ) return -1;
+        testType = TestType.TEST_TYPE_READ_BACKWARD;
+      } else if (args[i].equals("-skip")) {
+        if(testType != TestType.TEST_TYPE_READ) return -1;
+        testType = TestType.TEST_TYPE_READ_SKIP;
       } else if (args[i].equals("-clean")) {
-        testType = TEST_TYPE_CLEANUP;
+        testType = TestType.TEST_TYPE_CLEANUP;
       } else if (args[i].startsWith("-seq")) {
         isSequential = true;
       } else if (args[i].startsWith("-compression")) {
         compressionClass = args[++i];
       } else if (args[i].equals("-nrFiles")) {
         nrFiles = Integer.parseInt(args[++i]);
-      } else if (args[i].equals("-fileSize")) {
-        fileSize = parseSize(args[++i]);
+      } else if (args[i].equals("-fileSize") || args[i].equals("-size")) {
+        nrBytes = parseSize(args[++i]);
+      } else if (args[i].equals("-skipSize")) {
+        skipSize = parseSize(args[++i]);
       } else if (args[i].equals("-bufferSize")) {
         bufferSize = Integer.parseInt(args[++i]);
       } else if (args[i].equals("-resFile")) {
@@ -503,10 +709,18 @@ public int run(String[] args) throws IOException {
         return -1;
       }
     }
+    if(testType == null)
+      return -1;
+    if(testType == TestType.TEST_TYPE_READ_BACKWARD)
+      skipSize = -bufferSize;
+    else if(testType == TestType.TEST_TYPE_READ_SKIP && skipSize == 0)
+      skipSize = bufferSize;
     LOG.info("nrFiles = " + nrFiles);
-    LOG.info("fileSize (MB) = " + toMB(fileSize));
+    LOG.info("nrBytes (MB) = " + toMB(nrBytes));
     LOG.info("bufferSize = " + bufferSize);
+    if(skipSize > 0)
+      LOG.info("skipSize = " + skipSize);
     LOG.info("baseDir = " + getBaseDir(config));
     if(compressionClass != null) {
} }
config.setInt("test.io.file.buffer.size", bufferSize); config.setInt("test.io.file.buffer.size", bufferSize);
config.setLong("test.io.skip.size", skipSize);
config.setBoolean(DFSConfigKeys.DFS_SUPPORT_APPEND_KEY, true); config.setBoolean(DFSConfigKeys.DFS_SUPPORT_APPEND_KEY, true);
FileSystem fs = FileSystem.get(config); FileSystem fs = FileSystem.get(config);
if (isSequential) { if (isSequential) {
long tStart = System.currentTimeMillis(); long tStart = System.currentTimeMillis();
sequentialTest(fs, testType, fileSize, nrFiles); sequentialTest(fs, testType, nrBytes, nrFiles);
long execTime = System.currentTimeMillis() - tStart; long execTime = System.currentTimeMillis() - tStart;
String resultLine = "Seq Test exec time sec: " + (float)execTime / 1000; String resultLine = "Seq Test exec time sec: " + (float)execTime / 1000;
LOG.info(resultLine); LOG.info(resultLine);
return 0; return 0;
} }
if (testType == TEST_TYPE_CLEANUP) { if (testType == TestType.TEST_TYPE_CLEANUP) {
cleanup(fs); cleanup(fs);
return 0; return 0;
} }
createControlFile(fs, fileSize, nrFiles); createControlFile(fs, nrBytes, nrFiles);
long tStart = System.currentTimeMillis(); long tStart = System.currentTimeMillis();
if (testType == TEST_TYPE_WRITE) switch(testType) {
case TEST_TYPE_WRITE:
writeTest(fs); writeTest(fs);
if (testType == TEST_TYPE_READ) break;
case TEST_TYPE_READ:
readTest(fs); readTest(fs);
if (testType == TEST_TYPE_APPEND) break;
case TEST_TYPE_APPEND:
appendTest(fs); appendTest(fs);
break;
case TEST_TYPE_READ_RANDOM:
case TEST_TYPE_READ_BACKWARD:
case TEST_TYPE_READ_SKIP:
randomReadTest(fs);
}
long execTime = System.currentTimeMillis() - tStart; long execTime = System.currentTimeMillis() - tStart;
analyzeResult(fs, testType, execTime, resFileName); analyzeResult(fs, testType, execTime, resFileName);
@@ -563,9 +787,9 @@ public void setConf(Configuration conf) {
   static long parseSize(String arg) {
     String[] args = arg.split("\\D", 2);  // get digits
     assert args.length <= 2;
-    long fileSize = Long.parseLong(args[0]);
+    long nrBytes = Long.parseLong(args[0]);
     String bytesMult = arg.substring(args[0].length()); // get byte multiple
-    return fileSize * ByteMultiple.parseString(bytesMult).value();
+    return nrBytes * ByteMultiple.parseString(bytesMult).value();
   }
   static float toMB(long bytes) {
@@ -573,17 +797,11 @@ static float toMB(long bytes) {
   }
   private void analyzeResult( FileSystem fs,
-                              int testType,
+                              TestType testType,
                               long execTime,
                               String resFileName
                             ) throws IOException {
-    Path reduceFile;
-    if (testType == TEST_TYPE_WRITE)
-      reduceFile = new Path(getWriteDir(config), "part-00000");
-    else if (testType == TEST_TYPE_APPEND)
-      reduceFile = new Path(getAppendDir(config), "part-00000");
-    else // if (testType == TEST_TYPE_READ)
-      reduceFile = new Path(getReadDir(config), "part-00000");
+    Path reduceFile = getReduceFilePath(testType);
     long tasks = 0;
     long size = 0;
     long time = 0;
@@ -617,10 +835,7 @@ else if (attr.endsWith(":sqrate"))
     double med = rate / 1000 / tasks;
     double stdDev = Math.sqrt(Math.abs(sqrate / 1000 / tasks - med*med));
     String resultLines[] = {
-      "----- TestDFSIO ----- : " + ((testType == TEST_TYPE_WRITE) ? "write" :
-                                    (testType == TEST_TYPE_READ) ? "read" :
-                                    (testType == TEST_TYPE_APPEND) ? "append" :
-                                    "unknown"),
+      "----- TestDFSIO ----- : " + testType,
       "           Date & time: " + new Date(System.currentTimeMillis()),
       "       Number of files: " + tasks,
       "Total MBytes processed: " + toMB(size),
@@ -642,6 +857,27 @@ else if (attr.endsWith(":sqrate"))
       }
     }
+  private Path getReduceFilePath(TestType testType) {
+    switch(testType) {
+    case TEST_TYPE_WRITE:
+      return new Path(getWriteDir(config), "part-00000");
+    case TEST_TYPE_APPEND:
+      return new Path(getAppendDir(config), "part-00000");
+    case TEST_TYPE_READ:
+      return new Path(getReadDir(config), "part-00000");
+    case TEST_TYPE_READ_RANDOM:
+    case TEST_TYPE_READ_BACKWARD:
+    case TEST_TYPE_READ_SKIP:
+      return new Path(getRandomReadDir(config), "part-00000");
+    }
+    return null;
+  }
+  private void analyzeResult(FileSystem fs, TestType testType, long execTime)
+      throws IOException {
+    analyzeResult(fs, testType, execTime, DEFAULT_RES_FILE_NAME);
+  }
   private void cleanup(FileSystem fs)
       throws IOException {
     LOG.info("Cleaning up test files");