MAPREDUCE-2786. Add compression option for TestDFSIO. Contributed by Plamen Jeliazkov.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1380308 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Konstantin Shvachko 2012-09-03 18:50:28 +00:00
parent 2257cd8123
commit 54ca8b837c
3 changed files with 39 additions and 1 deletions

View File

@ -21,6 +21,9 @@ Branch-2 ( Unreleased changes )
MAPREDUCE-4408. allow jobs to set a JAR that is in the distributed cached MAPREDUCE-4408. allow jobs to set a JAR that is in the distributed cached
(rkanter via tucu) (rkanter via tucu)
MAPREDUCE-2786. Add compression option for TestDFSIO.
(Plamen Jeliazkov via shv)
BUG FIXES BUG FIXES
MAPREDUCE-4422. YARN_APPLICATION_CLASSPATH needs a documented default value in MAPREDUCE-4422. YARN_APPLICATION_CLASSPATH needs a documented default value in

View File

@ -22,7 +22,9 @@ import java.net.InetAddress;
import org.apache.hadoop.conf.Configured; import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.mapred.*; import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.ReflectionUtils;
/** /**
* Base mapper class for IO operations. * Base mapper class for IO operations.
@ -41,6 +43,7 @@ public abstract class IOMapperBase<T> extends Configured
protected int bufferSize; protected int bufferSize;
protected FileSystem fs; protected FileSystem fs;
protected String hostName; protected String hostName;
protected CompressionCodec compressionCodec;
public IOMapperBase() { public IOMapperBase() {
} }
@ -59,6 +62,22 @@ public abstract class IOMapperBase<T> extends Configured
} catch(Exception e) { } catch(Exception e) {
hostName = "localhost"; hostName = "localhost";
} }
//grab compression
String compression = getConf().get("test.io.compression.class", null);
Class<? extends CompressionCodec> codec;
//try to initialize codec
try {
codec = (compression == null) ? null :
Class.forName(compression).asSubclass(CompressionCodec.class);
} catch(Exception e) {
throw new RuntimeException("Compression codec not found: ", e);
}
if(codec != null) {
compressionCodec = (CompressionCodec) ReflectionUtils.newInstance(codec, getConf());
}
} }
public void close() throws IOException { public void close() throws IOException {

View File

@ -23,6 +23,7 @@ import java.io.DataInputStream;
import java.io.File; import java.io.File;
import java.io.FileOutputStream; import java.io.FileOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader; import java.io.InputStreamReader;
import java.io.OutputStream; import java.io.OutputStream;
import java.io.PrintStream; import java.io.PrintStream;
@ -296,6 +297,8 @@ public class TestDFSIO extends TestCase implements Tool {
OutputStream out; OutputStream out;
out = fs.create(new Path(getDataDir(getConf()), name), true, bufferSize); out = fs.create(new Path(getDataDir(getConf()), name), true, bufferSize);
if(compressionCodec != null) out = compressionCodec.createOutputStream(out);
try { try {
// write to the file // write to the file
long nrRemaining; long nrRemaining;
@ -358,6 +361,8 @@ public class TestDFSIO extends TestCase implements Tool {
OutputStream out; OutputStream out;
out = fs.append(new Path(getDataDir(getConf()), name), bufferSize); out = fs.append(new Path(getDataDir(getConf()), name), bufferSize);
if(compressionCodec != null) out = compressionCodec.createOutputStream(out);
try { try {
// write to the file // write to the file
long nrRemaining; long nrRemaining;
@ -394,7 +399,10 @@ public class TestDFSIO extends TestCase implements Tool {
long totalSize // in bytes long totalSize // in bytes
) throws IOException { ) throws IOException {
// open file // open file
DataInputStream in = fs.open(new Path(getDataDir(getConf()), name)); InputStream in = fs.open(new Path(getDataDir(getConf()), name));
if(compressionCodec != null) in = compressionCodec.createInputStream(in);
long actualSize = 0; long actualSize = 0;
try { try {
while (actualSize < totalSize) { while (actualSize < totalSize) {
@ -459,6 +467,7 @@ public class TestDFSIO extends TestCase implements Tool {
long fileSize = 1*MEGA; long fileSize = 1*MEGA;
int nrFiles = 1; int nrFiles = 1;
String resFileName = DEFAULT_RES_FILE_NAME; String resFileName = DEFAULT_RES_FILE_NAME;
String compressionClass = null;
boolean isSequential = false; boolean isSequential = false;
String version = TestDFSIO.class.getSimpleName() + ".0.0.6"; String version = TestDFSIO.class.getSimpleName() + ".0.0.6";
@ -479,6 +488,8 @@ public class TestDFSIO extends TestCase implements Tool {
testType = TEST_TYPE_CLEANUP; testType = TEST_TYPE_CLEANUP;
} else if (args[i].startsWith("-seq")) { } else if (args[i].startsWith("-seq")) {
isSequential = true; isSequential = true;
} else if (args[i].startsWith("-compression")) {
compressionClass = args[++i];
} else if (args[i].equals("-nrFiles")) { } else if (args[i].equals("-nrFiles")) {
nrFiles = Integer.parseInt(args[++i]); nrFiles = Integer.parseInt(args[++i]);
} else if (args[i].equals("-fileSize")) { } else if (args[i].equals("-fileSize")) {
@ -498,6 +509,11 @@ public class TestDFSIO extends TestCase implements Tool {
LOG.info("bufferSize = " + bufferSize); LOG.info("bufferSize = " + bufferSize);
LOG.info("baseDir = " + getBaseDir(config)); LOG.info("baseDir = " + getBaseDir(config));
if(compressionClass != null) {
config.set("test.io.compression.class", compressionClass);
LOG.info("compressionClass = " + compressionClass);
}
config.setInt("test.io.file.buffer.size", bufferSize); config.setInt("test.io.file.buffer.size", bufferSize);
config.setBoolean(DFSConfigKeys.DFS_SUPPORT_APPEND_KEY, true); config.setBoolean(DFSConfigKeys.DFS_SUPPORT_APPEND_KEY, true);
FileSystem fs = FileSystem.get(config); FileSystem fs = FileSystem.get(config);