MAPREDUCE-5008. Merger progress miscounts with respect to EOF_MARKER. Contributed by Sandy Ryza.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1450723 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Thomas White 2013-02-27 10:40:00 +00:00
parent 42e987f11e
commit 14089f1e57
2 changed files with 9 additions and 4 deletions

View File

@ -189,6 +189,9 @@ Release 2.0.4-beta - UNRELEASED
MAPREDUCE-4951. Container preemption interpreted as task failure. MAPREDUCE-4951. Container preemption interpreted as task failure.
(Sandy Ryza via tomwhite) (Sandy Ryza via tomwhite)
MAPREDUCE-5008. Merger progress miscounts with respect to EOF_MARKER.
(Sandy Ryza via tomwhite)
Release 2.0.3-alpha - 2013-02-06 Release 2.0.3-alpha - 2013-02-06
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -475,9 +475,9 @@ public class MergeManagerImpl<K, V> implements MergeManager<K, V> {
combineCollector.setWriter(writer); combineCollector.setWriter(writer);
combineAndSpill(rIter, reduceCombineInputCounter); combineAndSpill(rIter, reduceCombineInputCounter);
} }
writer.close();
compressAwarePath = new CompressAwarePath(outputPath, compressAwarePath = new CompressAwarePath(outputPath,
writer.getRawLength()); writer.getRawLength());
writer.close();
LOG.info(reduceId + LOG.info(reduceId +
" Merge of the " + noInMemorySegments + " Merge of the " + noInMemorySegments +
@ -552,9 +552,9 @@ public class MergeManagerImpl<K, V> implements MergeManager<K, V> {
mergedMapOutputsCounter, null); mergedMapOutputsCounter, null);
Merger.writeFile(iter, writer, reporter, jobConf); Merger.writeFile(iter, writer, reporter, jobConf);
writer.close();
compressAwarePath = new CompressAwarePath(outputPath, compressAwarePath = new CompressAwarePath(outputPath,
writer.getRawLength()); writer.getRawLength());
writer.close();
} catch (IOException e) { } catch (IOException e) {
localFS.delete(outputPath, true); localFS.delete(outputPath, true);
throw e; throw e;
@ -713,13 +713,15 @@ public class MergeManagerImpl<K, V> implements MergeManager<K, V> {
keyClass, valueClass, memDiskSegments, numMemDiskSegments, keyClass, valueClass, memDiskSegments, numMemDiskSegments,
tmpDir, comparator, reporter, spilledRecordsCounter, null, tmpDir, comparator, reporter, spilledRecordsCounter, null,
mergePhase); mergePhase);
final Writer<K,V> writer = new Writer<K,V>(job, fs, outputPath, Writer<K,V> writer = new Writer<K,V>(job, fs, outputPath,
keyClass, valueClass, codec, null); keyClass, valueClass, codec, null);
try { try {
Merger.writeFile(rIter, writer, reporter, job); Merger.writeFile(rIter, writer, reporter, job);
// add to list of final disk outputs. writer.close();
onDiskMapOutputs.add(new CompressAwarePath(outputPath, onDiskMapOutputs.add(new CompressAwarePath(outputPath,
writer.getRawLength())); writer.getRawLength()));
writer = null;
// add to list of final disk outputs.
} catch (IOException e) { } catch (IOException e) {
if (null != outputPath) { if (null != outputPath) {
try { try {