From 3edc40e3777a4cf226fff1be0bdc0ac4c2f49f34 Mon Sep 17 00:00:00 2001 From: Amar Kamat Date: Thu, 19 Apr 2012 04:26:53 +0000 Subject: [PATCH] MAPREDUCE-4100. [Gridmix] Bug fixed in compression emulation feature for map only jobs. (amarrk) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1327816 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 3 ++ .../gridmix/CompressionEmulationUtil.java | 20 ++++---- .../apache/hadoop/mapred/gridmix/LoadJob.java | 50 +++++++++++++------ .../TestCompressionEmulationUtils.java | 5 +- 4 files changed, 49 insertions(+), 29 deletions(-) diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 2f88d9dc345..ce94a7519e3 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -52,6 +52,9 @@ Trunk (unreleased changes) BUG FIXES + MAPREDUCE-4100. [Gridmix] Bug fixed in compression emulation feature for + map only jobs. (amarrk) + MAPREDUCE-4149. [Rumen] Rumen fails to parse certain counter strings. (ravigummadi) diff --git a/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/CompressionEmulationUtil.java b/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/CompressionEmulationUtil.java index 898ca50b517..526878c91c3 100644 --- a/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/CompressionEmulationUtil.java +++ b/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/CompressionEmulationUtil.java @@ -85,10 +85,10 @@ class CompressionEmulationUtil { "gridmix.compression-emulation.map-output.compression-ratio"; /** - * Configuration property for setting the compression ratio of reduce output. + * Configuration property for setting the compression ratio of job output. */ - private static final String GRIDMIX_REDUCE_OUTPUT_COMPRESSION_RATIO = - "gridmix.compression-emulation.reduce-output.compression-ratio"; + private static final String GRIDMIX_JOB_OUTPUT_COMPRESSION_RATIO = + "gridmix.compression-emulation.job-output.compression-ratio"; /** * Default compression ratio. @@ -434,20 +434,20 @@ class CompressionEmulationUtil { } /** - * Set the reduce output data compression ratio in the given configuration. + * Set the job output data compression ratio in the given configuration. */ - static void setReduceOutputCompressionEmulationRatio(Configuration conf, - float ratio) { - conf.setFloat(GRIDMIX_REDUCE_OUTPUT_COMPRESSION_RATIO, ratio); + static void setJobOutputCompressionEmulationRatio(Configuration conf, + float ratio) { + conf.setFloat(GRIDMIX_JOB_OUTPUT_COMPRESSION_RATIO, ratio); } /** - * Get the reduce output data compression ratio using the given configuration. + * Get the job output data compression ratio using the given configuration. * If the compression ratio is not set in the configuration then use the * default value i.e {@value #DEFAULT_COMPRESSION_RATIO}. */ - static float getReduceOutputCompressionEmulationRatio(Configuration conf) { - return conf.getFloat(GRIDMIX_REDUCE_OUTPUT_COMPRESSION_RATIO, + static float getJobOutputCompressionEmulationRatio(Configuration conf) { + return conf.getFloat(GRIDMIX_JOB_OUTPUT_COMPRESSION_RATIO, DEFAULT_COMPRESSION_RATIO); } diff --git a/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/LoadJob.java b/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/LoadJob.java index 8d69414b6ea..9daa3adef3b 100644 --- a/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/LoadJob.java +++ b/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/LoadJob.java @@ -288,23 +288,23 @@ class LoadJob extends GridmixJob { final long[] reduceBytes = split.getOutputBytes(); final long[] reduceRecords = split.getOutputRecords(); - // enable gridmix map output record for compression - final boolean emulateMapOutputCompression = - CompressionEmulationUtil.isCompressionEmulationEnabled(conf) - && conf.getBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, false); - float compressionRatio = 1.0f; - if (emulateMapOutputCompression) { - compressionRatio = - CompressionEmulationUtil.getMapOutputCompressionEmulationRatio(conf); - LOG.info("GridMix is configured to use a compression ratio of " - + compressionRatio + " for the map output data."); - key.setCompressibility(true, compressionRatio); - val.setCompressibility(true, compressionRatio); - } - long totalRecords = 0L; final int nReduces = ctxt.getNumReduceTasks(); if (nReduces > 0) { + // enable gridmix map output record for compression + boolean emulateMapOutputCompression = + CompressionEmulationUtil.isCompressionEmulationEnabled(conf) + && conf.getBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, false); + float compressionRatio = 1.0f; + if (emulateMapOutputCompression) { + compressionRatio = + CompressionEmulationUtil.getMapOutputCompressionEmulationRatio(conf); + LOG.info("GridMix is configured to use a compression ratio of " + + compressionRatio + " for the map output data."); + key.setCompressibility(true, compressionRatio); + val.setCompressibility(true, compressionRatio); + } + int idx = 0; int id = split.getId(); for (int i = 0; i < nReduces; ++i) { @@ -332,7 +332,21 @@ class LoadJob extends GridmixJob { } } else { long mapOutputBytes = reduceBytes[0]; - if (emulateMapOutputCompression) { + + // enable gridmix job output compression + boolean emulateJobOutputCompression = + CompressionEmulationUtil.isCompressionEmulationEnabled(conf) + && conf.getBoolean(FileOutputFormat.COMPRESS, false); + + if (emulateJobOutputCompression) { + float compressionRatio = + CompressionEmulationUtil.getJobOutputCompressionEmulationRatio(conf); + LOG.info("GridMix is configured to use a compression ratio of " + + compressionRatio + " for the job output data."); + key.setCompressibility(true, compressionRatio); + val.setCompressibility(true, compressionRatio); + + // set the output size accordingly mapOutputBytes /= compressionRatio; } reduces.add(new AvgRecordFactory(mapOutputBytes, reduceRecords[0], @@ -387,9 +401,13 @@ class LoadJob extends GridmixJob { @Override public void cleanup(Context context) throws IOException, InterruptedException { + LOG.info("Starting the cleanup phase."); for (RecordFactory factory : reduces) { key.setSeed(r.nextLong()); while (factory.next(key, val)) { + // send the progress update (maybe make this a thread) + context.progress(); + context.write(key, val); key.setSeed(r.nextLong()); @@ -462,7 +480,7 @@ class LoadJob extends GridmixJob { && FileOutputFormat.getCompressOutput(context)) { float compressionRatio = CompressionEmulationUtil - .getReduceOutputCompressionEmulationRatio(conf); + .getJobOutputCompressionEmulationRatio(conf); LOG.info("GridMix is configured to use a compression ratio of " + compressionRatio + " for the reduce output data."); val.setCompressibility(true, compressionRatio); diff --git a/hadoop-mapreduce-project/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestCompressionEmulationUtils.java b/hadoop-mapreduce-project/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestCompressionEmulationUtils.java index 1e971f7146c..883e87c6600 100644 --- a/hadoop-mapreduce-project/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestCompressionEmulationUtils.java +++ b/hadoop-mapreduce-project/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestCompressionEmulationUtils.java @@ -322,10 +322,9 @@ public class TestCompressionEmulationUtils { public void testOutputCompressionRatioConfiguration() throws Exception { Configuration conf = new Configuration(); float ratio = 0.567F; - CompressionEmulationUtil.setReduceOutputCompressionEmulationRatio(conf, - ratio); + CompressionEmulationUtil.setJobOutputCompressionEmulationRatio(conf, ratio); assertEquals(ratio, - CompressionEmulationUtil.getReduceOutputCompressionEmulationRatio(conf), + CompressionEmulationUtil.getJobOutputCompressionEmulationRatio(conf), 0.0D); }