diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 1f5a89d7c13..593eb43dadd 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -269,9 +269,6 @@ Release 2.0.3-alpha - Unreleased
     MAPREDUCE-4948. Fix a failing unit test TestYARNRunner.testHistoryServerToken.
     (Junping Du via sseth)
 
-    MAPREDUCE-2264. Job status exceeds 100% in some cases. 
-    (devaraj.k and sandyr via tucu)
-
 Release 2.0.2-alpha - 2012-09-07 
 
   INCOMPATIBLE CHANGES
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java
index d0074707650..484bd89cd4b 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java
@@ -218,7 +218,6 @@ public class Merger {
     CompressionCodec codec = null;
     long segmentOffset = 0;
     long segmentLength = -1;
-    long rawDataLength = -1;
 
     Counters.Counter mapOutputsCounter = null;
 
@@ -235,15 +234,6 @@ public class Merger {
       this(conf, fs, file, 0, fs.getFileStatus(file).getLen(), codec, preserve,
            mergedMapOutputsCounter);
     }
-
-    public Segment(Configuration conf, FileSystem fs, Path file,
-                   CompressionCodec codec, boolean preserve,
-                   Counters.Counter mergedMapOutputsCounter, long rawDataLength)
-        throws IOException {
-      this(conf, fs, file, 0, fs.getFileStatus(file).getLen(), codec, preserve,
-          mergedMapOutputsCounter);
-      this.rawDataLength = rawDataLength;
-    }
 
     public Segment(Configuration conf, FileSystem fs, Path file,
                    long segmentOffset, long segmentLength,
@@ -271,11 +261,6 @@ public class Merger {
     public Segment(Reader<K, V> reader, boolean preserve) {
       this(reader, preserve, null);
     }
-
-    public Segment(Reader<K, V> reader, boolean preserve, long rawDataLength) {
-      this(reader, preserve, null);
-      this.rawDataLength = rawDataLength;
-    }
 
     public Segment(Reader<K, V> reader, boolean preserve,
                    Counters.Counter mapOutputsCounter) {
@@ -315,10 +300,6 @@
           segmentLength : reader.getLength();
     }
 
-    public long getRawDataLength() {
-      return (rawDataLength > 0) ? rawDataLength : getLength();
-    }
-
     boolean nextRawKey() throws IOException {
       return reader.nextRawKey(key);
     }
@@ -652,7 +633,7 @@ public class Merger {
           totalBytesProcessed = 0;
           totalBytes = 0;
           for (int i = 0; i < segmentsToMerge.size(); i++) {
-            totalBytes += segmentsToMerge.get(i).getRawDataLength();
+            totalBytes += segmentsToMerge.get(i).getLength();
           }
         }
         if (totalBytes != 0) //being paranoid
@@ -721,7 +702,7 @@
           // size will match(almost) if combiner is not called in merge.
           long inputBytesOfThisMerge = totalBytesProcessed -
                                        bytesProcessedInPrevMerges;
-          totalBytes -= inputBytesOfThisMerge - tempSegment.getRawDataLength();
+          totalBytes -= inputBytesOfThisMerge - tempSegment.getLength();
           if (totalBytes != 0) {
             progPerByte = 1.0f / (float)totalBytes;
           }
@@ -787,7 +768,7 @@
     for (int i = 0; i < numSegments; i++) {
       // Not handling empty segments here assuming that it would not affect
       // much in calculation of mergeProgress.
-      segmentSizes.add(segments.get(i).getRawDataLength());
+      segmentSizes.add(segments.get(i).getLength());
     }
 
     // If includeFinalMerge is true, allow the following while loop iterate
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java
index fb2fb61cf7d..007897f17f0 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java
@@ -89,7 +89,7 @@ public class MergeManagerImpl<K, V> implements MergeManager<K, V> {
     new TreeSet<InMemoryMapOutput<K,V>>(new MapOutputComparator<K, V>());
   private final MergeThread<InMemoryMapOutput<K,V>, K,V> inMemoryMerger;
 
-  Set<CompressAwarePath> onDiskMapOutputs = new TreeSet<CompressAwarePath>();
+  Set<Path> onDiskMapOutputs = new TreeSet<Path>();
   private final OnDiskMerger onDiskMerger;
 
   private final long memoryLimit;
@@ -336,7 +336,7 @@
          inMemoryMergedMapOutputs.size());
   }
 
-  public synchronized void closeOnDiskFile(CompressAwarePath file) {
+  public synchronized void closeOnDiskFile(Path file) {
     onDiskMapOutputs.add(file);
 
     if (onDiskMapOutputs.size() >= (2 * ioSortFactor - 1)) {
@@ -356,7 +356,7 @@
     List<InMemoryMapOutput<K, V>> memory = 
       new ArrayList<InMemoryMapOutput<K, V>>(inMemoryMergedMapOutputs);
     memory.addAll(inMemoryMapOutputs);
-    List<CompressAwarePath> disk = new ArrayList<CompressAwarePath>(onDiskMapOutputs);
+    List<Path> disk = new ArrayList<Path>(onDiskMapOutputs);
     return finalMerge(jobConf, rfs, memory, disk);
   }
 
@@ -456,7 +456,6 @@
                                     codec, null);
 
       RawKeyValueIterator rIter = null;
-      CompressAwarePath compressAwarePath;
       try {
         LOG.info("Initiating in-memory merge with " + noInMemorySegments + 
                  " segments...");
@@ -475,8 +474,6 @@
           combineCollector.setWriter(writer);
           combineAndSpill(rIter, reduceCombineInputCounter);
         }
-        compressAwarePath = new CompressAwarePath(outputPath,
-            writer.getRawLength());
         writer.close();
 
         LOG.info(reduceId +  
@@ -492,12 +489,12 @@
       }
 
       // Note the output of the merge
-      closeOnDiskFile(compressAwarePath);
+      closeOnDiskFile(outputPath);
     }
 
   }
 
-  private class OnDiskMerger extends MergeThread<CompressAwarePath,K,V> {
+  private class OnDiskMerger extends MergeThread<Path,K,V> {
 
     public OnDiskMerger(MergeManagerImpl<K, V> manager) {
       super(manager, Integer.MAX_VALUE, exceptionReporter);
@@ -506,7 +503,7 @@
     }
 
     @Override
-    public void merge(List<CompressAwarePath> inputs) throws IOException {
+    public void merge(List<Path> inputs) throws IOException {
       // sanity check
       if (inputs == null || inputs.isEmpty()) {
         LOG.info("No ondisk files to merge...");
@@ -521,8 +518,8 @@
              " map outputs on disk. Triggering merge...");
 
       // 1. Prepare the list of files to be merged. 
-      for (CompressAwarePath file : inputs) {
-        approxOutputSize += localFS.getFileStatus(file.getPath()).getLen();
+      for (Path file : inputs) {
+        approxOutputSize += localFS.getFileStatus(file).getLen();
       }
 
       // add the checksum length
@@ -539,7 +536,6 @@
                         (Class<V>) jobConf.getMapOutputValueClass(),
                         codec, null);
       RawKeyValueIterator iter = null;
-      CompressAwarePath compressAwarePath;
       Path tmpDir = new Path(reduceId.toString());
       try {
         iter = Merger.merge(jobConf, rfs,
@@ -552,15 +548,13 @@
                             mergedMapOutputsCounter, null);
 
         Merger.writeFile(iter, writer, reporter, jobConf);
-        compressAwarePath = new CompressAwarePath(outputPath,
-            writer.getRawLength());
         writer.close();
       } catch (IOException e) {
         localFS.delete(outputPath, true);
         throw e;
       }
 
-      closeOnDiskFile(compressAwarePath);
+      closeOnDiskFile(outputPath);
 
       LOG.info(reduceId +
           " Finished merging " + inputs.size() + 
@@ -659,7 +653,7 @@
 
   private RawKeyValueIterator finalMerge(JobConf job, FileSystem fs,
                                        List<InMemoryMapOutput<K,V>> inMemoryMapOutputs,
-                                       List<CompressAwarePath> onDiskMapOutputs
+                                       List<Path> onDiskMapOutputs
                                        ) throws IOException {
     LOG.info("finalMerge called with " + 
              inMemoryMapOutputs.size() + " in-memory map-outputs and " + 
@@ -718,8 +712,7 @@
       try {
         Merger.writeFile(rIter, writer, reporter, job);
         // add to list of final disk outputs.
-        onDiskMapOutputs.add(new CompressAwarePath(outputPath,
-            writer.getRawLength()));
+        onDiskMapOutputs.add(outputPath);
       } catch (IOException e) {
         if (null != outputPath) {
           try {
@@ -749,19 +742,15 @@
     // segments on disk
     List<Segment<K,V>> diskSegments = new ArrayList<Segment<K,V>>();
     long onDiskBytes = inMemToDiskBytes;
-    long rawBytes = inMemToDiskBytes;
-    CompressAwarePath[] onDisk = onDiskMapOutputs.toArray(
-        new CompressAwarePath[onDiskMapOutputs.size()]);
-    for (CompressAwarePath file : onDisk) {
-      long fileLength = fs.getFileStatus(file.getPath()).getLen();
-      onDiskBytes += fileLength;
-      rawBytes += (file.getRawDataLength() > 0) ? file.getRawDataLength() : fileLength;
-
-      LOG.debug("Disk file: " + file + " Length is " + fileLength);
-      diskSegments.add(new Segment<K, V>(job, fs, file.getPath(), codec, keepInputs,
+    Path[] onDisk = onDiskMapOutputs.toArray(new Path[onDiskMapOutputs.size()]);
+    for (Path file : onDisk) {
+      onDiskBytes += fs.getFileStatus(file).getLen();
+      LOG.debug("Disk file: " + file + " Length is " + 
+          fs.getFileStatus(file).getLen());
+      diskSegments.add(new Segment<K, V>(job, fs, file, codec, keepInputs,
                                          (file.toString().endsWith(
                                              Task.MERGED_OUTPUT_PREFIX) ?
-                                          null : mergedMapOutputsCounter), file.getRawDataLength()
+                                          null : mergedMapOutputsCounter)
                                         ));
     }
     LOG.info("Merging " + onDisk.length + " files, " +
@@ -797,7 +786,7 @@
         return diskMerge;
       }
       finalSegments.add(new Segment<K,V>(
-            new RawKVIteratorReader(diskMerge, onDiskBytes), true, rawBytes));
+            new RawKVIteratorReader(diskMerge, onDiskBytes), true));
     }
     return Merger.merge(job, fs, keyClass, valueClass,
                  finalSegments, finalSegments.size(), tmpDir,
@@ -805,24 +794,4 @@
                  null);
 
   }
-
-  static class CompressAwarePath
-  {
-    private long rawDataLength;
-
-    private Path path;
-
-    public CompressAwarePath(Path path, long rawDataLength) {
-      this.path = path;
-      this.rawDataLength = rawDataLength;
-    }
-
-    public long getRawDataLength() {
-      return rawDataLength;
-    }
-
-    public Path getPath() {
-      return path;
-    }
-  }
 }
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java
index bf69798c124..2cb86449e5d 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java
@@ -37,7 +37,6 @@
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.MapOutputFile;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl.CompressAwarePath;
 
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
@@ -113,9 +112,7 @@ class OnDiskMapOutput<K, V> extends MapOutput<K, V> {
   @Override
   public void commit() throws IOException {
     localFS.rename(tmpOutputPath, outputPath);
-    CompressAwarePath compressAwarePath = new CompressAwarePath(outputPath,
-        getSize());
-    merger.closeOnDiskFile(compressAwarePath);
+    merger.closeOnDiskFile(outputPath);
   }
 
   @Override
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestMerger.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestMerger.java
deleted file mode 100644
index 9d9eef6d0b6..00000000000
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestMerger.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.mapred;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-import static org.mockito.Mockito.doAnswer;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import junit.framework.Assert;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.DataInputBuffer;
-import org.apache.hadoop.io.RawComparator;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.Counters.Counter;
-import org.apache.hadoop.mapred.IFile.Reader;
-import org.apache.hadoop.mapred.Merger.Segment;
-import org.apache.hadoop.util.Progress;
-import org.apache.hadoop.util.Progressable;
-import org.junit.Test;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-public class TestMerger {
-
-  @Test
-  public void testCompressed() throws IOException {
-    testMergeShouldReturnProperProgress(getCompressedSegments());
-  }
-
-  @Test
-  public void testUncompressed() throws IOException {
-    testMergeShouldReturnProperProgress(getUncompressedSegments());
-  }
-
-  @SuppressWarnings( { "deprecation", "unchecked" })
-  public void testMergeShouldReturnProperProgress(
-      List<Segment<Text, Text>> segments) throws IOException {
-    Configuration conf = new Configuration();
-    JobConf jobConf = new JobConf();
-    FileSystem fs = FileSystem.getLocal(conf);
-    Path tmpDir = new Path("localpath");
-    Class<Text> keyClass = (Class<Text>) jobConf.getMapOutputKeyClass();
-    Class<Text> valueClass = (Class<Text>) jobConf.getMapOutputValueClass();
-    RawComparator<Text> comparator = jobConf.getOutputKeyComparator();
-    Counter readsCounter = new Counter();
-    Counter writesCounter = new Counter();
-    Progress mergePhase = new Progress();
-    RawKeyValueIterator mergeQueue = Merger.merge(conf, fs, keyClass,
-        valueClass, segments, 2, tmpDir, comparator, getReporter(),
-        readsCounter, writesCounter, mergePhase);
-    Assert.assertEquals(1.0f, mergeQueue.getProgress().get());
-  }
-
-  private Progressable getReporter() {
-    Progressable reporter = new Progressable() {
-      @Override
-      public void progress() {
-      }
-    };
-    return reporter;
-  }
-
-  private List<Segment<Text, Text>> getUncompressedSegments() throws IOException {
-    List<Segment<Text, Text>> segments = new ArrayList<Segment<Text, Text>>();
-    for (int i = 1; i < 1; i++) {
-      segments.add(getUncompressedSegment(i));
-      System.out.println("adding segment");
-    }
-    return segments;
-  }
-
-  private List<Segment<Text, Text>> getCompressedSegments() throws IOException {
-    List<Segment<Text, Text>> segments = new ArrayList<Segment<Text, Text>>();
-    for (int i = 1; i < 1; i++) {
-      segments.add(getCompressedSegment(i));
-      System.out.println("adding segment");
-    }
-    return segments;
-  }
-
-  private Segment<Text, Text> getUncompressedSegment(int i) throws IOException {
-    return new Segment<Text, Text>(getReader(i), false);
-  }
-
-  private Segment<Text, Text> getCompressedSegment(int i) throws IOException {
-    return new Segment<Text, Text>(getReader(i), false, 3000l);
-  }
-
-  @SuppressWarnings("unchecked")
-  private Reader<Text, Text> getReader(int i) throws IOException {
-    Reader<Text, Text> readerMock = mock(Reader.class);
-    when(readerMock.getPosition()).thenReturn(0l).thenReturn(10l).thenReturn(
-        20l);
-    when(
-        readerMock.nextRawKey(any(DataInputBuffer.class)))
-        .thenAnswer(getKeyAnswer("Segment" + i));
-    doAnswer(getValueAnswer("Segment" + i)).when(readerMock).nextRawValue(
-        any(DataInputBuffer.class));
-
-    return readerMock;
-  }
-
-  private Answer<?> getKeyAnswer(final String segmentName) {
-    return new Answer<Object>() {
-      int i = 0;
-
-      public Boolean answer(InvocationOnMock invocation) {
-        Object[] args = invocation.getArguments();
-        DataInputBuffer key = (DataInputBuffer) args[0];
-        if (i++ == 2) {
-          return false;
-        }
-        key.reset(("Segement Key " + segmentName + i).getBytes(), 20);
-        return true;
-      }
-    };
-  }
-
-  private Answer<?> getValueAnswer(final String segmentName) {
-    return new Answer<Void>() {
-      int i = 0;
-
-      public Void answer(InvocationOnMock invocation) {
-        Object[] args = invocation.getArguments();
-        DataInputBuffer key = (DataInputBuffer) args[0];
-        if (i++ == 2) {
-          return null;
-        }
-        key.reset(("Segement Value " + segmentName + i).getBytes(), 20);
-        return null;
-      }
-    };
-  }
-}
\ No newline at end of file