MAPREDUCE-3685. Fix bugs in MergeManager to ensure compression codec is appropriately used and that on-disk segments are correctly sorted on file-size. Contributed by Anty Rao and Ravi Prakash.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1453365 13f79535-47bb-0310-9956-ffa450edef68
Arun Murthy 2013-03-06 15:02:45 +00:00
parent e1062b8b78
commit df68c56267
5 changed files with 103 additions and 11 deletions

CHANGES.txt

@@ -201,6 +201,10 @@ Release 2.0.4-beta - UNRELEASED
     MAPREDUCE-4896. mapred queue -info spits out ugly exception when queue does
     not exist. (sandyr via tucu)
 
+    MAPREDUCE-3685. Fix bugs in MergeManager to ensure compression codec is
+    appropriately used and that on-disk segments are correctly sorted on
+    file-size. (Anty Rao and Ravi Prakash via acmurthy)
+
 Release 2.0.3-alpha - 2013-02-06
 
   INCOMPATIBLE CHANGES
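
One half of the fix above is that the on-disk merger now honors the configured merge factor instead of Integer.MAX_VALUE (see the @@ -500 hunk in MergeManagerImpl.java below). A minimal sketch of how that factor is set, assuming the Hadoop 2.x client libraries are on the classpath; the class name and the value 10 are illustrative, not part of this commit:

    // Sketch: setting the merge factor the fixed OnDiskMerger now respects.
    // MRJobConfig.IO_SORT_FACTOR is the same key the new test below uses; it
    // caps how many on-disk segments are merged in a single pass.
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapreduce.MRJobConfig;

    public class SortFactorExample {              // hypothetical example class
      public static void main(String[] args) {
        JobConf conf = new JobConf();
        conf.setInt(MRJobConfig.IO_SORT_FACTOR, 10);   // illustrative value
        System.out.println(conf.get(MRJobConfig.IO_SORT_FACTOR));
      }
    }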

Merger.java

@@ -169,7 +169,7 @@ public class Merger {
   }
 
-  static <K extends Object, V extends Object>
+  public static <K extends Object, V extends Object>
   RawKeyValueIterator merge(Configuration conf, FileSystem fs,
                             Class<K> keyClass, Class<V> valueClass,
                             CompressionCodec codec,

MergeManagerImpl.java

@@ -477,7 +477,7 @@ public class MergeManagerImpl<K, V> implements MergeManager<K, V> {
       }
       writer.close();
       compressAwarePath = new CompressAwarePath(outputPath,
-          writer.getRawLength());
+          writer.getRawLength(), writer.getCompressedLength());
 
       LOG.info(reduceId +
           " Merge of the " + noInMemorySegments +
@@ -500,7 +500,7 @@ public class MergeManagerImpl<K, V> implements MergeManager<K, V> {
   private class OnDiskMerger extends MergeThread<CompressAwarePath,K,V> {
 
     public OnDiskMerger(MergeManagerImpl<K, V> manager) {
-      super(manager, Integer.MAX_VALUE, exceptionReporter);
+      super(manager, ioSortFactor, exceptionReporter);
       setName("OnDiskMerger - Thread to merge on-disk map-outputs");
       setDaemon(true);
     }
@@ -554,7 +554,7 @@ public class MergeManagerImpl<K, V> implements MergeManager<K, V> {
         Merger.writeFile(iter, writer, reporter, jobConf);
         writer.close();
         compressAwarePath = new CompressAwarePath(outputPath,
-            writer.getRawLength());
+            writer.getRawLength(), writer.getCompressedLength());
       } catch (IOException e) {
         localFS.delete(outputPath, true);
         throw e;
@@ -719,7 +719,7 @@ public class MergeManagerImpl<K, V> implements MergeManager<K, V> {
       Merger.writeFile(rIter, writer, reporter, job);
       writer.close();
       onDiskMapOutputs.add(new CompressAwarePath(outputPath,
-          writer.getRawLength()));
+          writer.getRawLength(), writer.getCompressedLength()));
       writer = null;
       // add to list of final disk outputs.
     } catch (IOException e) {
@@ -791,7 +791,7 @@ public class MergeManagerImpl<K, V> implements MergeManager<K, V> {
       // merges. See comment where mergePhaseFinished is being set
       Progress thisPhase = (mergePhaseFinished) ? null : mergePhase;
       RawKeyValueIterator diskMerge = Merger.merge(
-          job, fs, keyClass, valueClass, diskSegments,
+          job, fs, keyClass, valueClass, codec, diskSegments,
          ioSortFactor, numInMemSegments, tmpDir, comparator,
          reporter, false, spilledRecordsCounter, null, thisPhase);
      diskSegments.clear();
@@ -810,16 +810,22 @@ public class MergeManagerImpl<K, V> implements MergeManager<K, V> {
   static class CompressAwarePath extends Path {
     private long rawDataLength;
+    private long compressedSize;
 
-    public CompressAwarePath(Path path, long rawDataLength) {
+    public CompressAwarePath(Path path, long rawDataLength, long compressSize) {
       super(path.toUri());
       this.rawDataLength = rawDataLength;
+      this.compressedSize = compressSize;
     }
 
     public long getRawDataLength() {
       return rawDataLength;
     }
 
+    public long getCompressedSize() {
+      return compressedSize;
+    }
+
     @Override
     public boolean equals(Object other) {
       return super.equals(other);
@@ -829,5 +835,20 @@ public class MergeManagerImpl<K, V> implements MergeManager<K, V> {
     public int hashCode() {
       return super.hashCode();
     }
+
+    @Override
+    public int compareTo(Object obj) {
+      if(obj instanceof CompressAwarePath) {
+        CompressAwarePath compPath = (CompressAwarePath) obj;
+        if(this.compressedSize < compPath.getCompressedSize()) {
+          return -1;
+        } else if (this.getCompressedSize() > compPath.getCompressedSize()) {
+          return 1;
+        }
+        // Not returning 0 here so that objects with the same size (but
+        // different paths) are still added to the TreeSet.
+      }
+      return super.compareTo(obj);
+    }
   }
 }
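
The comment inside compareTo() above carries the key design point: the on-disk outputs are held in a TreeSet, and a TreeSet treats compareTo() == 0 as "element already present", so two segments with equal compressed size but different paths would otherwise collapse into one entry. A standalone sketch of that TreeSet behaviour, using a hypothetical SizeOnlyPath class rather than the real CompressAwarePath:

    import java.util.TreeSet;

    // Hypothetical stand-in for CompressAwarePath that compares on size alone.
    class SizeOnlyPath implements Comparable<SizeOnlyPath> {
      final String path;
      final long compressedSize;

      SizeOnlyPath(String path, long compressedSize) {
        this.path = path;
        this.compressedSize = compressedSize;
      }

      @Override
      public int compareTo(SizeOnlyPath other) {
        // Returns 0 for equal sizes even when the paths differ.
        return Long.compare(compressedSize, other.compressedSize);
      }

      public static void main(String[] args) {
        TreeSet<SizeOnlyPath> onDiskOutputs = new TreeSet<SizeOnlyPath>();
        onDiskOutputs.add(new SizeOnlyPath("attempt_0/file.out", 4096));
        onDiskOutputs.add(new SizeOnlyPath("attempt_1/file.out", 4096));
        // Prints 1: the second segment is treated as a duplicate and silently dropped.
        System.out.println(onDiskOutputs.size());
      }
    }

The committed compareTo() avoids this by falling through to super.compareTo(obj) (the Path comparison) when sizes are equal, so distinct files of the same compressed size survive while the size ordering still decides which segments merge first.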

OnDiskMapOutput.java

@@ -48,6 +48,7 @@ class OnDiskMapOutput<K, V> extends MapOutput<K, V> {
   private final Path outputPath;
   private final MergeManagerImpl<K, V> merger;
   private final OutputStream disk;
+  private long compressedSize;
 
   public OnDiskMapOutput(TaskAttemptID mapId, TaskAttemptID reduceId,
                          MergeManagerImpl<K, V> merger, long size,
@@ -108,13 +109,14 @@ class OnDiskMapOutput<K, V> extends MapOutput<K, V> {
                             bytesLeft + " bytes missing of " +
                             compressedLength + ")");
     }
+    this.compressedSize = compressedLength;
   }
 
   @Override
   public void commit() throws IOException {
     localFS.rename(tmpOutputPath, outputPath);
     CompressAwarePath compressAwarePath = new CompressAwarePath(outputPath,
-        getSize());
+        getSize(), this.compressedSize);
     merger.closeOnDiskFile(compressAwarePath);
   }

TestMergeManager.java

@@ -17,28 +17,38 @@
  */
 package org.apache.hadoop.mapreduce.task.reduce;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 
 import java.io.IOException;
+import java.net.URISyntaxException;
 import java.util.ArrayList;
+import java.util.LinkedList;
 import java.util.List;
+import java.util.Random;
 import java.util.concurrent.BrokenBarrierException;
 import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BoundedByteArrayOutputStream;
+import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MROutputFiles;
 import org.apache.hadoop.mapred.MapOutputFile;
 import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl.CompressAwarePath;
 import org.junit.Assert;
 import org.junit.Test;
+import org.mockito.internal.util.reflection.Whitebox;
 
 public class TestMergeManager {
 
   @Test(timeout=10000)
+  @SuppressWarnings("unchecked")
   public void testMemoryMerge() throws Exception {
     final int TOTAL_MEM_BYTES = 10000;
     final int OUTPUT_SIZE = 7950;
@@ -195,4 +205,59 @@ public class TestMergeManager {
       return exceptions.size();
     }
   }
+
+  @SuppressWarnings({ "unchecked", "deprecation" })
+  @Test(timeout=10000)
+  public void testOnDiskMerger() throws IOException, URISyntaxException,
+    InterruptedException {
+    JobConf jobConf = new JobConf();
+    final int SORT_FACTOR = 5;
+    jobConf.setInt(MRJobConfig.IO_SORT_FACTOR, SORT_FACTOR);
+
+    MapOutputFile mapOutputFile = new MROutputFiles();
+    FileSystem fs = FileSystem.getLocal(jobConf);
+    MergeManagerImpl<IntWritable, IntWritable> manager =
+      new MergeManagerImpl<IntWritable, IntWritable>(null, jobConf, fs, null
+        , null, null, null, null, null, null, null, null, null, mapOutputFile);
+
+    MergeThread<MapOutput<IntWritable, IntWritable>, IntWritable, IntWritable>
+      onDiskMerger = (MergeThread<MapOutput<IntWritable, IntWritable>,
+        IntWritable, IntWritable>) Whitebox.getInternalState(manager,
+          "onDiskMerger");
+    int mergeFactor = (Integer) Whitebox.getInternalState(onDiskMerger,
+      "mergeFactor");
+
+    // make sure the io.sort.factor is set properly
+    assertEquals(mergeFactor, SORT_FACTOR);
+
+    // Stop the onDiskMerger thread so that we can intercept the list of files
+    // waiting to be merged.
+    onDiskMerger.suspend();
+
+    //Send the list of fake files waiting to be merged
+    Random rand = new Random();
+    for(int i = 0; i < 2*SORT_FACTOR; ++i) {
+      Path path = new Path("somePath");
+      CompressAwarePath cap = new CompressAwarePath(path, 1l, rand.nextInt());
+      manager.closeOnDiskFile(cap);
+    }
+
+    //Check that the files pending to be merged are in sorted order.
+    LinkedList<List<CompressAwarePath>> pendingToBeMerged =
+      (LinkedList<List<CompressAwarePath>>) Whitebox.getInternalState(
+        onDiskMerger, "pendingToBeMerged");
+    assertTrue("No inputs were added to list pending to merge",
+      pendingToBeMerged.size() > 0);
+    for(int i = 0; i < pendingToBeMerged.size(); ++i) {
+      List<CompressAwarePath> inputs = pendingToBeMerged.get(i);
+      for(int j = 1; j < inputs.size(); ++j) {
+        assertTrue("Not enough / too many inputs were going to be merged",
+          inputs.size() > 0 && inputs.size() <= SORT_FACTOR);
+        assertTrue("Inputs to be merged were not sorted according to size: ",
+          inputs.get(j).getCompressedSize()
+            >= inputs.get(j-1).getCompressedSize());
+      }
+    }
+
+  }
 }
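
To run just this test class (assuming the standard Hadoop Maven layout, where TestMergeManager sits in the hadoop-mapreduce-client-core module), mvn test -Dtest=TestMergeManager from that module executes both testMemoryMerge and the new testOnDiskMerger above, covering the mergeFactor wiring and the compressed-size ordering of pendingToBeMerged.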