diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 501970f098f..c2633ea9be4 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -147,6 +147,9 @@ Branch-2 ( Unreleased changes )
     MAPREDUCE-4408. allow jobs to set a JAR that is in the distributed cached
     (rkanter via tucu)
 
+    MAPREDUCE-2786. Add compression option for TestDFSIO.
+    (Plamen Jeliazkov via shv)
+
   BUG FIXES
 
     MAPREDUCE-4422. YARN_APPLICATION_CLASSPATH needs a documented default value in
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/IOMapperBase.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/IOMapperBase.java
index 69741f8fa3e..fe1af6afcd4 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/IOMapperBase.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/IOMapperBase.java
@@ -22,7 +22,9 @@ import java.net.InetAddress;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.util.ReflectionUtils;
 
 /**
  * Base mapper class for IO operations.
@@ -41,6 +43,7 @@ public abstract class IOMapperBase extends Configured
   protected int bufferSize;
   protected FileSystem fs;
   protected String hostName;
+  protected CompressionCodec compressionCodec;
 
   public IOMapperBase() { 
   }
@@ -59,6 +62,22 @@ public abstract class IOMapperBase extends Configured
     } catch(Exception e) {
       hostName = "localhost";
     }
+
+    // grab the compression codec class name, if one was configured
+    String compression = getConf().get("test.io.compression.class", null);
+    Class<? extends CompressionCodec> codec;
+
+    // try to resolve the codec class
+    try {
+      codec = (compression == null) ? null :
+        Class.forName(compression).asSubclass(CompressionCodec.class);
+    } catch(Exception e) {
+      throw new RuntimeException("Compression codec not found: ", e);
+    }
+
+    if(codec != null) {
+      compressionCodec = ReflectionUtils.newInstance(codec, getConf());
+    }
   }
 
   public void close() throws IOException {
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/TestDFSIO.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/TestDFSIO.java
index 896240eed09..0d589ff59fa 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/TestDFSIO.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/TestDFSIO.java
@@ -23,6 +23,7 @@ import java.io.DataInputStream;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.io.OutputStream;
 import java.io.PrintStream;
@@ -295,6 +296,8 @@ public class TestDFSIO extends TestCase implements Tool {
       // create file
       OutputStream out;
       out = fs.create(new Path(getDataDir(getConf()), name), true, bufferSize);
+
+      if(compressionCodec != null) out = compressionCodec.createOutputStream(out);
 
       try {
         // write to the file
@@ -358,6 +361,8 @@ public class TestDFSIO extends TestCase implements Tool {
       OutputStream out;
       out = fs.append(new Path(getDataDir(getConf()), name), bufferSize);
 
+      if(compressionCodec != null) out = compressionCodec.createOutputStream(out);
+
       try {
         // write to the file
         long nrRemaining;
@@ -394,7 +399,10 @@
                      long totalSize // in bytes
                    ) throws IOException {
       // open file
-      DataInputStream in = fs.open(new Path(getDataDir(getConf()), name));
+      InputStream in = fs.open(new Path(getDataDir(getConf()), name));
+
+      if(compressionCodec != null) in = compressionCodec.createInputStream(in);
+
       long actualSize = 0;
       try {
         while (actualSize < totalSize) {
@@ -459,6 +467,7 @@ public class TestDFSIO extends TestCase implements Tool {
     long fileSize = 1*MEGA;
     int nrFiles = 1;
     String resFileName = DEFAULT_RES_FILE_NAME;
+    String compressionClass = null;
     boolean isSequential = false;
     String version = TestDFSIO.class.getSimpleName() + ".0.0.6";
 
@@ -479,6 +488,8 @@ public class TestDFSIO extends TestCase implements Tool {
         testType = TEST_TYPE_CLEANUP;
       } else if (args[i].startsWith("-seq")) {
         isSequential = true;
+      } else if (args[i].startsWith("-compression")) {
+        compressionClass = args[++i];
       } else if (args[i].equals("-nrFiles")) {
         nrFiles = Integer.parseInt(args[++i]);
       } else if (args[i].equals("-fileSize")) {
@@ -497,6 +508,11 @@ public class TestDFSIO extends TestCase implements Tool {
     LOG.info("fileSize (MB) = " + toMB(fileSize));
     LOG.info("bufferSize = " + bufferSize);
     LOG.info("baseDir = " + getBaseDir(config));
+
+    if(compressionClass != null) {
+      config.set("test.io.compression.class", compressionClass);
+      LOG.info("compressionClass = " + compressionClass);
+    }
 
     config.setInt("test.io.file.buffer.size", bufferSize);
     config.setBoolean(DFSConfigKeys.DFS_SUPPORT_APPEND_KEY, true);
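
For illustration only (not part of the patch): IOMapperBase.configure() resolves the class named by test.io.compression.class reflectively, and the read/write paths then wrap their raw streams with the resulting codec. The standalone sketch below reproduces that round trip against in-memory streams, using the stock org.apache.hadoop.io.compress.GzipCodec as an example; the class name CodecWiringSketch and the sample payload are invented for the demo.

// CodecWiringSketch.java -- standalone sketch, not part of the patch.
// Assumes hadoop-common on the classpath; class name and payload invented.
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.io.OutputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.util.ReflectionUtils;

public class CodecWiringSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Same key TestDFSIO.run() sets from the new -compression option.
    conf.set("test.io.compression.class",
        "org.apache.hadoop.io.compress.GzipCodec");

    // Mirrors IOMapperBase.configure(): resolve and instantiate the codec.
    String compression = conf.get("test.io.compression.class", null);
    CompressionCodec codec = (compression == null) ? null :
        ReflectionUtils.newInstance(
            Class.forName(compression).asSubclass(CompressionCodec.class),
            conf);

    // Mirrors the write path: wrap the raw stream when a codec is present.
    ByteArrayOutputStream rawOut = new ByteArrayOutputStream();
    OutputStream out = (codec == null) ? rawOut
        : codec.createOutputStream(rawOut);
    out.write("sample payload".getBytes("UTF-8"));
    out.close();

    // Mirrors the read path: wrap the raw stream the same way.
    InputStream in = new ByteArrayInputStream(rawOut.toByteArray());
    if (codec != null) in = codec.createInputStream(in);
    byte[] buf = new byte[64];
    int n, total = 0;
    while ((n = in.read(buf, total, buf.length - total)) > 0) total += n;
    in.close();
    System.out.println(new String(buf, 0, total, "UTF-8")); // "sample payload"
  }
}

With the patch applied, the codec is chosen on the command line, e.g. -compression org.apache.hadoop.io.compress.GzipCodec. Since the lookup is purely reflective, any CompressionCodec implementation available on the task classpath should work the same way.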