From 1aba4503cfc8aaa67c42179f7a69483deb3cf8d9 Mon Sep 17 00:00:00 2001 From: Konstantin Shvachko Date: Tue, 25 Sep 2012 21:36:30 +0000 Subject: [PATCH] MAPREDUCE-4651. Benchmarking random reads with DFSIO. Contributed by Konstantin Shvachko. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1390161 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 8 +- .../org/apache/hadoop/fs/IOMapperBase.java | 44 +- .../java/org/apache/hadoop/fs/TestDFSIO.java | 552 +++++++++++++----- 3 files changed, 421 insertions(+), 183 deletions(-) diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 3951ce63efa..cfb6cc51d44 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -49,9 +49,6 @@ Release 2.0.2-alpha - 2012-09-07 MAPREDUCE-4408. allow jobs to set a JAR that is in the distributed cached (rkanter via tucu) - MAPREDUCE-2786. Add compression option for TestDFSIO. - (Plamen Jeliazkov via shv) - MAPREDUCE-4440. Changed SchedulerApp and SchedulerNode to be a minimal interface to allow schedulers to maintain their own. (acmurthy) @@ -409,9 +406,14 @@ Release 0.23.4 - UNRELEASED IMPROVEMENTS + MAPREDUCE-2786. Add compression option for TestDFSIO. + (Plamen Jeliazkov via shv) + MAPREDUCE-4645. Provide a random seed to Slive to make the sequence of file names deterministic. (Ravi Prakash via shv) + MAPREDUCE-4651. Benchmarking random reads with DFSIO. (shv) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/IOMapperBase.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/IOMapperBase.java index fe1af6afcd4..ddd2d2f1269 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/IOMapperBase.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/IOMapperBase.java @@ -17,14 +17,13 @@ */ package org.apache.hadoop.fs; +import java.io.Closeable; import java.io.IOException; import java.net.InetAddress; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.mapred.*; -import org.apache.hadoop.util.ReflectionUtils; /** * Base mapper class for IO operations. @@ -35,7 +34,6 @@ import org.apache.hadoop.util.ReflectionUtils; * statistics data to be collected by subsequent reducers. * */ -@SuppressWarnings("deprecation") public abstract class IOMapperBase extends Configured implements Mapper { @@ -43,7 +41,7 @@ public abstract class IOMapperBase extends Configured protected int bufferSize; protected FileSystem fs; protected String hostName; - protected CompressionCodec compressionCodec; + protected Closeable stream; public IOMapperBase() { } @@ -62,22 +60,6 @@ public abstract class IOMapperBase extends Configured } catch(Exception e) { hostName = "localhost"; } - - //grab compression - String compression = getConf().get("test.io.compression.class", null); - Class codec; - - //try to initialize codec - try { - codec = (compression == null) ? 
null : - Class.forName(compression).asSubclass(CompressionCodec.class); - } catch(Exception e) { - throw new RuntimeException("Compression codec not found: ", e); - } - - if(codec != null) { - compressionCodec = (CompressionCodec) ReflectionUtils.newInstance(codec, getConf()); - } } public void close() throws IOException { @@ -97,6 +79,18 @@ public abstract class IOMapperBase extends Configured String name, long value) throws IOException; + /** + * Create an input or output stream based on the specified file. + * Subclasses should override this method to provide an actual stream. + * + * @param name file name + * @return the stream + * @throws IOException + */ + public Closeable getIOStream(String name) throws IOException { + return null; + } + /** * Collect stat data to be combined by a subsequent reducer. * @@ -132,9 +126,15 @@ public abstract class IOMapperBase extends Configured long longValue = value.get(); reporter.setStatus("starting " + name + " ::host = " + hostName); - + + this.stream = getIOStream(name); + T statValue = null; long tStart = System.currentTimeMillis(); - T statValue = doIO(reporter, name, longValue); + try { + statValue = doIO(reporter, name, longValue); + } finally { + if(stream != null) stream.close(); + } long tEnd = System.currentTimeMillis(); long execTime = tEnd - tStart; collectStats(output, name, execTime, statValue); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/TestDFSIO.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/TestDFSIO.java index 0d589ff59fa..e903c8dc807 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/TestDFSIO.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/TestDFSIO.java @@ -19,6 +19,7 @@ package org.apache.hadoop.fs; import java.io.BufferedReader; +import java.io.Closeable; import java.io.DataInputStream; import java.io.File; import java.io.FileOutputStream; @@ -28,10 +29,9 @@ import java.io.InputStreamReader; import java.io.OutputStream; import java.io.PrintStream; import java.util.Date; +import java.util.Random; import java.util.StringTokenizer; -import junit.framework.TestCase; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -39,18 +39,30 @@ import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.SequenceFile; -import org.apache.hadoop.io.Text; import org.apache.hadoop.io.SequenceFile.CompressionType; -import org.apache.hadoop.mapred.*; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapred.FileOutputFormat; +import org.apache.hadoop.mapred.JobClient; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.Mapper; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapred.SequenceFileInputFormat; +import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import 
org.junit.Test; /** * Distributed i/o benchmark. *

* This test writes into or reads from a specified number of files. - * File size is specified as a parameter to the test. + * Number of bytes to write or read is specified as a parameter to the test. * Each file is accessed in a separate map task. *

* The reducer collects the following statistics: @@ -73,24 +85,24 @@ import org.apache.hadoop.util.ToolRunner; *

* <li>standard deviation of i/o rate</li>
  • * */ -public class TestDFSIO extends TestCase implements Tool { +public class TestDFSIO implements Tool { // Constants private static final Log LOG = LogFactory.getLog(TestDFSIO.class); - private static final int TEST_TYPE_READ = 0; - private static final int TEST_TYPE_WRITE = 1; - private static final int TEST_TYPE_CLEANUP = 2; - private static final int TEST_TYPE_APPEND = 3; private static final int DEFAULT_BUFFER_SIZE = 1000000; private static final String BASE_FILE_NAME = "test_io_"; private static final String DEFAULT_RES_FILE_NAME = "TestDFSIO_results.log"; private static final long MEGA = ByteMultiple.MB.value(); + private static final int DEFAULT_NR_BYTES = 1; + private static final int DEFAULT_NR_FILES = 4; private static final String USAGE = - "Usage: " + TestDFSIO.class.getSimpleName() + - " [genericOptions]" + - " -read | -write | -append | -clean [-nrFiles N]" + - " [-fileSize Size[B|KB|MB|GB|TB]]" + - " [-resFile resultFileName] [-bufferSize Bytes]" + - " [-rootDir]"; + "Usage: " + TestDFSIO.class.getSimpleName() + + " [genericOptions]" + + " -read [-random | -backward | -skip [-skipSize Size]] |" + + " -write | -append | -clean" + + " [-nrFiles N]" + + " [-size Size[B|KB|MB|GB|TB]]" + + " [-resFile resultFileName] [-bufferSize Bytes]" + + " [-rootDir]"; private Configuration config; @@ -101,6 +113,27 @@ public class TestDFSIO extends TestCase implements Tool { Configuration.addDefaultResource("mapred-site.xml"); } + private static enum TestType { + TEST_TYPE_READ("read"), + TEST_TYPE_WRITE("write"), + TEST_TYPE_CLEANUP("cleanup"), + TEST_TYPE_APPEND("append"), + TEST_TYPE_READ_RANDOM("random read"), + TEST_TYPE_READ_BACKWARD("backward read"), + TEST_TYPE_READ_SKIP("skip read"); + + private String type; + + private TestType(String t) { + type = t; + } + + @Override // String + public String toString() { + return type; + } + } + static enum ByteMultiple { B(1L), KB(0x400L), @@ -155,62 +188,100 @@ public class TestDFSIO extends TestCase implements Tool { private static Path getAppendDir(Configuration conf) { return new Path(getBaseDir(conf), "io_append"); } + private static Path getRandomReadDir(Configuration conf) { + return new Path(getBaseDir(conf), "io_random_read"); + } private static Path getDataDir(Configuration conf) { return new Path(getBaseDir(conf), "io_data"); } - /** - * Run the test with default parameters. - * - * @throws Exception - */ - public void testIOs() throws Exception { - TestDFSIO bench = new TestDFSIO(); - bench.testIOs(1, 4); + private static MiniDFSCluster cluster; + private static TestDFSIO bench; + + @BeforeClass + public static void beforeClass() throws Exception { + bench = new TestDFSIO(); + bench.getConf().setBoolean(DFSConfigKeys.DFS_SUPPORT_APPEND_KEY, true); + cluster = new MiniDFSCluster.Builder(bench.getConf()) + .numDataNodes(2) + .format(true) + .build(); + FileSystem fs = cluster.getFileSystem(); + bench.createControlFile(fs, DEFAULT_NR_BYTES, DEFAULT_NR_FILES); } - /** - * Run the test with the specified parameters. 
- * - * @param fileSize file size - * @param nrFiles number of files - * @throws IOException - */ - public void testIOs(int fileSize, int nrFiles) - throws IOException { - config.setBoolean(DFSConfigKeys.DFS_SUPPORT_APPEND_KEY, true); - MiniDFSCluster cluster = null; - try { - cluster = new MiniDFSCluster(config, 2, true, null); - FileSystem fs = cluster.getFileSystem(); - - createControlFile(fs, fileSize, nrFiles); - long tStart = System.currentTimeMillis(); - writeTest(fs); - long execTime = System.currentTimeMillis() - tStart; - analyzeResult(fs, TEST_TYPE_WRITE, execTime, DEFAULT_RES_FILE_NAME); - - tStart = System.currentTimeMillis(); - readTest(fs); - execTime = System.currentTimeMillis() - tStart; - analyzeResult(fs, TEST_TYPE_READ, execTime, DEFAULT_RES_FILE_NAME); - - tStart = System.currentTimeMillis(); - appendTest(fs); - execTime = System.currentTimeMillis() - tStart; - analyzeResult(fs, TEST_TYPE_APPEND, execTime, DEFAULT_RES_FILE_NAME); - - cleanup(fs); - } finally { - if(cluster != null) cluster.shutdown(); - } + @AfterClass + public static void afterClass() throws Exception { + if(cluster == null) + return; + FileSystem fs = cluster.getFileSystem(); + bench.cleanup(fs); + cluster.shutdown(); } + @Test + public void testWrite() throws Exception { + FileSystem fs = cluster.getFileSystem(); + long tStart = System.currentTimeMillis(); + bench.writeTest(fs); + long execTime = System.currentTimeMillis() - tStart; + bench.analyzeResult(fs, TestType.TEST_TYPE_WRITE, execTime); + } + + @Test + public void testRead() throws Exception { + FileSystem fs = cluster.getFileSystem(); + long tStart = System.currentTimeMillis(); + bench.readTest(fs); + long execTime = System.currentTimeMillis() - tStart; + bench.analyzeResult(fs, TestType.TEST_TYPE_READ, execTime); + } + + @Test + public void testReadRandom() throws Exception { + FileSystem fs = cluster.getFileSystem(); + long tStart = System.currentTimeMillis(); + bench.getConf().setLong("test.io.skip.size", 0); + bench.randomReadTest(fs); + long execTime = System.currentTimeMillis() - tStart; + bench.analyzeResult(fs, TestType.TEST_TYPE_READ_RANDOM, execTime); + } + + @Test + public void testReadBackward() throws Exception { + FileSystem fs = cluster.getFileSystem(); + long tStart = System.currentTimeMillis(); + bench.getConf().setLong("test.io.skip.size", -DEFAULT_BUFFER_SIZE); + bench.randomReadTest(fs); + long execTime = System.currentTimeMillis() - tStart; + bench.analyzeResult(fs, TestType.TEST_TYPE_READ_BACKWARD, execTime); + } + + @Test + public void testReadSkip() throws Exception { + FileSystem fs = cluster.getFileSystem(); + long tStart = System.currentTimeMillis(); + bench.getConf().setLong("test.io.skip.size", 1); + bench.randomReadTest(fs); + long execTime = System.currentTimeMillis() - tStart; + bench.analyzeResult(fs, TestType.TEST_TYPE_READ_SKIP, execTime); + } + + @Test + public void testAppend() throws Exception { + FileSystem fs = cluster.getFileSystem(); + long tStart = System.currentTimeMillis(); + bench.appendTest(fs); + long execTime = System.currentTimeMillis() - tStart; + bench.analyzeResult(fs, TestType.TEST_TYPE_APPEND, execTime); + } + + @SuppressWarnings("deprecation") private void createControlFile(FileSystem fs, - long fileSize, // in bytes + long nrBytes, // in bytes int nrFiles ) throws IOException { - LOG.info("creating control file: "+fileSize+" bytes, "+nrFiles+" files"); + LOG.info("creating control file: "+nrBytes+" bytes, "+nrFiles+" files"); Path controlDir = getControlDir(config); 
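      // Note on the control-file mechanism (the loop below): createControlFile
      // writes nrFiles single-record control files, each a SequenceFile mapping
      // a file name (Text) to the number of bytes its task must process
      // (LongWritable); runIOTest later reads them back through
      // SequenceFileInputFormat, yielding one map task per benchmark file.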
fs.delete(controlDir, true); @@ -223,7 +294,7 @@ public class TestDFSIO extends TestCase implements Tool { writer = SequenceFile.createWriter(fs, config, controlFile, Text.class, LongWritable.class, CompressionType.NONE); - writer.append(new Text(name), new LongWritable(fileSize)); + writer.append(new Text(name), new LongWritable(nrBytes)); } catch(Exception e) { throw new IOException(e.getLocalizedMessage()); } finally { @@ -251,10 +322,35 @@ public class TestDFSIO extends TestCase implements Tool { *
* <li>i/o rate squared</li>
  • * */ - private abstract static class IOStatMapper extends IOMapperBase { - IOStatMapper() { + private abstract static class IOStatMapper extends IOMapperBase { + protected CompressionCodec compressionCodec; + + IOStatMapper() { } - + + @Override // Mapper + public void configure(JobConf conf) { + super.configure(conf); + + // grab compression + String compression = getConf().get("test.io.compression.class", null); + Class codec; + + // try to initialize codec + try { + codec = (compression == null) ? null : + Class.forName(compression).asSubclass(CompressionCodec.class); + } catch(Exception e) { + throw new RuntimeException("Compression codec not found: ", e); + } + + if(codec != null) { + compressionCodec = (CompressionCodec) + ReflectionUtils.newInstance(codec, getConf()); + } + } + + @Override // IOMapperBase void collectStats(OutputCollector output, String name, long execTime, @@ -281,36 +377,38 @@ public class TestDFSIO extends TestCase implements Tool { /** * Write mapper class. */ - public static class WriteMapper extends IOStatMapper { + public static class WriteMapper extends IOStatMapper { public WriteMapper() { for(int i=0; i < bufferSize; i++) buffer[i] = (byte)('0' + i % 50); } - @Override + @Override // IOMapperBase + public Closeable getIOStream(String name) throws IOException { + // create file + OutputStream out = + fs.create(new Path(getDataDir(getConf()), name), true, bufferSize); + if(compressionCodec != null) + out = compressionCodec.createOutputStream(out); + LOG.info("out = " + out.getClass().getName()); + return out; + } + + @Override // IOMapperBase public Long doIO(Reporter reporter, String name, long totalSize // in bytes ) throws IOException { - // create file - OutputStream out; - out = fs.create(new Path(getDataDir(getConf()), name), true, bufferSize); - - if(compressionCodec != null) out = compressionCodec.createOutputStream(out); - - try { - // write to the file - long nrRemaining; - for (nrRemaining = totalSize; nrRemaining > 0; nrRemaining -= bufferSize) { - int curSize = (bufferSize < nrRemaining) ? bufferSize : (int)nrRemaining; - out.write(buffer, 0, curSize); - reporter.setStatus("writing " + name + "@" + - (totalSize - nrRemaining) + "/" + totalSize - + " ::host = " + hostName); - } - } finally { - out.close(); + OutputStream out = (OutputStream)this.stream; + // write to the file + long nrRemaining; + for (nrRemaining = totalSize; nrRemaining > 0; nrRemaining -= bufferSize) { + int curSize = (bufferSize < nrRemaining) ? bufferSize : (int)nrRemaining; + out.write(buffer, 0, curSize); + reporter.setStatus("writing " + name + "@" + + (totalSize - nrRemaining) + "/" + totalSize + + " ::host = " + hostName); } return Long.valueOf(totalSize); } @@ -324,7 +422,6 @@ public class TestDFSIO extends TestCase implements Tool { runIOTest(WriteMapper.class, writeDir); } - @SuppressWarnings("deprecation") private void runIOTest( Class> mapperClass, Path outputDir) throws IOException { @@ -346,35 +443,38 @@ public class TestDFSIO extends TestCase implements Tool { /** * Append mapper class. 
*/ - public static class AppendMapper extends IOStatMapper { + public static class AppendMapper extends IOStatMapper { public AppendMapper() { for(int i=0; i < bufferSize; i++) buffer[i] = (byte)('0' + i % 50); } + @Override // IOMapperBase + public Closeable getIOStream(String name) throws IOException { + // open file for append + OutputStream out = + fs.append(new Path(getDataDir(getConf()), name), bufferSize); + if(compressionCodec != null) + out = compressionCodec.createOutputStream(out); + LOG.info("out = " + out.getClass().getName()); + return out; + } + + @Override // IOMapperBase public Long doIO(Reporter reporter, String name, long totalSize // in bytes ) throws IOException { - // create file - OutputStream out; - out = fs.append(new Path(getDataDir(getConf()), name), bufferSize); - - if(compressionCodec != null) out = compressionCodec.createOutputStream(out); - - try { - // write to the file - long nrRemaining; - for (nrRemaining = totalSize; nrRemaining > 0; nrRemaining -= bufferSize) { - int curSize = (bufferSize < nrRemaining) ? bufferSize : (int)nrRemaining; - out.write(buffer, 0, curSize); - reporter.setStatus("writing " + name + "@" + - (totalSize - nrRemaining) + "/" + totalSize - + " ::host = " + hostName); - } - } finally { - out.close(); + OutputStream out = (OutputStream)this.stream; + // write to the file + long nrRemaining; + for (nrRemaining = totalSize; nrRemaining > 0; nrRemaining -= bufferSize) { + int curSize = (bufferSize < nrRemaining) ? bufferSize : (int)nrRemaining; + out.write(buffer, 0, curSize); + reporter.setStatus("writing " + name + "@" + + (totalSize - nrRemaining) + "/" + totalSize + + " ::host = " + hostName); } return Long.valueOf(totalSize); } @@ -389,32 +489,35 @@ public class TestDFSIO extends TestCase implements Tool { /** * Read mapper class. */ - public static class ReadMapper extends IOStatMapper { + public static class ReadMapper extends IOStatMapper { public ReadMapper() { } + @Override // IOMapperBase + public Closeable getIOStream(String name) throws IOException { + // open file + InputStream in = fs.open(new Path(getDataDir(getConf()), name)); + if(compressionCodec != null) + in = compressionCodec.createInputStream(in); + LOG.info("in = " + in.getClass().getName()); + return in; + } + + @Override // IOMapperBase public Long doIO(Reporter reporter, String name, long totalSize // in bytes ) throws IOException { - // open file - InputStream in = fs.open(new Path(getDataDir(getConf()), name)); - - if(compressionCodec != null) in = compressionCodec.createInputStream(in); - + InputStream in = (InputStream)this.stream; long actualSize = 0; - try { - while (actualSize < totalSize) { - int curSize = in.read(buffer, 0, bufferSize); - if(curSize < 0) break; - actualSize += curSize; - reporter.setStatus("reading " + name + "@" + - actualSize + "/" + totalSize - + " ::host = " + hostName); - } - } finally { - in.close(); + while (actualSize < totalSize) { + int curSize = in.read(buffer, 0, bufferSize); + if(curSize < 0) break; + actualSize += curSize; + reporter.setStatus("reading " + name + "@" + + actualSize + "/" + totalSize + + " ::host = " + hostName); } return Long.valueOf(actualSize); } @@ -426,20 +529,111 @@ public class TestDFSIO extends TestCase implements Tool { runIOTest(ReadMapper.class, readDir); } + /** + * Mapper class for random reads. + * The mapper chooses a position in the file and reads bufferSize + * bytes starting at the chosen position. + * It stops after reading the totalSize bytes, specified by -size. 
+ * + * There are three type of reads. + * 1) Random read always chooses a random position to read from: skipSize = 0 + * 2) Backward read reads file in reverse order : skipSize < 0 + * 3) Skip-read skips skipSize bytes after every read : skipSize > 0 + */ + public static class RandomReadMapper extends IOStatMapper { + private Random rnd; + private long fileSize; + private long skipSize; + + @Override // Mapper + public void configure(JobConf conf) { + super.configure(conf); + skipSize = conf.getLong("test.io.skip.size", 0); + } + + public RandomReadMapper() { + rnd = new Random(); + } + + @Override // IOMapperBase + public Closeable getIOStream(String name) throws IOException { + Path filePath = new Path(getDataDir(getConf()), name); + this.fileSize = fs.getFileStatus(filePath).getLen(); + InputStream in = fs.open(filePath); + if(compressionCodec != null) + in = new FSDataInputStream(compressionCodec.createInputStream(in)); + LOG.info("in = " + in.getClass().getName()); + LOG.info("skipSize = " + skipSize); + return in; + } + + @Override // IOMapperBase + public Long doIO(Reporter reporter, + String name, + long totalSize // in bytes + ) throws IOException { + PositionedReadable in = (PositionedReadable)this.stream; + long actualSize = 0; + for(long pos = nextOffset(-1); + actualSize < totalSize; pos = nextOffset(pos)) { + int curSize = in.read(pos, buffer, 0, bufferSize); + if(curSize < 0) break; + actualSize += curSize; + reporter.setStatus("reading " + name + "@" + + actualSize + "/" + totalSize + + " ::host = " + hostName); + } + return Long.valueOf(actualSize); + } + + /** + * Get next offset for reading. + * If current < 0 then choose initial offset according to the read type. + * + * @param current offset + * @return + */ + private long nextOffset(long current) { + if(skipSize == 0) + return rnd.nextInt((int)(fileSize)); + if(skipSize > 0) + return (current < 0) ? 0 : (current + bufferSize + skipSize); + // skipSize < 0 + return (current < 0) ? 
Math.max(0, fileSize - bufferSize) : + Math.max(0, current + skipSize); + } + } + + private void randomReadTest(FileSystem fs) throws IOException { + Path readDir = getRandomReadDir(config); + fs.delete(readDir, true); + runIOTest(RandomReadMapper.class, readDir); + } + private void sequentialTest(FileSystem fs, - int testType, + TestType testType, long fileSize, // in bytes int nrFiles ) throws IOException { - IOStatMapper ioer = null; - if (testType == TEST_TYPE_READ) + IOStatMapper ioer = null; + switch(testType) { + case TEST_TYPE_READ: ioer = new ReadMapper(); - else if (testType == TEST_TYPE_WRITE) + break; + case TEST_TYPE_WRITE: ioer = new WriteMapper(); - else if (testType == TEST_TYPE_APPEND) + break; + case TEST_TYPE_APPEND: ioer = new AppendMapper(); - else + break; + case TEST_TYPE_READ_RANDOM: + case TEST_TYPE_READ_BACKWARD: + case TEST_TYPE_READ_SKIP: + ioer = new RandomReadMapper(); + break; + default: return; + } for(int i=0; i < nrFiles; i++) ioer.doIO(Reporter.NULL, BASE_FILE_NAME+Integer.toString(i), @@ -462,14 +656,15 @@ public class TestDFSIO extends TestCase implements Tool { @Override // Tool public int run(String[] args) throws IOException { - int testType = TEST_TYPE_READ; + TestType testType = null; int bufferSize = DEFAULT_BUFFER_SIZE; - long fileSize = 1*MEGA; + long nrBytes = 1*MEGA; int nrFiles = 1; + long skipSize = 0; String resFileName = DEFAULT_RES_FILE_NAME; String compressionClass = null; boolean isSequential = false; - String version = TestDFSIO.class.getSimpleName() + ".0.0.6"; + String version = TestDFSIO.class.getSimpleName() + ".1.7"; LOG.info(version); if (args.length == 0) { @@ -479,21 +674,32 @@ public class TestDFSIO extends TestCase implements Tool { for (int i = 0; i < args.length; i++) { // parse command line if (args[i].startsWith("-read")) { - testType = TEST_TYPE_READ; + testType = TestType.TEST_TYPE_READ; } else if (args[i].equals("-write")) { - testType = TEST_TYPE_WRITE; + testType = TestType.TEST_TYPE_WRITE; } else if (args[i].equals("-append")) { - testType = TEST_TYPE_APPEND; + testType = TestType.TEST_TYPE_APPEND; + } else if (args[i].equals("-random")) { + if(testType != TestType.TEST_TYPE_READ) return -1; + testType = TestType.TEST_TYPE_READ_RANDOM; + } else if (args[i].equals("-backward")) { + if(testType != TestType.TEST_TYPE_READ) return -1; + testType = TestType.TEST_TYPE_READ_BACKWARD; + } else if (args[i].equals("-skip")) { + if(testType != TestType.TEST_TYPE_READ) return -1; + testType = TestType.TEST_TYPE_READ_SKIP; } else if (args[i].equals("-clean")) { - testType = TEST_TYPE_CLEANUP; + testType = TestType.TEST_TYPE_CLEANUP; } else if (args[i].startsWith("-seq")) { isSequential = true; } else if (args[i].startsWith("-compression")) { compressionClass = args[++i]; } else if (args[i].equals("-nrFiles")) { nrFiles = Integer.parseInt(args[++i]); - } else if (args[i].equals("-fileSize")) { - fileSize = parseSize(args[++i]); + } else if (args[i].equals("-fileSize") || args[i].equals("-size")) { + nrBytes = parseSize(args[++i]); + } else if (args[i].equals("-skipSize")) { + skipSize = parseSize(args[++i]); } else if (args[i].equals("-bufferSize")) { bufferSize = Integer.parseInt(args[++i]); } else if (args[i].equals("-resFile")) { @@ -503,10 +709,18 @@ public class TestDFSIO extends TestCase implements Tool { return -1; } } + if(testType == null) + return -1; + if(testType == TestType.TEST_TYPE_READ_BACKWARD) + skipSize = -bufferSize; + else if(testType == TestType.TEST_TYPE_READ_SKIP && skipSize == 0) + skipSize = bufferSize; 
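As a companion to the skipSize defaulting above, here is a minimal standalone sketch of the nextOffset() logic from RandomReadMapper; the FILE_SIZE and SKIP values are illustrative assumptions, not taken from the benchmark. It prints the first few offsets each read mode produces.

  import java.util.Random;

  /** Standalone sketch of RandomReadMapper.nextOffset(); sizes are assumed. */
  public class NextOffsetDemo {
    static final long FILE_SIZE = 10000000L; // assumed 10 MB test file
    static final int BUFFER_SIZE = 1000000;  // matches DEFAULT_BUFFER_SIZE
    static final Random RND = new Random();

    static long nextOffset(long current, long skipSize) {
      if(skipSize == 0)   // random read: uniformly random position
        return RND.nextInt((int)FILE_SIZE);
      if(skipSize > 0)    // skip read: advance past the last read plus a gap
        return (current < 0) ? 0 : (current + BUFFER_SIZE + skipSize);
      // skipSize < 0, backward read: start at the tail, walk toward offset 0
      return (current < 0) ? Math.max(0, FILE_SIZE - BUFFER_SIZE)
                           : Math.max(0, current + skipSize);
    }

    public static void main(String[] args) {
      long[] skips = { 0L, 1000000L, -BUFFER_SIZE }; // -random, -skip, -backward
      for(long skip : skips) {
        StringBuilder line = new StringBuilder("skipSize=" + skip + " :");
        long pos = nextOffset(-1, skip);
        for(int i = 0; i < 4; i++) {
          line.append(' ').append(pos);
          pos = nextOffset(pos, skip);
        }
        System.out.println(line);
      }
    }
  }

Since doIO() reads with PositionedReadable.read(pos, buffer, 0, bufferSize), each offset is absolute, and passing -1 selects the mode's initial position.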
LOG.info("nrFiles = " + nrFiles); - LOG.info("fileSize (MB) = " + toMB(fileSize)); + LOG.info("nrBytes (MB) = " + toMB(nrBytes)); LOG.info("bufferSize = " + bufferSize); + if(skipSize > 0) + LOG.info("skipSize = " + skipSize); LOG.info("baseDir = " + getBaseDir(config)); if(compressionClass != null) { @@ -515,29 +729,39 @@ public class TestDFSIO extends TestCase implements Tool { } config.setInt("test.io.file.buffer.size", bufferSize); + config.setLong("test.io.skip.size", skipSize); config.setBoolean(DFSConfigKeys.DFS_SUPPORT_APPEND_KEY, true); FileSystem fs = FileSystem.get(config); if (isSequential) { long tStart = System.currentTimeMillis(); - sequentialTest(fs, testType, fileSize, nrFiles); + sequentialTest(fs, testType, nrBytes, nrFiles); long execTime = System.currentTimeMillis() - tStart; String resultLine = "Seq Test exec time sec: " + (float)execTime / 1000; LOG.info(resultLine); return 0; } - if (testType == TEST_TYPE_CLEANUP) { + if (testType == TestType.TEST_TYPE_CLEANUP) { cleanup(fs); return 0; } - createControlFile(fs, fileSize, nrFiles); + createControlFile(fs, nrBytes, nrFiles); long tStart = System.currentTimeMillis(); - if (testType == TEST_TYPE_WRITE) + switch(testType) { + case TEST_TYPE_WRITE: writeTest(fs); - if (testType == TEST_TYPE_READ) + break; + case TEST_TYPE_READ: readTest(fs); - if (testType == TEST_TYPE_APPEND) + break; + case TEST_TYPE_APPEND: appendTest(fs); + break; + case TEST_TYPE_READ_RANDOM: + case TEST_TYPE_READ_BACKWARD: + case TEST_TYPE_READ_SKIP: + randomReadTest(fs); + } long execTime = System.currentTimeMillis() - tStart; analyzeResult(fs, testType, execTime, resFileName); @@ -563,9 +787,9 @@ public class TestDFSIO extends TestCase implements Tool { static long parseSize(String arg) { String[] args = arg.split("\\D", 2); // get digits assert args.length <= 2; - long fileSize = Long.parseLong(args[0]); + long nrBytes = Long.parseLong(args[0]); String bytesMult = arg.substring(args[0].length()); // get byte multiple - return fileSize * ByteMultiple.parseString(bytesMult).value(); + return nrBytes * ByteMultiple.parseString(bytesMult).value(); } static float toMB(long bytes) { @@ -573,17 +797,11 @@ public class TestDFSIO extends TestCase implements Tool { } private void analyzeResult( FileSystem fs, - int testType, + TestType testType, long execTime, String resFileName ) throws IOException { - Path reduceFile; - if (testType == TEST_TYPE_WRITE) - reduceFile = new Path(getWriteDir(config), "part-00000"); - else if (testType == TEST_TYPE_APPEND) - reduceFile = new Path(getAppendDir(config), "part-00000"); - else // if (testType == TEST_TYPE_READ) - reduceFile = new Path(getReadDir(config), "part-00000"); + Path reduceFile = getReduceFilePath(testType); long tasks = 0; long size = 0; long time = 0; @@ -617,10 +835,7 @@ public class TestDFSIO extends TestCase implements Tool { double med = rate / 1000 / tasks; double stdDev = Math.sqrt(Math.abs(sqrate / 1000 / tasks - med*med)); String resultLines[] = { - "----- TestDFSIO ----- : " + ((testType == TEST_TYPE_WRITE) ? "write" : - (testType == TEST_TYPE_READ) ? "read" : - (testType == TEST_TYPE_APPEND) ? 
"append" : - "unknown"), + "----- TestDFSIO ----- : " + testType, " Date & time: " + new Date(System.currentTimeMillis()), " Number of files: " + tasks, "Total MBytes processed: " + toMB(size), @@ -642,6 +857,27 @@ public class TestDFSIO extends TestCase implements Tool { } } + private Path getReduceFilePath(TestType testType) { + switch(testType) { + case TEST_TYPE_WRITE: + return new Path(getWriteDir(config), "part-00000"); + case TEST_TYPE_APPEND: + return new Path(getAppendDir(config), "part-00000"); + case TEST_TYPE_READ: + return new Path(getReadDir(config), "part-00000"); + case TEST_TYPE_READ_RANDOM: + case TEST_TYPE_READ_BACKWARD: + case TEST_TYPE_READ_SKIP: + return new Path(getRandomReadDir(config), "part-00000"); + } + return null; + } + + private void analyzeResult(FileSystem fs, TestType testType, long execTime) + throws IOException { + analyzeResult(fs, testType, execTime, DEFAULT_RES_FILE_NAME); + } + private void cleanup(FileSystem fs) throws IOException { LOG.info("Cleaning up test files");