MAPREDUCE-2722. [Gridmix] Gridmix simulated job's map's hdfsBytesRead counter is wrong when compressed input is used.(ravigummadi via tgraves)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1461236 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Thomas Graves 2013-03-26 17:42:29 +00:00
parent 1f988b777c
commit 67b93ebb23
4 changed files with 59 additions and 6 deletions

View File

@ -80,6 +80,10 @@ Release 2.0.5-beta - UNRELEASED
input data directory already exists and -generate option is input data directory already exists and -generate option is
given.(ravigummadi via tgraves) given.(ravigummadi via tgraves)
MAPREDUCE-2722. [Gridmix] Gridmix simulated job's map's hdfsBytesRead
counter is wrong when compressed input is used.(ravigummadi via tgraves)
Release 2.0.4-alpha - UNRELEASED Release 2.0.4-alpha - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -571,4 +571,25 @@ static void configureCompressionEmulation(Configuration source,
} }
setInputCompressionEmulationEnabled(target, needsCompressedInput); setInputCompressionEmulationEnabled(target, needsCompressedInput);
} }
/**
 * Computes the uncompressed input bytes count from the given possibly
 * compressed input bytes count.
 * @param possiblyCompressedInputBytes input bytes count. This is the
 *        compressed input size if compression emulation is on.
 * @param conf configuration of the Gridmix simulated job
 * @return uncompressed input bytes count; returned unchanged when input
 *         compression emulation is disabled
 */
static long getUncompressedInputBytes(long possiblyCompressedInputBytes,
    Configuration conf) {
  // Without input compression emulation, the counter already reflects
  // uncompressed bytes — nothing to adjust.
  if (!CompressionEmulationUtil.isInputCompressionEmulationEnabled(conf)) {
    return possiblyCompressedInputBytes;
  }
  // Scale the compressed size up by the configured map-input compression
  // ratio to recover the uncompressed size. Division is carried out in
  // float arithmetic and narrowed back to long, matching the semantics of
  // the compound assignment `long /= float`.
  float ratio =
      CompressionEmulationUtil.getMapInputCompressionEmulationRatio(conf);
  return (long) (possiblyCompressedInputBytes / ratio);
}
} }

View File

@ -536,9 +536,14 @@ void buildSplits(FilePool inputDir) throws IOException {
} }
} }
final TaskInfo info = jobdesc.getTaskInfo(TaskType.MAP, i); final TaskInfo info = jobdesc.getTaskInfo(TaskType.MAP, i);
long possiblyCompressedInputBytes = info.getInputBytes();
Configuration conf = job.getConfiguration();
long uncompressedInputBytes =
CompressionEmulationUtil.getUncompressedInputBytes(
possiblyCompressedInputBytes, conf);
splits.add( splits.add(
new LoadSplit(striper.splitFor(inputDir, info.getInputBytes(), 3), new LoadSplit(striper.splitFor(inputDir, uncompressedInputBytes, 3),
maps, i, info.getInputBytes(), info.getInputRecords(), maps, i, uncompressedInputBytes, info.getInputRecords(),
info.getOutputBytes(), info.getOutputRecords(), info.getOutputBytes(), info.getOutputRecords(),
reduceByteRatio, reduceRecordRatio, specBytes, reduceByteRatio, reduceRecordRatio, specBytes,
specRecords, info.getResourceUsageMetrics(), specRecords, info.getResourceUsageMetrics(),

View File

@ -19,7 +19,6 @@
import java.io.BufferedReader; import java.io.BufferedReader;
import java.io.BufferedWriter; import java.io.BufferedWriter;
import java.io.DataInput;
import java.io.DataInputStream; import java.io.DataInputStream;
import java.io.DataOutputStream; import java.io.DataOutputStream;
import java.io.IOException; import java.io.IOException;
@ -31,13 +30,11 @@
import java.util.List; import java.util.List;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec; import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapred.ClusterStatus;
import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Utils; import org.apache.hadoop.mapred.Utils;
@ -561,4 +558,30 @@ public void testFileQueueDecompression() throws IOException {
String readLine = new String(bytes); String readLine = new String(bytes);
assertEquals("Compression/Decompression error", inputLine, readLine); assertEquals("Compression/Decompression error", inputLine, readLine);
} }
/**
 * Tests the computation logic of uncompressed input bytes by
 * {@link CompressionEmulationUtil#getUncompressedInputBytes(long, Configuration)}.
 */
@Test
public void testComputeUncompressedInputBytes() {
long possiblyCompressedInputBytes = 100000;
float compressionRatio = 0.45F;
Configuration conf = new Configuration();
CompressionEmulationUtil.setMapInputCompressionEmulationRatio(conf,
compressionRatio);
// By default, input compression emulation is disabled. Verify that the
// input bytes count is returned unchanged in that case.
long result = CompressionEmulationUtil.getUncompressedInputBytes(
possiblyCompressedInputBytes, conf);
assertEquals(possiblyCompressedInputBytes, result);
// Enable input compression emulation and verify that the input bytes
// count is scaled up by the configured compression ratio.
CompressionEmulationUtil.setInputCompressionEmulationEnabled(conf, true);
result = CompressionEmulationUtil.getUncompressedInputBytes(
possiblyCompressedInputBytes, conf);
assertEquals((long)(possiblyCompressedInputBytes/compressionRatio), result);
}
} }