diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index b8e46f4bea4..91f9be7e733 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -42,6 +42,8 @@ Release 2.7.0 - UNRELEASED MAPREDUCE-5800. Use Job#getInstance instead of deprecated constructors (aajisaka) + MAPREDUCE-6227. DFSIO for truncate. (shv via yliu) + OPTIMIZATIONS MAPREDUCE-6169. MergeQueue should release reference to the current item diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/TestDFSIO.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/TestDFSIO.java index 2384ff1c25e..f85a2ee0d89 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/TestDFSIO.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/TestDFSIO.java @@ -31,7 +31,6 @@ import java.io.PrintStream; import java.util.Date; import java.util.Random; import java.util.StringTokenizer; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -92,13 +91,13 @@ public class TestDFSIO implements Tool { private static final String BASE_FILE_NAME = "test_io_"; private static final String DEFAULT_RES_FILE_NAME = "TestDFSIO_results.log"; private static final long MEGA = ByteMultiple.MB.value(); - private static final int DEFAULT_NR_BYTES = 1; + private static final int DEFAULT_NR_BYTES = 128; private static final int DEFAULT_NR_FILES = 4; private static final String USAGE = "Usage: " + TestDFSIO.class.getSimpleName() + " [genericOptions]" + " -read [-random | -backward | -skip [-skipSize Size]] |" + - " -write | -append | -clean" + + " -write | -append | -truncate | -clean" + " [-compression codecClassName]" + " [-nrFiles N]" + " [-size Size[B|KB|MB|GB|TB]]" + @@ -121,7 +120,8 @@ public class TestDFSIO implements Tool { TEST_TYPE_APPEND("append"), TEST_TYPE_READ_RANDOM("random read"), TEST_TYPE_READ_BACKWARD("backward read"), - TEST_TYPE_READ_SKIP("skip read"); + TEST_TYPE_READ_SKIP("skip read"), + TEST_TYPE_TRUNCATE("truncate"); private String type; @@ -192,6 +192,9 @@ public class TestDFSIO implements Tool { private static Path getRandomReadDir(Configuration conf) { return new Path(getBaseDir(conf), "io_random_read"); } + private static Path getTruncateDir(Configuration conf) { + return new Path(getBaseDir(conf), "io_truncate"); + } private static Path getDataDir(Configuration conf) { return new Path(getBaseDir(conf), "io_data"); } @@ -203,6 +206,7 @@ public class TestDFSIO implements Tool { public static void beforeClass() throws Exception { bench = new TestDFSIO(); bench.getConf().setBoolean(DFSConfigKeys.DFS_SUPPORT_APPEND_KEY, true); + bench.getConf().setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1); cluster = new MiniDFSCluster.Builder(bench.getConf()) .numDataNodes(2) .format(true) @@ -279,6 +283,16 @@ public class TestDFSIO implements Tool { bench.analyzeResult(fs, TestType.TEST_TYPE_APPEND, execTime); } + @Test (timeout = 60000) + public void testTruncate() throws Exception { + FileSystem fs = cluster.getFileSystem(); + bench.createControlFile(fs, DEFAULT_NR_BYTES / 2, DEFAULT_NR_FILES); + long tStart = System.currentTimeMillis(); + bench.truncateTest(fs); + long execTime = System.currentTimeMillis() - tStart; + bench.analyzeResult(fs, TestType.TEST_TYPE_TRUNCATE, execTime); + } + @SuppressWarnings("deprecation") private void createControlFile(FileSystem fs, long nrBytes, // in bytes @@ -301,9 +315,9 @@ public class TestDFSIO implements Tool { } catch(Exception e) { throw new IOException(e.getLocalizedMessage()); } finally { - if (writer != null) + if (writer != null) writer.close(); - writer = null; + writer = null; } } LOG.info("created control files for: "+nrFiles+" files"); @@ -613,6 +627,51 @@ public class TestDFSIO implements Tool { runIOTest(RandomReadMapper.class, readDir); } + /** + * Truncate mapper class. + * The mapper truncates given file to the newLength, specified by -size. + */ + public static class TruncateMapper extends IOStatMapper { + private static final long DELAY = 100L; + + private Path filePath; + private long fileSize; + + @Override // IOMapperBase + public Closeable getIOStream(String name) throws IOException { + filePath = new Path(getDataDir(getConf()), name); + fileSize = fs.getFileStatus(filePath).getLen(); + return null; + } + + @Override // IOMapperBase + public Long doIO(Reporter reporter, + String name, + long newLength // in bytes + ) throws IOException { + boolean isClosed = fs.truncate(filePath, newLength); + reporter.setStatus("truncating " + name + " to newLength " + + newLength + " ::host = " + hostName); + for(int i = 0; !isClosed; i++) { + try { + Thread.sleep(DELAY); + } catch (InterruptedException ignored) {} + FileStatus status = fs.getFileStatus(filePath); + assert status != null : "status is null"; + isClosed = (status.getLen() == newLength); + reporter.setStatus("truncate recover for " + name + " to newLength " + + newLength + " attempt " + i + " ::host = " + hostName); + } + return Long.valueOf(fileSize - newLength); + } + } + + private void truncateTest(FileSystem fs) throws IOException { + Path TruncateDir = getTruncateDir(config); + fs.delete(TruncateDir, true); + runIOTest(TruncateMapper.class, TruncateDir); + } + private void sequentialTest(FileSystem fs, TestType testType, long fileSize, // in bytes @@ -634,6 +693,9 @@ public class TestDFSIO implements Tool { case TEST_TYPE_READ_SKIP: ioer = new RandomReadMapper(); break; + case TEST_TYPE_TRUNCATE: + ioer = new TruncateMapper(); + break; default: return; } @@ -667,7 +729,7 @@ public class TestDFSIO implements Tool { String resFileName = DEFAULT_RES_FILE_NAME; String compressionClass = null; boolean isSequential = false; - String version = TestDFSIO.class.getSimpleName() + ".1.7"; + String version = TestDFSIO.class.getSimpleName() + ".1.8"; LOG.info(version); if (args.length == 0) { @@ -691,6 +753,8 @@ public class TestDFSIO implements Tool { } else if (args[i].equals("-skip")) { if(testType != TestType.TEST_TYPE_READ) return -1; testType = TestType.TEST_TYPE_READ_SKIP; + } else if (args[i].equalsIgnoreCase("-truncate")) { + testType = TestType.TEST_TYPE_TRUNCATE; } else if (args[i].equals("-clean")) { testType = TestType.TEST_TYPE_CLEANUP; } else if (args[i].startsWith("-seq")) { @@ -764,6 +828,11 @@ public class TestDFSIO implements Tool { case TEST_TYPE_READ_BACKWARD: case TEST_TYPE_READ_SKIP: randomReadTest(fs); + break; + case TEST_TYPE_TRUNCATE: + truncateTest(fs); + break; + default: } long execTime = System.currentTimeMillis() - tStart; @@ -799,7 +868,7 @@ public class TestDFSIO implements Tool { return ((float)bytes)/MEGA; } - private void analyzeResult( FileSystem fs, + private void analyzeResult( FileSystem fs, TestType testType, long execTime, String resFileName @@ -872,13 +941,17 @@ public class TestDFSIO implements Tool { case TEST_TYPE_READ_BACKWARD: case TEST_TYPE_READ_SKIP: return new Path(getRandomReadDir(config), "part-00000"); + case TEST_TYPE_TRUNCATE: + return new Path(getTruncateDir(config), "part-00000"); + default: } return null; } private void analyzeResult(FileSystem fs, TestType testType, long execTime) throws IOException { - analyzeResult(fs, testType, execTime, DEFAULT_RES_FILE_NAME); + String dir = System.getProperty("test.build.dir", "target/test-dir"); + analyzeResult(fs, testType, execTime, dir + "/" + DEFAULT_RES_FILE_NAME); } private void cleanup(FileSystem fs)