MAPREDUCE-2786. Add compression option for TestDFSIO. Contributed by Plamen Jeliazkov.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1380310 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Konstantin Shvachko 2012-09-03 18:54:27 +00:00
parent 3969bcb7c9
commit ab986d7cf6
3 changed files with 39 additions and 1 deletion

View File

@ -147,6 +147,9 @@ Branch-2 ( Unreleased changes )
MAPREDUCE-4408. allow jobs to set a JAR that is in the distributed cached
(rkanter via tucu)
MAPREDUCE-2786. Add compression option for TestDFSIO.
(Plamen Jeliazkov via shv)
BUG FIXES
MAPREDUCE-4422. YARN_APPLICATION_CLASSPATH needs a documented default value in

View File

@ -22,7 +22,9 @@
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.ReflectionUtils;
/**
* Base mapper class for IO operations.
@ -41,6 +43,7 @@ public abstract class IOMapperBase<T> extends Configured
protected int bufferSize;
protected FileSystem fs;
protected String hostName;
protected CompressionCodec compressionCodec;
public IOMapperBase() {
}
@ -59,6 +62,22 @@ public void configure(JobConf conf) {
} catch(Exception e) {
hostName = "localhost";
}
//grab compression
String compression = getConf().get("test.io.compression.class", null);
Class<? extends CompressionCodec> codec;
//try to initialize codec
try {
codec = (compression == null) ? null :
Class.forName(compression).asSubclass(CompressionCodec.class);
} catch(Exception e) {
throw new RuntimeException("Compression codec not found: ", e);
}
if(codec != null) {
compressionCodec = (CompressionCodec) ReflectionUtils.newInstance(codec, getConf());
}
}
public void close() throws IOException {

View File

@ -23,6 +23,7 @@
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.PrintStream;
@ -295,6 +296,8 @@ public Long doIO(Reporter reporter,
// create file
OutputStream out;
out = fs.create(new Path(getDataDir(getConf()), name), true, bufferSize);
if(compressionCodec != null) out = compressionCodec.createOutputStream(out);
try {
// write to the file
@ -358,6 +361,8 @@ public Long doIO(Reporter reporter,
OutputStream out;
out = fs.append(new Path(getDataDir(getConf()), name), bufferSize);
if(compressionCodec != null) out = compressionCodec.createOutputStream(out);
try {
// write to the file
long nrRemaining;
@ -394,7 +399,10 @@ public Long doIO(Reporter reporter,
long totalSize // in bytes
) throws IOException {
// open file
DataInputStream in = fs.open(new Path(getDataDir(getConf()), name));
InputStream in = fs.open(new Path(getDataDir(getConf()), name));
if(compressionCodec != null) in = compressionCodec.createInputStream(in);
long actualSize = 0;
try {
while (actualSize < totalSize) {
@ -459,6 +467,7 @@ public int run(String[] args) throws IOException {
long fileSize = 1*MEGA;
int nrFiles = 1;
String resFileName = DEFAULT_RES_FILE_NAME;
String compressionClass = null;
boolean isSequential = false;
String version = TestDFSIO.class.getSimpleName() + ".0.0.6";
@ -479,6 +488,8 @@ public int run(String[] args) throws IOException {
testType = TEST_TYPE_CLEANUP;
} else if (args[i].startsWith("-seq")) {
isSequential = true;
} else if (args[i].startsWith("-compression")) {
compressionClass = args[++i];
} else if (args[i].equals("-nrFiles")) {
nrFiles = Integer.parseInt(args[++i]);
} else if (args[i].equals("-fileSize")) {
@ -497,6 +508,11 @@ public int run(String[] args) throws IOException {
LOG.info("fileSize (MB) = " + toMB(fileSize));
LOG.info("bufferSize = " + bufferSize);
LOG.info("baseDir = " + getBaseDir(config));
if(compressionClass != null) {
config.set("test.io.compression.class", compressionClass);
LOG.info("compressionClass = " + compressionClass);
}
config.setInt("test.io.file.buffer.size", bufferSize);
config.setBoolean(DFSConfigKeys.DFS_SUPPORT_APPEND_KEY, true);