MAPREDUCE-2264. Job status exceeds 100% in some cases. (devaraj.k and sandyr via tucu)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1438800 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Alejandro Abdelnur 2013-01-26 00:50:17 +00:00
parent d8cdee8360
commit 7ac9d3b1c5
5 changed files with 236 additions and 23 deletions

View File

@ -118,6 +118,9 @@ Release 2.0.3-alpha - Unreleased
MAPREDUCE-4948. Fix a failing unit test TestYARNRunner.testHistoryServerToken. MAPREDUCE-4948. Fix a failing unit test TestYARNRunner.testHistoryServerToken.
(Junping Du via sseth) (Junping Du via sseth)
MAPREDUCE-2264. Job status exceeds 100% in some cases.
(devaraj.k and sandyr via tucu)
Release 2.0.2-alpha - 2012-09-07 Release 2.0.2-alpha - 2012-09-07
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -218,6 +218,7 @@ public static class Segment<K extends Object, V extends Object> {
CompressionCodec codec = null; CompressionCodec codec = null;
long segmentOffset = 0; long segmentOffset = 0;
long segmentLength = -1; long segmentLength = -1;
long rawDataLength = -1;
Counters.Counter mapOutputsCounter = null; Counters.Counter mapOutputsCounter = null;
@ -235,6 +236,15 @@ public Segment(Configuration conf, FileSystem fs, Path file,
mergedMapOutputsCounter); mergedMapOutputsCounter);
} }
public Segment(Configuration conf, FileSystem fs, Path file,
CompressionCodec codec, boolean preserve,
Counters.Counter mergedMapOutputsCounter, long rawDataLength)
throws IOException {
this(conf, fs, file, 0, fs.getFileStatus(file).getLen(), codec, preserve,
mergedMapOutputsCounter);
this.rawDataLength = rawDataLength;
}
public Segment(Configuration conf, FileSystem fs, Path file, public Segment(Configuration conf, FileSystem fs, Path file,
long segmentOffset, long segmentLength, long segmentOffset, long segmentLength,
CompressionCodec codec, CompressionCodec codec,
@ -262,6 +272,11 @@ public Segment(Reader<K, V> reader, boolean preserve) {
this(reader, preserve, null); this(reader, preserve, null);
} }
public Segment(Reader<K, V> reader, boolean preserve, long rawDataLength) {
this(reader, preserve, null);
this.rawDataLength = rawDataLength;
}
public Segment(Reader<K, V> reader, boolean preserve, public Segment(Reader<K, V> reader, boolean preserve,
Counters.Counter mapOutputsCounter) { Counters.Counter mapOutputsCounter) {
this.reader = reader; this.reader = reader;
@ -300,6 +315,10 @@ public long getLength() {
segmentLength : reader.getLength(); segmentLength : reader.getLength();
} }
public long getRawDataLength() {
return (rawDataLength > 0) ? rawDataLength : getLength();
}
boolean nextRawKey() throws IOException { boolean nextRawKey() throws IOException {
return reader.nextRawKey(key); return reader.nextRawKey(key);
} }
@ -633,7 +652,7 @@ RawKeyValueIterator merge(Class<K> keyClass, Class<V> valueClass,
totalBytesProcessed = 0; totalBytesProcessed = 0;
totalBytes = 0; totalBytes = 0;
for (int i = 0; i < segmentsToMerge.size(); i++) { for (int i = 0; i < segmentsToMerge.size(); i++) {
totalBytes += segmentsToMerge.get(i).getLength(); totalBytes += segmentsToMerge.get(i).getRawDataLength();
} }
} }
if (totalBytes != 0) //being paranoid if (totalBytes != 0) //being paranoid
@ -702,7 +721,7 @@ RawKeyValueIterator merge(Class<K> keyClass, Class<V> valueClass,
// size will match(almost) if combiner is not called in merge. // size will match(almost) if combiner is not called in merge.
long inputBytesOfThisMerge = totalBytesProcessed - long inputBytesOfThisMerge = totalBytesProcessed -
bytesProcessedInPrevMerges; bytesProcessedInPrevMerges;
totalBytes -= inputBytesOfThisMerge - tempSegment.getLength(); totalBytes -= inputBytesOfThisMerge - tempSegment.getRawDataLength();
if (totalBytes != 0) { if (totalBytes != 0) {
progPerByte = 1.0f / (float)totalBytes; progPerByte = 1.0f / (float)totalBytes;
} }
@ -768,7 +787,7 @@ long computeBytesInMerges(int factor, int inMem) {
for (int i = 0; i < numSegments; i++) { for (int i = 0; i < numSegments; i++) {
// Not handling empty segments here assuming that it would not affect // Not handling empty segments here assuming that it would not affect
// much in calculation of mergeProgress. // much in calculation of mergeProgress.
segmentSizes.add(segments.get(i).getLength()); segmentSizes.add(segments.get(i).getRawDataLength());
} }
// If includeFinalMerge is true, allow the following while loop iterate // If includeFinalMerge is true, allow the following while loop iterate

View File

@ -89,7 +89,7 @@ public class MergeManagerImpl<K, V> implements MergeManager<K, V> {
new TreeSet<InMemoryMapOutput<K,V>>(new MapOutputComparator<K, V>()); new TreeSet<InMemoryMapOutput<K,V>>(new MapOutputComparator<K, V>());
private final MergeThread<InMemoryMapOutput<K,V>, K,V> inMemoryMerger; private final MergeThread<InMemoryMapOutput<K,V>, K,V> inMemoryMerger;
Set<Path> onDiskMapOutputs = new TreeSet<Path>(); Set<CompressAwarePath> onDiskMapOutputs = new TreeSet<CompressAwarePath>();
private final OnDiskMerger onDiskMerger; private final OnDiskMerger onDiskMerger;
private final long memoryLimit; private final long memoryLimit;
@ -336,7 +336,7 @@ public synchronized void closeInMemoryMergedFile(InMemoryMapOutput<K,V> mapOutpu
inMemoryMergedMapOutputs.size()); inMemoryMergedMapOutputs.size());
} }
public synchronized void closeOnDiskFile(Path file) { public synchronized void closeOnDiskFile(CompressAwarePath file) {
onDiskMapOutputs.add(file); onDiskMapOutputs.add(file);
if (onDiskMapOutputs.size() >= (2 * ioSortFactor - 1)) { if (onDiskMapOutputs.size() >= (2 * ioSortFactor - 1)) {
@ -356,7 +356,7 @@ public RawKeyValueIterator close() throws Throwable {
List<InMemoryMapOutput<K, V>> memory = List<InMemoryMapOutput<K, V>> memory =
new ArrayList<InMemoryMapOutput<K, V>>(inMemoryMergedMapOutputs); new ArrayList<InMemoryMapOutput<K, V>>(inMemoryMergedMapOutputs);
memory.addAll(inMemoryMapOutputs); memory.addAll(inMemoryMapOutputs);
List<Path> disk = new ArrayList<Path>(onDiskMapOutputs); List<CompressAwarePath> disk = new ArrayList<CompressAwarePath>(onDiskMapOutputs);
return finalMerge(jobConf, rfs, memory, disk); return finalMerge(jobConf, rfs, memory, disk);
} }
@ -456,6 +456,7 @@ public void merge(List<InMemoryMapOutput<K,V>> inputs) throws IOException {
codec, null); codec, null);
RawKeyValueIterator rIter = null; RawKeyValueIterator rIter = null;
CompressAwarePath compressAwarePath;
try { try {
LOG.info("Initiating in-memory merge with " + noInMemorySegments + LOG.info("Initiating in-memory merge with " + noInMemorySegments +
" segments..."); " segments...");
@ -474,6 +475,8 @@ public void merge(List<InMemoryMapOutput<K,V>> inputs) throws IOException {
combineCollector.setWriter(writer); combineCollector.setWriter(writer);
combineAndSpill(rIter, reduceCombineInputCounter); combineAndSpill(rIter, reduceCombineInputCounter);
} }
compressAwarePath = new CompressAwarePath(outputPath,
writer.getRawLength());
writer.close(); writer.close();
LOG.info(reduceId + LOG.info(reduceId +
@ -489,12 +492,12 @@ public void merge(List<InMemoryMapOutput<K,V>> inputs) throws IOException {
} }
// Note the output of the merge // Note the output of the merge
closeOnDiskFile(outputPath); closeOnDiskFile(compressAwarePath);
} }
} }
private class OnDiskMerger extends MergeThread<Path,K,V> { private class OnDiskMerger extends MergeThread<CompressAwarePath,K,V> {
public OnDiskMerger(MergeManagerImpl<K, V> manager) { public OnDiskMerger(MergeManagerImpl<K, V> manager) {
super(manager, Integer.MAX_VALUE, exceptionReporter); super(manager, Integer.MAX_VALUE, exceptionReporter);
@ -503,7 +506,7 @@ public OnDiskMerger(MergeManagerImpl<K, V> manager) {
} }
@Override @Override
public void merge(List<Path> inputs) throws IOException { public void merge(List<CompressAwarePath> inputs) throws IOException {
// sanity check // sanity check
if (inputs == null || inputs.isEmpty()) { if (inputs == null || inputs.isEmpty()) {
LOG.info("No ondisk files to merge..."); LOG.info("No ondisk files to merge...");
@ -518,8 +521,8 @@ public void merge(List<Path> inputs) throws IOException {
" map outputs on disk. Triggering merge..."); " map outputs on disk. Triggering merge...");
// 1. Prepare the list of files to be merged. // 1. Prepare the list of files to be merged.
for (Path file : inputs) { for (CompressAwarePath file : inputs) {
approxOutputSize += localFS.getFileStatus(file).getLen(); approxOutputSize += localFS.getFileStatus(file.getPath()).getLen();
} }
// add the checksum length // add the checksum length
@ -536,6 +539,7 @@ public void merge(List<Path> inputs) throws IOException {
(Class<V>) jobConf.getMapOutputValueClass(), (Class<V>) jobConf.getMapOutputValueClass(),
codec, null); codec, null);
RawKeyValueIterator iter = null; RawKeyValueIterator iter = null;
CompressAwarePath compressAwarePath;
Path tmpDir = new Path(reduceId.toString()); Path tmpDir = new Path(reduceId.toString());
try { try {
iter = Merger.merge(jobConf, rfs, iter = Merger.merge(jobConf, rfs,
@ -548,13 +552,15 @@ public void merge(List<Path> inputs) throws IOException {
mergedMapOutputsCounter, null); mergedMapOutputsCounter, null);
Merger.writeFile(iter, writer, reporter, jobConf); Merger.writeFile(iter, writer, reporter, jobConf);
compressAwarePath = new CompressAwarePath(outputPath,
writer.getRawLength());
writer.close(); writer.close();
} catch (IOException e) { } catch (IOException e) {
localFS.delete(outputPath, true); localFS.delete(outputPath, true);
throw e; throw e;
} }
closeOnDiskFile(outputPath); closeOnDiskFile(compressAwarePath);
LOG.info(reduceId + LOG.info(reduceId +
" Finished merging " + inputs.size() + " Finished merging " + inputs.size() +
@ -653,7 +659,7 @@ public void close() throws IOException {
private RawKeyValueIterator finalMerge(JobConf job, FileSystem fs, private RawKeyValueIterator finalMerge(JobConf job, FileSystem fs,
List<InMemoryMapOutput<K,V>> inMemoryMapOutputs, List<InMemoryMapOutput<K,V>> inMemoryMapOutputs,
List<Path> onDiskMapOutputs List<CompressAwarePath> onDiskMapOutputs
) throws IOException { ) throws IOException {
LOG.info("finalMerge called with " + LOG.info("finalMerge called with " +
inMemoryMapOutputs.size() + " in-memory map-outputs and " + inMemoryMapOutputs.size() + " in-memory map-outputs and " +
@ -712,7 +718,8 @@ private RawKeyValueIterator finalMerge(JobConf job, FileSystem fs,
try { try {
Merger.writeFile(rIter, writer, reporter, job); Merger.writeFile(rIter, writer, reporter, job);
// add to list of final disk outputs. // add to list of final disk outputs.
onDiskMapOutputs.add(outputPath); onDiskMapOutputs.add(new CompressAwarePath(outputPath,
writer.getRawLength()));
} catch (IOException e) { } catch (IOException e) {
if (null != outputPath) { if (null != outputPath) {
try { try {
@ -742,15 +749,19 @@ private RawKeyValueIterator finalMerge(JobConf job, FileSystem fs,
// segments on disk // segments on disk
List<Segment<K,V>> diskSegments = new ArrayList<Segment<K,V>>(); List<Segment<K,V>> diskSegments = new ArrayList<Segment<K,V>>();
long onDiskBytes = inMemToDiskBytes; long onDiskBytes = inMemToDiskBytes;
Path[] onDisk = onDiskMapOutputs.toArray(new Path[onDiskMapOutputs.size()]); long rawBytes = inMemToDiskBytes;
for (Path file : onDisk) { CompressAwarePath[] onDisk = onDiskMapOutputs.toArray(
onDiskBytes += fs.getFileStatus(file).getLen(); new CompressAwarePath[onDiskMapOutputs.size()]);
LOG.debug("Disk file: " + file + " Length is " + for (CompressAwarePath file : onDisk) {
fs.getFileStatus(file).getLen()); long fileLength = fs.getFileStatus(file.getPath()).getLen();
diskSegments.add(new Segment<K, V>(job, fs, file, codec, keepInputs, onDiskBytes += fileLength;
rawBytes += (file.getRawDataLength() > 0) ? file.getRawDataLength() : fileLength;
LOG.debug("Disk file: " + file + " Length is " + fileLength);
diskSegments.add(new Segment<K, V>(job, fs, file.getPath(), codec, keepInputs,
(file.toString().endsWith( (file.toString().endsWith(
Task.MERGED_OUTPUT_PREFIX) ? Task.MERGED_OUTPUT_PREFIX) ?
null : mergedMapOutputsCounter) null : mergedMapOutputsCounter), file.getRawDataLength()
)); ));
} }
LOG.info("Merging " + onDisk.length + " files, " + LOG.info("Merging " + onDisk.length + " files, " +
@ -786,7 +797,7 @@ public int compare(Segment<K, V> o1, Segment<K, V> o2) {
return diskMerge; return diskMerge;
} }
finalSegments.add(new Segment<K,V>( finalSegments.add(new Segment<K,V>(
new RawKVIteratorReader(diskMerge, onDiskBytes), true)); new RawKVIteratorReader(diskMerge, onDiskBytes), true, rawBytes));
} }
return Merger.merge(job, fs, keyClass, valueClass, return Merger.merge(job, fs, keyClass, valueClass,
finalSegments, finalSegments.size(), tmpDir, finalSegments, finalSegments.size(), tmpDir,
@ -794,4 +805,24 @@ public int compare(Segment<K, V> o1, Segment<K, V> o2) {
null); null);
} }
static class CompressAwarePath
{
private long rawDataLength;
private Path path;
public CompressAwarePath(Path path, long rawDataLength) {
this.path = path;
this.rawDataLength = rawDataLength;
}
public long getRawDataLength() {
return rawDataLength;
}
public Path getPath() {
return path;
}
}
} }

View File

@ -37,6 +37,7 @@
import org.apache.hadoop.mapred.MapOutputFile; import org.apache.hadoop.mapred.MapOutputFile;
import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl.CompressAwarePath;
@InterfaceAudience.Private @InterfaceAudience.Private
@InterfaceStability.Unstable @InterfaceStability.Unstable
@ -112,7 +113,9 @@ public void shuffle(MapHost host, InputStream input,
@Override @Override
public void commit() throws IOException { public void commit() throws IOException {
localFS.rename(tmpOutputPath, outputPath); localFS.rename(tmpOutputPath, outputPath);
merger.closeOnDiskFile(outputPath); CompressAwarePath compressAwarePath = new CompressAwarePath(outputPath,
getSize());
merger.closeOnDiskFile(compressAwarePath);
} }
@Override @Override

View File

@ -0,0 +1,157 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.doAnswer;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import junit.framework.Assert;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.Counters.Counter;
import org.apache.hadoop.mapred.IFile.Reader;
import org.apache.hadoop.mapred.Merger.Segment;
import org.apache.hadoop.util.Progress;
import org.apache.hadoop.util.Progressable;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
public class TestMerger {
@Test
public void testCompressed() throws IOException {
testMergeShouldReturnProperProgress(getCompressedSegments());
}
@Test
public void testUncompressed() throws IOException {
testMergeShouldReturnProperProgress(getUncompressedSegments());
}
@SuppressWarnings( { "deprecation", "unchecked" })
public void testMergeShouldReturnProperProgress(
List<Segment<Text, Text>> segments) throws IOException {
Configuration conf = new Configuration();
JobConf jobConf = new JobConf();
FileSystem fs = FileSystem.getLocal(conf);
Path tmpDir = new Path("localpath");
Class<Text> keyClass = (Class<Text>) jobConf.getMapOutputKeyClass();
Class<Text> valueClass = (Class<Text>) jobConf.getMapOutputValueClass();
RawComparator<Text> comparator = jobConf.getOutputKeyComparator();
Counter readsCounter = new Counter();
Counter writesCounter = new Counter();
Progress mergePhase = new Progress();
RawKeyValueIterator mergeQueue = Merger.merge(conf, fs, keyClass,
valueClass, segments, 2, tmpDir, comparator, getReporter(),
readsCounter, writesCounter, mergePhase);
Assert.assertEquals(1.0f, mergeQueue.getProgress().get());
}
private Progressable getReporter() {
Progressable reporter = new Progressable() {
@Override
public void progress() {
}
};
return reporter;
}
private List<Segment<Text, Text>> getUncompressedSegments() throws IOException {
List<Segment<Text, Text>> segments = new ArrayList<Segment<Text, Text>>();
for (int i = 1; i < 1; i++) {
segments.add(getUncompressedSegment(i));
System.out.println("adding segment");
}
return segments;
}
private List<Segment<Text, Text>> getCompressedSegments() throws IOException {
List<Segment<Text, Text>> segments = new ArrayList<Segment<Text, Text>>();
for (int i = 1; i < 1; i++) {
segments.add(getCompressedSegment(i));
System.out.println("adding segment");
}
return segments;
}
private Segment<Text, Text> getUncompressedSegment(int i) throws IOException {
return new Segment<Text, Text>(getReader(i), false);
}
private Segment<Text, Text> getCompressedSegment(int i) throws IOException {
return new Segment<Text, Text>(getReader(i), false, 3000l);
}
@SuppressWarnings("unchecked")
private Reader<Text, Text> getReader(int i) throws IOException {
Reader<Text, Text> readerMock = mock(Reader.class);
when(readerMock.getPosition()).thenReturn(0l).thenReturn(10l).thenReturn(
20l);
when(
readerMock.nextRawKey(any(DataInputBuffer.class)))
.thenAnswer(getKeyAnswer("Segment" + i));
doAnswer(getValueAnswer("Segment" + i)).when(readerMock).nextRawValue(
any(DataInputBuffer.class));
return readerMock;
}
private Answer<?> getKeyAnswer(final String segmentName) {
return new Answer<Object>() {
int i = 0;
public Boolean answer(InvocationOnMock invocation) {
Object[] args = invocation.getArguments();
DataInputBuffer key = (DataInputBuffer) args[0];
if (i++ == 2) {
return false;
}
key.reset(("Segement Key " + segmentName + i).getBytes(), 20);
return true;
}
};
}
private Answer<?> getValueAnswer(final String segmentName) {
return new Answer<Void>() {
int i = 0;
public Void answer(InvocationOnMock invocation) {
Object[] args = invocation.getArguments();
DataInputBuffer key = (DataInputBuffer) args[0];
if (i++ == 2) {
return null;
}
key.reset(("Segement Value " + segmentName + i).getBytes(), 20);
return null;
}
};
}
}