From df68c56267ca7dfbfee4b241bc84325d1760d12d Mon Sep 17 00:00:00 2001
From: Arun Murthy
Date: Wed, 6 Mar 2013 15:02:45 +0000
Subject: [PATCH] MAPREDUCE-3685. Fix bugs in MergeManager to ensure
 compression codec is appropriately used and that on-disk segments are
 correctly sorted on file-size. Contributed by Anty Rao and Ravi Prakash.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1453365 13f79535-47bb-0310-9956-ffa450edef68
---
 hadoop-mapreduce-project/CHANGES.txt          |  4 ++
 .../java/org/apache/hadoop/mapred/Merger.java |  2 +-
 .../task/reduce/MergeManagerImpl.java         | 37 +++++++---
 .../task/reduce/OnDiskMapOutput.java          |  4 +-
 .../task/reduce/TestMergeManager.java         | 67 ++++++++++++++++++-
 5 files changed, 103 insertions(+), 11 deletions(-)

diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 90140222928..dc877be9279 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -201,6 +201,10 @@ Release 2.0.4-beta - UNRELEASED
     MAPREDUCE-4896. mapred queue -info spits out ugly exception when queue does
     not exist. (sandyr via tucu)
 
+    MAPREDUCE-3685. Fix bugs in MergeManager to ensure compression codec is
+    appropriately used and that on-disk segments are correctly sorted on
+    file-size. (Anty Rao and Ravi Prakash via acmurthy)
+
 Release 2.0.3-alpha - 2013-02-06
 
   INCOMPATIBLE CHANGES
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java
index d0074707650..ced9040f413 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java
@@ -169,7 +169,7 @@ RawKeyValueIterator merge(Configuration conf, FileSystem fs,
 
   }
 
-  static <K extends Object, V extends Object>
+  public static <K extends Object, V extends Object>
   RawKeyValueIterator merge(Configuration conf, FileSystem fs,
                             Class<K> keyClass, Class<V> valueClass,
                             CompressionCodec codec,
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java
index 138ea43fdd3..c6f9a36951f 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java
@@ -477,7 +477,7 @@ public void merge(List<InMemoryMapOutput<K,V>> inputs) throws IOException {
       }
       writer.close();
       compressAwarePath = new CompressAwarePath(outputPath,
-          writer.getRawLength());
+          writer.getRawLength(), writer.getCompressedLength());
 
       LOG.info(reduceId +
           " Merge of the " + noInMemorySegments +
@@ -500,7 +500,7 @@ public void merge(List<InMemoryMapOutput<K,V>> inputs) throws IOException {
   private class OnDiskMerger extends MergeThread<CompressAwarePath,K,V> {
 
     public OnDiskMerger(MergeManagerImpl<K, V> manager) {
-      super(manager, Integer.MAX_VALUE, exceptionReporter);
+      super(manager, ioSortFactor, exceptionReporter);
       setName("OnDiskMerger - Thread to merge on-disk map-outputs");
       setDaemon(true);
     }
@@ -554,7 +554,7 @@ public void merge(List<CompressAwarePath> inputs) throws IOException {
         Merger.writeFile(iter, writer, reporter, jobConf);
         writer.close();
         compressAwarePath = new CompressAwarePath(outputPath,
-            writer.getRawLength());
+            writer.getRawLength(), writer.getCompressedLength());
       } catch (IOException e) {
         localFS.delete(outputPath, true);
         throw e;
@@ -719,7 +719,7 @@ private RawKeyValueIterator finalMerge(JobConf job, FileSystem fs,
         Merger.writeFile(rIter, writer, reporter, job);
         writer.close();
         onDiskMapOutputs.add(new CompressAwarePath(outputPath,
-            writer.getRawLength()));
+            writer.getRawLength(), writer.getCompressedLength()));
         writer = null;
         // add to list of final disk outputs.
       } catch (IOException e) {
@@ -791,7 +791,7 @@ public int compare(Segment<K, V> o1, Segment<K, V> o2) {
       // merges. See comment where mergePhaseFinished is being set
       Progress thisPhase = (mergePhaseFinished) ? null : mergePhase;
       RawKeyValueIterator diskMerge = Merger.merge(
-          job, fs, keyClass, valueClass, diskSegments,
+          job, fs, keyClass, valueClass, codec, diskSegments,
           ioSortFactor, numInMemSegments, tmpDir, comparator,
           reporter, false, spilledRecordsCounter, null, thisPhase);
       diskSegments.clear();
@@ -810,24 +810,45 @@ public int compare(Segment<K, V> o1, Segment<K, V> o2) {
 
   static class CompressAwarePath extends Path {
     private long rawDataLength;
+    private long compressedSize;
 
-    public CompressAwarePath(Path path, long rawDataLength) {
+    public CompressAwarePath(Path path, long rawDataLength, long compressSize) {
       super(path.toUri());
       this.rawDataLength = rawDataLength;
+      this.compressedSize = compressSize;
     }
 
     public long getRawDataLength() {
       return rawDataLength;
     }
-
+
+    public long getCompressedSize() {
+      return compressedSize;
+    }
+
     @Override
     public boolean equals(Object other) {
       return super.equals(other);
     }
-
+
     @Override
     public int hashCode() {
       return super.hashCode();
     }
+
+    @Override
+    public int compareTo(Object obj) {
+      if(obj instanceof CompressAwarePath) {
+        CompressAwarePath compPath = (CompressAwarePath) obj;
+        if(this.compressedSize < compPath.getCompressedSize()) {
+          return -1;
+        } else if (this.getCompressedSize() > compPath.getCompressedSize()) {
+          return 1;
+        }
+        // Not returning 0 here so that objects with the same size (but
+        // different paths) are still added to the TreeSet.
+      }
+      return super.compareTo(obj);
+    }
   }
 }
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java
index bf69798c124..68713d392f3 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java
@@ -48,6 +48,7 @@ class OnDiskMapOutput<K, V> extends MapOutput<K, V> {
   private final Path outputPath;
   private final MergeManagerImpl<K, V> merger;
   private final OutputStream disk;
+  private long compressedSize;
 
   public OnDiskMapOutput(TaskAttemptID mapId, TaskAttemptID reduceId,
                          MergeManagerImpl<K, V> merger, long size,
@@ -108,13 +109,14 @@ public void shuffle(MapHost host, InputStream input,
                             bytesLeft + " bytes missing of " +
                             compressedLength + ")");
     }
+    this.compressedSize = compressedLength;
   }
 
   @Override
   public void commit() throws IOException {
     localFS.rename(tmpOutputPath, outputPath);
     CompressAwarePath compressAwarePath = new CompressAwarePath(outputPath,
-        getSize());
+        getSize(), this.compressedSize);
     merger.closeOnDiskFile(compressAwarePath);
   }
 
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMergeManager.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMergeManager.java
index 46d797c93d3..8d6bab92738 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMergeManager.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMergeManager.java
@@ -17,28 +17,38 @@
  */
 package org.apache.hadoop.mapreduce.task.reduce;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 
 import java.io.IOException;
+import java.net.URISyntaxException;
 import java.util.ArrayList;
+import java.util.LinkedList;
 import java.util.List;
+import java.util.Random;
 import java.util.concurrent.BrokenBarrierException;
 import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BoundedByteArrayOutputStream;
+import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MROutputFiles;
 import org.apache.hadoop.mapred.MapOutputFile;
 import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl.CompressAwarePath;
 import org.junit.Assert;
 import org.junit.Test;
+import org.mockito.internal.util.reflection.Whitebox;
 
 public class TestMergeManager {
 
   @Test(timeout=10000)
-  @SuppressWarnings("unchecked")
   public void testMemoryMerge() throws Exception {
     final int TOTAL_MEM_BYTES = 10000;
     final int OUTPUT_SIZE = 7950;
@@ -195,4 +205,59 @@ public int getNumExceptions() {
       return exceptions.size();
     }
   }
+
+  @SuppressWarnings({ "unchecked", "deprecation" })
+  @Test(timeout=10000)
+  public void testOnDiskMerger() throws IOException, URISyntaxException,
+    InterruptedException {
+    JobConf jobConf = new JobConf();
+    final int SORT_FACTOR = 5;
+    jobConf.setInt(MRJobConfig.IO_SORT_FACTOR, SORT_FACTOR);
+
+    MapOutputFile mapOutputFile = new MROutputFiles();
+    FileSystem fs = FileSystem.getLocal(jobConf);
+    MergeManagerImpl<IntWritable, IntWritable> manager =
+      new MergeManagerImpl<IntWritable, IntWritable>(null, jobConf, fs, null
+        , null, null, null, null, null, null, null, null, null, mapOutputFile);
+
+    MergeThread<MapOutput<IntWritable, IntWritable>, IntWritable, IntWritable>
+      onDiskMerger = (MergeThread<MapOutput<IntWritable, IntWritable>,
+        IntWritable, IntWritable>) Whitebox.getInternalState(manager,
+          "onDiskMerger");
+    int mergeFactor = (Integer) Whitebox.getInternalState(onDiskMerger,
+      "mergeFactor");
+
+    // make sure the io.sort.factor is set properly
+    assertEquals(mergeFactor, SORT_FACTOR);
+
+    // Stop the onDiskMerger thread so that we can intercept the list of files
+    // waiting to be merged.
+    onDiskMerger.suspend();
+
+    //Send the list of fake files waiting to be merged
+    Random rand = new Random();
+    for(int i = 0; i < 2*SORT_FACTOR; ++i) {
+      Path path = new Path("somePath");
+      CompressAwarePath cap = new CompressAwarePath(path, 1l, rand.nextInt());
+      manager.closeOnDiskFile(cap);
+    }
+
+    //Check that the files pending to be merged are in sorted order.
+    LinkedList<List<CompressAwarePath>> pendingToBeMerged =
+      (LinkedList<List<CompressAwarePath>>) Whitebox.getInternalState(
+        onDiskMerger, "pendingToBeMerged");
+    assertTrue("No inputs were added to list pending to merge",
+      pendingToBeMerged.size() > 0);
+    for(int i = 0; i < pendingToBeMerged.size(); ++i) {
+      List<CompressAwarePath> inputs = pendingToBeMerged.get(i);
+      for(int j = 1; j < inputs.size(); ++j) {
+        assertTrue("Not enough / too many inputs were going to be merged",
+          inputs.size() > 0 && inputs.size() <= SORT_FACTOR);
+        assertTrue("Inputs to be merged were not sorted according to size: ",
+          inputs.get(j).getCompressedSize()
+          >= inputs.get(j-1).getCompressedSize());
+      }
+    }
+
+  }
 }
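
A note on the CompressAwarePath.compareTo() hunk above: the on-disk segments are tracked in a TreeSet (as the comment in the hunk says), and a TreeSet decides "already present" purely from compareTo() returning 0. Two distinct segment files that happen to have the same compressed size therefore must not compare as equal, or one of them would be silently dropped and never merged. The standalone sketch below, using a hypothetical SizedPath class and plain JDK collections rather than the Hadoop classes, illustrates that behaviour; it breaks ties by path, analogous to the patch falling back to super.compareTo(obj).

    import java.util.TreeSet;

    // Sketch of why a size-based compareTo() must never return 0 for
    // distinct files when the elements live in a TreeSet.
    public class TreeSetTieBreakSketch {

      static class SizedPath implements Comparable<SizedPath> {
        final String path;          // stands in for the on-disk segment's path
        final long compressedSize;  // stands in for the segment's compressed size

        SizedPath(String path, long compressedSize) {
          this.path = path;
          this.compressedSize = compressedSize;
        }

        @Override
        public int compareTo(SizedPath other) {
          if (compressedSize < other.compressedSize) {
            return -1;
          } else if (compressedSize > other.compressedSize) {
            return 1;
          }
          // Equal sizes: fall back to the path so distinct files are kept,
          // mirroring the patch's fallback to super.compareTo(obj).
          return path.compareTo(other.path);
        }
      }

      public static void main(String[] args) {
        TreeSet<SizedPath> onDiskSegments = new TreeSet<SizedPath>();
        onDiskSegments.add(new SizedPath("map_0.out", 100));
        onDiskSegments.add(new SizedPath("map_1.out", 100)); // same size, different file
        // Prints 2. If compareTo() returned 0 for equal sizes, the second
        // segment would have been discarded from the set.
        System.out.println(onDiskSegments.size());
      }
    }

With the ordering in place, segments sort by compressed size, so each OnDiskMerger pass (now capped at ioSortFactor inputs instead of Integer.MAX_VALUE) picks up the smallest files first, which is what testOnDiskMerger() asserts.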
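Separately, the one-token change in finalMerge() (adding codec to the Merger.merge() call for the disk segments) means the fetched on-disk segments are opened with the job's intermediate-compression codec instead of no codec at all. It only comes into play when map-output compression is enabled; a minimal, assumed job configuration that turns it on (DefaultCodec chosen purely for illustration) looks like this:

    import org.apache.hadoop.io.compress.DefaultCodec;
    import org.apache.hadoop.mapred.JobConf;

    // Assumed setup, for illustration only: compress intermediate map outputs,
    // so the reduce-side merge must use the same codec to read the segments.
    public class MapOutputCompressionSetup {
      public static void main(String[] args) {
        JobConf conf = new JobConf();
        conf.setCompressMapOutput(true);                      // compress map outputs
        conf.setMapOutputCompressorClass(DefaultCodec.class); // codec used on both sides
      }
    }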