MAPREDUCE-4651. Benchmarking random reads with DFSIO.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1390159 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
d9c6b1ffe8
commit
33a3efcd18
|
@ -175,9 +175,6 @@ Release 2.0.2-alpha - 2012-09-07
|
|||
MAPREDUCE-4408. allow jobs to set a JAR that is in the distributed cached
|
||||
(rkanter via tucu)
|
||||
|
||||
MAPREDUCE-2786. Add compression option for TestDFSIO.
|
||||
(Plamen Jeliazkov via shv)
|
||||
|
||||
MAPREDUCE-4440. Changed SchedulerApp and SchedulerNode to be a minimal
|
||||
interface to allow schedulers to maintain their own. (acmurthy)
|
||||
|
||||
|
@ -533,9 +530,14 @@ Release 0.23.4 - UNRELEASED
|
|||
|
||||
IMPROVEMENTS
|
||||
|
||||
MAPREDUCE-2786. Add compression option for TestDFSIO.
|
||||
(Plamen Jeliazkov via shv)
|
||||
|
||||
MAPREDUCE-4645. Provide a random seed to Slive to make the sequence
|
||||
of file names deterministic. (Ravi Prakash via shv)
|
||||
|
||||
MAPREDUCE-4651. Benchmarking random reads with DFSIO. (shv)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
BUG FIXES
|
||||
|
|
|
@ -17,14 +17,13 @@
|
|||
*/
|
||||
package org.apache.hadoop.fs;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.net.InetAddress;
|
||||
import org.apache.hadoop.conf.Configured;
|
||||
import org.apache.hadoop.io.LongWritable;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.io.compress.CompressionCodec;
|
||||
import org.apache.hadoop.mapred.*;
|
||||
import org.apache.hadoop.util.ReflectionUtils;
|
||||
|
||||
/**
|
||||
* Base mapper class for IO operations.
|
||||
|
@ -35,7 +34,6 @@ import org.apache.hadoop.util.ReflectionUtils;
|
|||
* statistics data to be collected by subsequent reducers.
|
||||
*
|
||||
*/
|
||||
@SuppressWarnings("deprecation")
|
||||
public abstract class IOMapperBase<T> extends Configured
|
||||
implements Mapper<Text, LongWritable, Text, Text> {
|
||||
|
||||
|
@ -43,7 +41,7 @@ public abstract class IOMapperBase<T> extends Configured
|
|||
protected int bufferSize;
|
||||
protected FileSystem fs;
|
||||
protected String hostName;
|
||||
protected CompressionCodec compressionCodec;
|
||||
protected Closeable stream;
|
||||
|
||||
public IOMapperBase() {
|
||||
}
|
||||
|
@ -62,22 +60,6 @@ public abstract class IOMapperBase<T> extends Configured
|
|||
} catch(Exception e) {
|
||||
hostName = "localhost";
|
||||
}
|
||||
|
||||
//grab compression
|
||||
String compression = getConf().get("test.io.compression.class", null);
|
||||
Class<? extends CompressionCodec> codec;
|
||||
|
||||
//try to initialize codec
|
||||
try {
|
||||
codec = (compression == null) ? null :
|
||||
Class.forName(compression).asSubclass(CompressionCodec.class);
|
||||
} catch(Exception e) {
|
||||
throw new RuntimeException("Compression codec not found: ", e);
|
||||
}
|
||||
|
||||
if(codec != null) {
|
||||
compressionCodec = (CompressionCodec) ReflectionUtils.newInstance(codec, getConf());
|
||||
}
|
||||
}
|
||||
|
||||
public void close() throws IOException {
|
||||
|
@ -97,6 +79,18 @@ public abstract class IOMapperBase<T> extends Configured
|
|||
String name,
|
||||
long value) throws IOException;
|
||||
|
||||
/**
|
||||
* Create an input or output stream based on the specified file.
|
||||
* Subclasses should override this method to provide an actual stream.
|
||||
*
|
||||
* @param name file name
|
||||
* @return the stream
|
||||
* @throws IOException
|
||||
*/
|
||||
public Closeable getIOStream(String name) throws IOException {
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Collect stat data to be combined by a subsequent reducer.
|
||||
*
|
||||
|
@ -132,9 +126,15 @@ public abstract class IOMapperBase<T> extends Configured
|
|||
long longValue = value.get();
|
||||
|
||||
reporter.setStatus("starting " + name + " ::host = " + hostName);
|
||||
|
||||
|
||||
this.stream = getIOStream(name);
|
||||
T statValue = null;
|
||||
long tStart = System.currentTimeMillis();
|
||||
T statValue = doIO(reporter, name, longValue);
|
||||
try {
|
||||
statValue = doIO(reporter, name, longValue);
|
||||
} finally {
|
||||
if(stream != null) stream.close();
|
||||
}
|
||||
long tEnd = System.currentTimeMillis();
|
||||
long execTime = tEnd - tStart;
|
||||
collectStats(output, name, execTime, statValue);
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
package org.apache.hadoop.fs;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.Closeable;
|
||||
import java.io.DataInputStream;
|
||||
import java.io.File;
|
||||
import java.io.FileOutputStream;
|
||||
|
@ -28,10 +29,9 @@ import java.io.InputStreamReader;
|
|||
import java.io.OutputStream;
|
||||
import java.io.PrintStream;
|
||||
import java.util.Date;
|
||||
import java.util.Random;
|
||||
import java.util.StringTokenizer;
|
||||
|
||||
import junit.framework.TestCase;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
@ -39,18 +39,30 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
|
|||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.io.LongWritable;
|
||||
import org.apache.hadoop.io.SequenceFile;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.io.SequenceFile.CompressionType;
|
||||
import org.apache.hadoop.mapred.*;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.io.compress.CompressionCodec;
|
||||
import org.apache.hadoop.mapred.FileInputFormat;
|
||||
import org.apache.hadoop.mapred.FileOutputFormat;
|
||||
import org.apache.hadoop.mapred.JobClient;
|
||||
import org.apache.hadoop.mapred.JobConf;
|
||||
import org.apache.hadoop.mapred.Mapper;
|
||||
import org.apache.hadoop.mapred.OutputCollector;
|
||||
import org.apache.hadoop.mapred.Reporter;
|
||||
import org.apache.hadoop.mapred.SequenceFileInputFormat;
|
||||
import org.apache.hadoop.util.ReflectionUtils;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.apache.hadoop.util.Tool;
|
||||
import org.apache.hadoop.util.ToolRunner;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
/**
|
||||
* Distributed i/o benchmark.
|
||||
* <p>
|
||||
* This test writes into or reads from a specified number of files.
|
||||
* File size is specified as a parameter to the test.
|
||||
* Number of bytes to write or read is specified as a parameter to the test.
|
||||
* Each file is accessed in a separate map task.
|
||||
* <p>
|
||||
* The reducer collects the following statistics:
|
||||
|
@ -73,24 +85,24 @@ import org.apache.hadoop.util.ToolRunner;
|
|||
* <li>standard deviation of i/o rate </li>
|
||||
* </ul>
|
||||
*/
|
||||
public class TestDFSIO extends TestCase implements Tool {
|
||||
public class TestDFSIO implements Tool {
|
||||
// Constants
|
||||
private static final Log LOG = LogFactory.getLog(TestDFSIO.class);
|
||||
private static final int TEST_TYPE_READ = 0;
|
||||
private static final int TEST_TYPE_WRITE = 1;
|
||||
private static final int TEST_TYPE_CLEANUP = 2;
|
||||
private static final int TEST_TYPE_APPEND = 3;
|
||||
private static final int DEFAULT_BUFFER_SIZE = 1000000;
|
||||
private static final String BASE_FILE_NAME = "test_io_";
|
||||
private static final String DEFAULT_RES_FILE_NAME = "TestDFSIO_results.log";
|
||||
private static final long MEGA = ByteMultiple.MB.value();
|
||||
private static final int DEFAULT_NR_BYTES = 1;
|
||||
private static final int DEFAULT_NR_FILES = 4;
|
||||
private static final String USAGE =
|
||||
"Usage: " + TestDFSIO.class.getSimpleName() +
|
||||
" [genericOptions]" +
|
||||
" -read | -write | -append | -clean [-nrFiles N]" +
|
||||
" [-fileSize Size[B|KB|MB|GB|TB]]" +
|
||||
" [-resFile resultFileName] [-bufferSize Bytes]" +
|
||||
" [-rootDir]";
|
||||
"Usage: " + TestDFSIO.class.getSimpleName() +
|
||||
" [genericOptions]" +
|
||||
" -read [-random | -backward | -skip [-skipSize Size]] |" +
|
||||
" -write | -append | -clean" +
|
||||
" [-nrFiles N]" +
|
||||
" [-size Size[B|KB|MB|GB|TB]]" +
|
||||
" [-resFile resultFileName] [-bufferSize Bytes]" +
|
||||
" [-rootDir]";
|
||||
|
||||
private Configuration config;
|
||||
|
||||
|
@ -101,6 +113,27 @@ public class TestDFSIO extends TestCase implements Tool {
|
|||
Configuration.addDefaultResource("mapred-site.xml");
|
||||
}
|
||||
|
||||
private static enum TestType {
|
||||
TEST_TYPE_READ("read"),
|
||||
TEST_TYPE_WRITE("write"),
|
||||
TEST_TYPE_CLEANUP("cleanup"),
|
||||
TEST_TYPE_APPEND("append"),
|
||||
TEST_TYPE_READ_RANDOM("random read"),
|
||||
TEST_TYPE_READ_BACKWARD("backward read"),
|
||||
TEST_TYPE_READ_SKIP("skip read");
|
||||
|
||||
private String type;
|
||||
|
||||
private TestType(String t) {
|
||||
type = t;
|
||||
}
|
||||
|
||||
@Override // String
|
||||
public String toString() {
|
||||
return type;
|
||||
}
|
||||
}
|
||||
|
||||
static enum ByteMultiple {
|
||||
B(1L),
|
||||
KB(0x400L),
|
||||
|
@ -155,62 +188,100 @@ public class TestDFSIO extends TestCase implements Tool {
|
|||
private static Path getAppendDir(Configuration conf) {
|
||||
return new Path(getBaseDir(conf), "io_append");
|
||||
}
|
||||
private static Path getRandomReadDir(Configuration conf) {
|
||||
return new Path(getBaseDir(conf), "io_random_read");
|
||||
}
|
||||
private static Path getDataDir(Configuration conf) {
|
||||
return new Path(getBaseDir(conf), "io_data");
|
||||
}
|
||||
|
||||
/**
|
||||
* Run the test with default parameters.
|
||||
*
|
||||
* @throws Exception
|
||||
*/
|
||||
public void testIOs() throws Exception {
|
||||
TestDFSIO bench = new TestDFSIO();
|
||||
bench.testIOs(1, 4);
|
||||
private static MiniDFSCluster cluster;
|
||||
private static TestDFSIO bench;
|
||||
|
||||
@BeforeClass
|
||||
public static void beforeClass() throws Exception {
|
||||
bench = new TestDFSIO();
|
||||
bench.getConf().setBoolean(DFSConfigKeys.DFS_SUPPORT_APPEND_KEY, true);
|
||||
cluster = new MiniDFSCluster.Builder(bench.getConf())
|
||||
.numDataNodes(2)
|
||||
.format(true)
|
||||
.build();
|
||||
FileSystem fs = cluster.getFileSystem();
|
||||
bench.createControlFile(fs, DEFAULT_NR_BYTES, DEFAULT_NR_FILES);
|
||||
}
|
||||
|
||||
/**
|
||||
* Run the test with the specified parameters.
|
||||
*
|
||||
* @param fileSize file size
|
||||
* @param nrFiles number of files
|
||||
* @throws IOException
|
||||
*/
|
||||
public void testIOs(int fileSize, int nrFiles)
|
||||
throws IOException {
|
||||
config.setBoolean(DFSConfigKeys.DFS_SUPPORT_APPEND_KEY, true);
|
||||
MiniDFSCluster cluster = null;
|
||||
try {
|
||||
cluster = new MiniDFSCluster(config, 2, true, null);
|
||||
FileSystem fs = cluster.getFileSystem();
|
||||
|
||||
createControlFile(fs, fileSize, nrFiles);
|
||||
long tStart = System.currentTimeMillis();
|
||||
writeTest(fs);
|
||||
long execTime = System.currentTimeMillis() - tStart;
|
||||
analyzeResult(fs, TEST_TYPE_WRITE, execTime, DEFAULT_RES_FILE_NAME);
|
||||
|
||||
tStart = System.currentTimeMillis();
|
||||
readTest(fs);
|
||||
execTime = System.currentTimeMillis() - tStart;
|
||||
analyzeResult(fs, TEST_TYPE_READ, execTime, DEFAULT_RES_FILE_NAME);
|
||||
|
||||
tStart = System.currentTimeMillis();
|
||||
appendTest(fs);
|
||||
execTime = System.currentTimeMillis() - tStart;
|
||||
analyzeResult(fs, TEST_TYPE_APPEND, execTime, DEFAULT_RES_FILE_NAME);
|
||||
|
||||
cleanup(fs);
|
||||
} finally {
|
||||
if(cluster != null) cluster.shutdown();
|
||||
}
|
||||
@AfterClass
|
||||
public static void afterClass() throws Exception {
|
||||
if(cluster == null)
|
||||
return;
|
||||
FileSystem fs = cluster.getFileSystem();
|
||||
bench.cleanup(fs);
|
||||
cluster.shutdown();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testWrite() throws Exception {
|
||||
FileSystem fs = cluster.getFileSystem();
|
||||
long tStart = System.currentTimeMillis();
|
||||
bench.writeTest(fs);
|
||||
long execTime = System.currentTimeMillis() - tStart;
|
||||
bench.analyzeResult(fs, TestType.TEST_TYPE_WRITE, execTime);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRead() throws Exception {
|
||||
FileSystem fs = cluster.getFileSystem();
|
||||
long tStart = System.currentTimeMillis();
|
||||
bench.readTest(fs);
|
||||
long execTime = System.currentTimeMillis() - tStart;
|
||||
bench.analyzeResult(fs, TestType.TEST_TYPE_READ, execTime);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReadRandom() throws Exception {
|
||||
FileSystem fs = cluster.getFileSystem();
|
||||
long tStart = System.currentTimeMillis();
|
||||
bench.getConf().setLong("test.io.skip.size", 0);
|
||||
bench.randomReadTest(fs);
|
||||
long execTime = System.currentTimeMillis() - tStart;
|
||||
bench.analyzeResult(fs, TestType.TEST_TYPE_READ_RANDOM, execTime);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReadBackward() throws Exception {
|
||||
FileSystem fs = cluster.getFileSystem();
|
||||
long tStart = System.currentTimeMillis();
|
||||
bench.getConf().setLong("test.io.skip.size", -DEFAULT_BUFFER_SIZE);
|
||||
bench.randomReadTest(fs);
|
||||
long execTime = System.currentTimeMillis() - tStart;
|
||||
bench.analyzeResult(fs, TestType.TEST_TYPE_READ_BACKWARD, execTime);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReadSkip() throws Exception {
|
||||
FileSystem fs = cluster.getFileSystem();
|
||||
long tStart = System.currentTimeMillis();
|
||||
bench.getConf().setLong("test.io.skip.size", 1);
|
||||
bench.randomReadTest(fs);
|
||||
long execTime = System.currentTimeMillis() - tStart;
|
||||
bench.analyzeResult(fs, TestType.TEST_TYPE_READ_SKIP, execTime);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAppend() throws Exception {
|
||||
FileSystem fs = cluster.getFileSystem();
|
||||
long tStart = System.currentTimeMillis();
|
||||
bench.appendTest(fs);
|
||||
long execTime = System.currentTimeMillis() - tStart;
|
||||
bench.analyzeResult(fs, TestType.TEST_TYPE_APPEND, execTime);
|
||||
}
|
||||
|
||||
@SuppressWarnings("deprecation")
|
||||
private void createControlFile(FileSystem fs,
|
||||
long fileSize, // in bytes
|
||||
long nrBytes, // in bytes
|
||||
int nrFiles
|
||||
) throws IOException {
|
||||
LOG.info("creating control file: "+fileSize+" bytes, "+nrFiles+" files");
|
||||
LOG.info("creating control file: "+nrBytes+" bytes, "+nrFiles+" files");
|
||||
|
||||
Path controlDir = getControlDir(config);
|
||||
fs.delete(controlDir, true);
|
||||
|
@ -223,7 +294,7 @@ public class TestDFSIO extends TestCase implements Tool {
|
|||
writer = SequenceFile.createWriter(fs, config, controlFile,
|
||||
Text.class, LongWritable.class,
|
||||
CompressionType.NONE);
|
||||
writer.append(new Text(name), new LongWritable(fileSize));
|
||||
writer.append(new Text(name), new LongWritable(nrBytes));
|
||||
} catch(Exception e) {
|
||||
throw new IOException(e.getLocalizedMessage());
|
||||
} finally {
|
||||
|
@ -251,10 +322,35 @@ public class TestDFSIO extends TestCase implements Tool {
|
|||
* <li>i/o rate squared</li>
|
||||
* </ul>
|
||||
*/
|
||||
private abstract static class IOStatMapper<T> extends IOMapperBase<T> {
|
||||
IOStatMapper() {
|
||||
private abstract static class IOStatMapper extends IOMapperBase<Long> {
|
||||
protected CompressionCodec compressionCodec;
|
||||
|
||||
IOStatMapper() {
|
||||
}
|
||||
|
||||
|
||||
@Override // Mapper
|
||||
public void configure(JobConf conf) {
|
||||
super.configure(conf);
|
||||
|
||||
// grab compression
|
||||
String compression = getConf().get("test.io.compression.class", null);
|
||||
Class<? extends CompressionCodec> codec;
|
||||
|
||||
// try to initialize codec
|
||||
try {
|
||||
codec = (compression == null) ? null :
|
||||
Class.forName(compression).asSubclass(CompressionCodec.class);
|
||||
} catch(Exception e) {
|
||||
throw new RuntimeException("Compression codec not found: ", e);
|
||||
}
|
||||
|
||||
if(codec != null) {
|
||||
compressionCodec = (CompressionCodec)
|
||||
ReflectionUtils.newInstance(codec, getConf());
|
||||
}
|
||||
}
|
||||
|
||||
@Override // IOMapperBase
|
||||
void collectStats(OutputCollector<Text, Text> output,
|
||||
String name,
|
||||
long execTime,
|
||||
|
@ -281,36 +377,38 @@ public class TestDFSIO extends TestCase implements Tool {
|
|||
/**
|
||||
* Write mapper class.
|
||||
*/
|
||||
public static class WriteMapper extends IOStatMapper<Long> {
|
||||
public static class WriteMapper extends IOStatMapper {
|
||||
|
||||
public WriteMapper() {
|
||||
for(int i=0; i < bufferSize; i++)
|
||||
buffer[i] = (byte)('0' + i % 50);
|
||||
}
|
||||
|
||||
@Override
|
||||
@Override // IOMapperBase
|
||||
public Closeable getIOStream(String name) throws IOException {
|
||||
// create file
|
||||
OutputStream out =
|
||||
fs.create(new Path(getDataDir(getConf()), name), true, bufferSize);
|
||||
if(compressionCodec != null)
|
||||
out = compressionCodec.createOutputStream(out);
|
||||
LOG.info("out = " + out.getClass().getName());
|
||||
return out;
|
||||
}
|
||||
|
||||
@Override // IOMapperBase
|
||||
public Long doIO(Reporter reporter,
|
||||
String name,
|
||||
long totalSize // in bytes
|
||||
) throws IOException {
|
||||
// create file
|
||||
OutputStream out;
|
||||
out = fs.create(new Path(getDataDir(getConf()), name), true, bufferSize);
|
||||
|
||||
if(compressionCodec != null) out = compressionCodec.createOutputStream(out);
|
||||
|
||||
try {
|
||||
// write to the file
|
||||
long nrRemaining;
|
||||
for (nrRemaining = totalSize; nrRemaining > 0; nrRemaining -= bufferSize) {
|
||||
int curSize = (bufferSize < nrRemaining) ? bufferSize : (int)nrRemaining;
|
||||
out.write(buffer, 0, curSize);
|
||||
reporter.setStatus("writing " + name + "@" +
|
||||
(totalSize - nrRemaining) + "/" + totalSize
|
||||
+ " ::host = " + hostName);
|
||||
}
|
||||
} finally {
|
||||
out.close();
|
||||
OutputStream out = (OutputStream)this.stream;
|
||||
// write to the file
|
||||
long nrRemaining;
|
||||
for (nrRemaining = totalSize; nrRemaining > 0; nrRemaining -= bufferSize) {
|
||||
int curSize = (bufferSize < nrRemaining) ? bufferSize : (int)nrRemaining;
|
||||
out.write(buffer, 0, curSize);
|
||||
reporter.setStatus("writing " + name + "@" +
|
||||
(totalSize - nrRemaining) + "/" + totalSize
|
||||
+ " ::host = " + hostName);
|
||||
}
|
||||
return Long.valueOf(totalSize);
|
||||
}
|
||||
|
@ -324,7 +422,6 @@ public class TestDFSIO extends TestCase implements Tool {
|
|||
runIOTest(WriteMapper.class, writeDir);
|
||||
}
|
||||
|
||||
@SuppressWarnings("deprecation")
|
||||
private void runIOTest(
|
||||
Class<? extends Mapper<Text, LongWritable, Text, Text>> mapperClass,
|
||||
Path outputDir) throws IOException {
|
||||
|
@ -346,35 +443,38 @@ public class TestDFSIO extends TestCase implements Tool {
|
|||
/**
|
||||
* Append mapper class.
|
||||
*/
|
||||
public static class AppendMapper extends IOStatMapper<Long> {
|
||||
public static class AppendMapper extends IOStatMapper {
|
||||
|
||||
public AppendMapper() {
|
||||
for(int i=0; i < bufferSize; i++)
|
||||
buffer[i] = (byte)('0' + i % 50);
|
||||
}
|
||||
|
||||
@Override // IOMapperBase
|
||||
public Closeable getIOStream(String name) throws IOException {
|
||||
// open file for append
|
||||
OutputStream out =
|
||||
fs.append(new Path(getDataDir(getConf()), name), bufferSize);
|
||||
if(compressionCodec != null)
|
||||
out = compressionCodec.createOutputStream(out);
|
||||
LOG.info("out = " + out.getClass().getName());
|
||||
return out;
|
||||
}
|
||||
|
||||
@Override // IOMapperBase
|
||||
public Long doIO(Reporter reporter,
|
||||
String name,
|
||||
long totalSize // in bytes
|
||||
) throws IOException {
|
||||
// create file
|
||||
OutputStream out;
|
||||
out = fs.append(new Path(getDataDir(getConf()), name), bufferSize);
|
||||
|
||||
if(compressionCodec != null) out = compressionCodec.createOutputStream(out);
|
||||
|
||||
try {
|
||||
// write to the file
|
||||
long nrRemaining;
|
||||
for (nrRemaining = totalSize; nrRemaining > 0; nrRemaining -= bufferSize) {
|
||||
int curSize = (bufferSize < nrRemaining) ? bufferSize : (int)nrRemaining;
|
||||
out.write(buffer, 0, curSize);
|
||||
reporter.setStatus("writing " + name + "@" +
|
||||
(totalSize - nrRemaining) + "/" + totalSize
|
||||
+ " ::host = " + hostName);
|
||||
}
|
||||
} finally {
|
||||
out.close();
|
||||
OutputStream out = (OutputStream)this.stream;
|
||||
// write to the file
|
||||
long nrRemaining;
|
||||
for (nrRemaining = totalSize; nrRemaining > 0; nrRemaining -= bufferSize) {
|
||||
int curSize = (bufferSize < nrRemaining) ? bufferSize : (int)nrRemaining;
|
||||
out.write(buffer, 0, curSize);
|
||||
reporter.setStatus("writing " + name + "@" +
|
||||
(totalSize - nrRemaining) + "/" + totalSize
|
||||
+ " ::host = " + hostName);
|
||||
}
|
||||
return Long.valueOf(totalSize);
|
||||
}
|
||||
|
@ -389,32 +489,35 @@ public class TestDFSIO extends TestCase implements Tool {
|
|||
/**
|
||||
* Read mapper class.
|
||||
*/
|
||||
public static class ReadMapper extends IOStatMapper<Long> {
|
||||
public static class ReadMapper extends IOStatMapper {
|
||||
|
||||
public ReadMapper() {
|
||||
}
|
||||
|
||||
@Override // IOMapperBase
|
||||
public Closeable getIOStream(String name) throws IOException {
|
||||
// open file
|
||||
InputStream in = fs.open(new Path(getDataDir(getConf()), name));
|
||||
if(compressionCodec != null)
|
||||
in = compressionCodec.createInputStream(in);
|
||||
LOG.info("in = " + in.getClass().getName());
|
||||
return in;
|
||||
}
|
||||
|
||||
@Override // IOMapperBase
|
||||
public Long doIO(Reporter reporter,
|
||||
String name,
|
||||
long totalSize // in bytes
|
||||
) throws IOException {
|
||||
// open file
|
||||
InputStream in = fs.open(new Path(getDataDir(getConf()), name));
|
||||
|
||||
if(compressionCodec != null) in = compressionCodec.createInputStream(in);
|
||||
|
||||
InputStream in = (InputStream)this.stream;
|
||||
long actualSize = 0;
|
||||
try {
|
||||
while (actualSize < totalSize) {
|
||||
int curSize = in.read(buffer, 0, bufferSize);
|
||||
if(curSize < 0) break;
|
||||
actualSize += curSize;
|
||||
reporter.setStatus("reading " + name + "@" +
|
||||
actualSize + "/" + totalSize
|
||||
+ " ::host = " + hostName);
|
||||
}
|
||||
} finally {
|
||||
in.close();
|
||||
while (actualSize < totalSize) {
|
||||
int curSize = in.read(buffer, 0, bufferSize);
|
||||
if(curSize < 0) break;
|
||||
actualSize += curSize;
|
||||
reporter.setStatus("reading " + name + "@" +
|
||||
actualSize + "/" + totalSize
|
||||
+ " ::host = " + hostName);
|
||||
}
|
||||
return Long.valueOf(actualSize);
|
||||
}
|
||||
|
@ -426,20 +529,111 @@ public class TestDFSIO extends TestCase implements Tool {
|
|||
runIOTest(ReadMapper.class, readDir);
|
||||
}
|
||||
|
||||
/**
|
||||
* Mapper class for random reads.
|
||||
* The mapper chooses a position in the file and reads bufferSize
|
||||
* bytes starting at the chosen position.
|
||||
* It stops after reading the totalSize bytes, specified by -size.
|
||||
*
|
||||
* There are three type of reads.
|
||||
* 1) Random read always chooses a random position to read from: skipSize = 0
|
||||
* 2) Backward read reads file in reverse order : skipSize < 0
|
||||
* 3) Skip-read skips skipSize bytes after every read : skipSize > 0
|
||||
*/
|
||||
public static class RandomReadMapper extends IOStatMapper {
|
||||
private Random rnd;
|
||||
private long fileSize;
|
||||
private long skipSize;
|
||||
|
||||
@Override // Mapper
|
||||
public void configure(JobConf conf) {
|
||||
super.configure(conf);
|
||||
skipSize = conf.getLong("test.io.skip.size", 0);
|
||||
}
|
||||
|
||||
public RandomReadMapper() {
|
||||
rnd = new Random();
|
||||
}
|
||||
|
||||
@Override // IOMapperBase
|
||||
public Closeable getIOStream(String name) throws IOException {
|
||||
Path filePath = new Path(getDataDir(getConf()), name);
|
||||
this.fileSize = fs.getFileStatus(filePath).getLen();
|
||||
InputStream in = fs.open(filePath);
|
||||
if(compressionCodec != null)
|
||||
in = new FSDataInputStream(compressionCodec.createInputStream(in));
|
||||
LOG.info("in = " + in.getClass().getName());
|
||||
LOG.info("skipSize = " + skipSize);
|
||||
return in;
|
||||
}
|
||||
|
||||
@Override // IOMapperBase
|
||||
public Long doIO(Reporter reporter,
|
||||
String name,
|
||||
long totalSize // in bytes
|
||||
) throws IOException {
|
||||
PositionedReadable in = (PositionedReadable)this.stream;
|
||||
long actualSize = 0;
|
||||
for(long pos = nextOffset(-1);
|
||||
actualSize < totalSize; pos = nextOffset(pos)) {
|
||||
int curSize = in.read(pos, buffer, 0, bufferSize);
|
||||
if(curSize < 0) break;
|
||||
actualSize += curSize;
|
||||
reporter.setStatus("reading " + name + "@" +
|
||||
actualSize + "/" + totalSize
|
||||
+ " ::host = " + hostName);
|
||||
}
|
||||
return Long.valueOf(actualSize);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get next offset for reading.
|
||||
* If current < 0 then choose initial offset according to the read type.
|
||||
*
|
||||
* @param current offset
|
||||
* @return
|
||||
*/
|
||||
private long nextOffset(long current) {
|
||||
if(skipSize == 0)
|
||||
return rnd.nextInt((int)(fileSize));
|
||||
if(skipSize > 0)
|
||||
return (current < 0) ? 0 : (current + bufferSize + skipSize);
|
||||
// skipSize < 0
|
||||
return (current < 0) ? Math.max(0, fileSize - bufferSize) :
|
||||
Math.max(0, current + skipSize);
|
||||
}
|
||||
}
|
||||
|
||||
private void randomReadTest(FileSystem fs) throws IOException {
|
||||
Path readDir = getRandomReadDir(config);
|
||||
fs.delete(readDir, true);
|
||||
runIOTest(RandomReadMapper.class, readDir);
|
||||
}
|
||||
|
||||
private void sequentialTest(FileSystem fs,
|
||||
int testType,
|
||||
TestType testType,
|
||||
long fileSize, // in bytes
|
||||
int nrFiles
|
||||
) throws IOException {
|
||||
IOStatMapper<Long> ioer = null;
|
||||
if (testType == TEST_TYPE_READ)
|
||||
IOStatMapper ioer = null;
|
||||
switch(testType) {
|
||||
case TEST_TYPE_READ:
|
||||
ioer = new ReadMapper();
|
||||
else if (testType == TEST_TYPE_WRITE)
|
||||
break;
|
||||
case TEST_TYPE_WRITE:
|
||||
ioer = new WriteMapper();
|
||||
else if (testType == TEST_TYPE_APPEND)
|
||||
break;
|
||||
case TEST_TYPE_APPEND:
|
||||
ioer = new AppendMapper();
|
||||
else
|
||||
break;
|
||||
case TEST_TYPE_READ_RANDOM:
|
||||
case TEST_TYPE_READ_BACKWARD:
|
||||
case TEST_TYPE_READ_SKIP:
|
||||
ioer = new RandomReadMapper();
|
||||
break;
|
||||
default:
|
||||
return;
|
||||
}
|
||||
for(int i=0; i < nrFiles; i++)
|
||||
ioer.doIO(Reporter.NULL,
|
||||
BASE_FILE_NAME+Integer.toString(i),
|
||||
|
@ -462,14 +656,15 @@ public class TestDFSIO extends TestCase implements Tool {
|
|||
|
||||
@Override // Tool
|
||||
public int run(String[] args) throws IOException {
|
||||
int testType = TEST_TYPE_READ;
|
||||
TestType testType = null;
|
||||
int bufferSize = DEFAULT_BUFFER_SIZE;
|
||||
long fileSize = 1*MEGA;
|
||||
long nrBytes = 1*MEGA;
|
||||
int nrFiles = 1;
|
||||
long skipSize = 0;
|
||||
String resFileName = DEFAULT_RES_FILE_NAME;
|
||||
String compressionClass = null;
|
||||
boolean isSequential = false;
|
||||
String version = TestDFSIO.class.getSimpleName() + ".0.0.6";
|
||||
String version = TestDFSIO.class.getSimpleName() + ".1.7";
|
||||
|
||||
LOG.info(version);
|
||||
if (args.length == 0) {
|
||||
|
@ -479,21 +674,32 @@ public class TestDFSIO extends TestCase implements Tool {
|
|||
|
||||
for (int i = 0; i < args.length; i++) { // parse command line
|
||||
if (args[i].startsWith("-read")) {
|
||||
testType = TEST_TYPE_READ;
|
||||
testType = TestType.TEST_TYPE_READ;
|
||||
} else if (args[i].equals("-write")) {
|
||||
testType = TEST_TYPE_WRITE;
|
||||
testType = TestType.TEST_TYPE_WRITE;
|
||||
} else if (args[i].equals("-append")) {
|
||||
testType = TEST_TYPE_APPEND;
|
||||
testType = TestType.TEST_TYPE_APPEND;
|
||||
} else if (args[i].equals("-random")) {
|
||||
if(testType != TestType.TEST_TYPE_READ) return -1;
|
||||
testType = TestType.TEST_TYPE_READ_RANDOM;
|
||||
} else if (args[i].equals("-backward")) {
|
||||
if(testType != TestType.TEST_TYPE_READ) return -1;
|
||||
testType = TestType.TEST_TYPE_READ_BACKWARD;
|
||||
} else if (args[i].equals("-skip")) {
|
||||
if(testType != TestType.TEST_TYPE_READ) return -1;
|
||||
testType = TestType.TEST_TYPE_READ_SKIP;
|
||||
} else if (args[i].equals("-clean")) {
|
||||
testType = TEST_TYPE_CLEANUP;
|
||||
testType = TestType.TEST_TYPE_CLEANUP;
|
||||
} else if (args[i].startsWith("-seq")) {
|
||||
isSequential = true;
|
||||
} else if (args[i].startsWith("-compression")) {
|
||||
compressionClass = args[++i];
|
||||
} else if (args[i].equals("-nrFiles")) {
|
||||
nrFiles = Integer.parseInt(args[++i]);
|
||||
} else if (args[i].equals("-fileSize")) {
|
||||
fileSize = parseSize(args[++i]);
|
||||
} else if (args[i].equals("-fileSize") || args[i].equals("-size")) {
|
||||
nrBytes = parseSize(args[++i]);
|
||||
} else if (args[i].equals("-skipSize")) {
|
||||
skipSize = parseSize(args[++i]);
|
||||
} else if (args[i].equals("-bufferSize")) {
|
||||
bufferSize = Integer.parseInt(args[++i]);
|
||||
} else if (args[i].equals("-resFile")) {
|
||||
|
@ -503,10 +709,18 @@ public class TestDFSIO extends TestCase implements Tool {
|
|||
return -1;
|
||||
}
|
||||
}
|
||||
if(testType == null)
|
||||
return -1;
|
||||
if(testType == TestType.TEST_TYPE_READ_BACKWARD)
|
||||
skipSize = -bufferSize;
|
||||
else if(testType == TestType.TEST_TYPE_READ_SKIP && skipSize == 0)
|
||||
skipSize = bufferSize;
|
||||
|
||||
LOG.info("nrFiles = " + nrFiles);
|
||||
LOG.info("fileSize (MB) = " + toMB(fileSize));
|
||||
LOG.info("nrBytes (MB) = " + toMB(nrBytes));
|
||||
LOG.info("bufferSize = " + bufferSize);
|
||||
if(skipSize > 0)
|
||||
LOG.info("skipSize = " + skipSize);
|
||||
LOG.info("baseDir = " + getBaseDir(config));
|
||||
|
||||
if(compressionClass != null) {
|
||||
|
@ -515,29 +729,39 @@ public class TestDFSIO extends TestCase implements Tool {
|
|||
}
|
||||
|
||||
config.setInt("test.io.file.buffer.size", bufferSize);
|
||||
config.setLong("test.io.skip.size", skipSize);
|
||||
config.setBoolean(DFSConfigKeys.DFS_SUPPORT_APPEND_KEY, true);
|
||||
FileSystem fs = FileSystem.get(config);
|
||||
|
||||
if (isSequential) {
|
||||
long tStart = System.currentTimeMillis();
|
||||
sequentialTest(fs, testType, fileSize, nrFiles);
|
||||
sequentialTest(fs, testType, nrBytes, nrFiles);
|
||||
long execTime = System.currentTimeMillis() - tStart;
|
||||
String resultLine = "Seq Test exec time sec: " + (float)execTime / 1000;
|
||||
LOG.info(resultLine);
|
||||
return 0;
|
||||
}
|
||||
if (testType == TEST_TYPE_CLEANUP) {
|
||||
if (testType == TestType.TEST_TYPE_CLEANUP) {
|
||||
cleanup(fs);
|
||||
return 0;
|
||||
}
|
||||
createControlFile(fs, fileSize, nrFiles);
|
||||
createControlFile(fs, nrBytes, nrFiles);
|
||||
long tStart = System.currentTimeMillis();
|
||||
if (testType == TEST_TYPE_WRITE)
|
||||
switch(testType) {
|
||||
case TEST_TYPE_WRITE:
|
||||
writeTest(fs);
|
||||
if (testType == TEST_TYPE_READ)
|
||||
break;
|
||||
case TEST_TYPE_READ:
|
||||
readTest(fs);
|
||||
if (testType == TEST_TYPE_APPEND)
|
||||
break;
|
||||
case TEST_TYPE_APPEND:
|
||||
appendTest(fs);
|
||||
break;
|
||||
case TEST_TYPE_READ_RANDOM:
|
||||
case TEST_TYPE_READ_BACKWARD:
|
||||
case TEST_TYPE_READ_SKIP:
|
||||
randomReadTest(fs);
|
||||
}
|
||||
long execTime = System.currentTimeMillis() - tStart;
|
||||
|
||||
analyzeResult(fs, testType, execTime, resFileName);
|
||||
|
@ -563,9 +787,9 @@ public class TestDFSIO extends TestCase implements Tool {
|
|||
static long parseSize(String arg) {
|
||||
String[] args = arg.split("\\D", 2); // get digits
|
||||
assert args.length <= 2;
|
||||
long fileSize = Long.parseLong(args[0]);
|
||||
long nrBytes = Long.parseLong(args[0]);
|
||||
String bytesMult = arg.substring(args[0].length()); // get byte multiple
|
||||
return fileSize * ByteMultiple.parseString(bytesMult).value();
|
||||
return nrBytes * ByteMultiple.parseString(bytesMult).value();
|
||||
}
|
||||
|
||||
static float toMB(long bytes) {
|
||||
|
@ -573,17 +797,11 @@ public class TestDFSIO extends TestCase implements Tool {
|
|||
}
|
||||
|
||||
private void analyzeResult( FileSystem fs,
|
||||
int testType,
|
||||
TestType testType,
|
||||
long execTime,
|
||||
String resFileName
|
||||
) throws IOException {
|
||||
Path reduceFile;
|
||||
if (testType == TEST_TYPE_WRITE)
|
||||
reduceFile = new Path(getWriteDir(config), "part-00000");
|
||||
else if (testType == TEST_TYPE_APPEND)
|
||||
reduceFile = new Path(getAppendDir(config), "part-00000");
|
||||
else // if (testType == TEST_TYPE_READ)
|
||||
reduceFile = new Path(getReadDir(config), "part-00000");
|
||||
Path reduceFile = getReduceFilePath(testType);
|
||||
long tasks = 0;
|
||||
long size = 0;
|
||||
long time = 0;
|
||||
|
@ -617,10 +835,7 @@ public class TestDFSIO extends TestCase implements Tool {
|
|||
double med = rate / 1000 / tasks;
|
||||
double stdDev = Math.sqrt(Math.abs(sqrate / 1000 / tasks - med*med));
|
||||
String resultLines[] = {
|
||||
"----- TestDFSIO ----- : " + ((testType == TEST_TYPE_WRITE) ? "write" :
|
||||
(testType == TEST_TYPE_READ) ? "read" :
|
||||
(testType == TEST_TYPE_APPEND) ? "append" :
|
||||
"unknown"),
|
||||
"----- TestDFSIO ----- : " + testType,
|
||||
" Date & time: " + new Date(System.currentTimeMillis()),
|
||||
" Number of files: " + tasks,
|
||||
"Total MBytes processed: " + toMB(size),
|
||||
|
@ -642,6 +857,27 @@ public class TestDFSIO extends TestCase implements Tool {
|
|||
}
|
||||
}
|
||||
|
||||
private Path getReduceFilePath(TestType testType) {
|
||||
switch(testType) {
|
||||
case TEST_TYPE_WRITE:
|
||||
return new Path(getWriteDir(config), "part-00000");
|
||||
case TEST_TYPE_APPEND:
|
||||
return new Path(getAppendDir(config), "part-00000");
|
||||
case TEST_TYPE_READ:
|
||||
return new Path(getReadDir(config), "part-00000");
|
||||
case TEST_TYPE_READ_RANDOM:
|
||||
case TEST_TYPE_READ_BACKWARD:
|
||||
case TEST_TYPE_READ_SKIP:
|
||||
return new Path(getRandomReadDir(config), "part-00000");
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
private void analyzeResult(FileSystem fs, TestType testType, long execTime)
|
||||
throws IOException {
|
||||
analyzeResult(fs, testType, execTime, DEFAULT_RES_FILE_NAME);
|
||||
}
|
||||
|
||||
private void cleanup(FileSystem fs)
|
||||
throws IOException {
|
||||
LOG.info("Cleaning up test files");
|
||||
|
|
Loading…
Reference in New Issue