MAPREDUCE-2264. Job status exceeds 100% in some cases. (devaraj.k and sandyr via tucu)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1440076 13f79535-47bb-0310-9956-ffa450edef68
commit 0f430e53fd
parent 33cee25aad
@@ -275,6 +275,9 @@ Release 2.0.3-alpha - Unreleased
     MAPREDUCE-4803. Remove duplicate copy of TestIndexCache. (Mariappan Asokan
     via sseth)
 
+    MAPREDUCE-2264. Job status exceeds 100% in some cases.
+    (devaraj.k and sandyr via tucu)
+
 Release 2.0.2-alpha - 2012-09-07
 
   INCOMPATIBLE CHANGES
 
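Background on the fix (summarized from the hunks that follow): with compressed map outputs, Merger's progress denominator (totalBytes) was built from Segment.getLength(), the compressed on-disk size, while the numerator (totalBytesProcessed) counts uncompressed bytes as records are read, so reported merge progress could climb past 100%. The patch threads an uncompressed "raw" length through Segment, MergeManagerImpl and OnDiskMapOutput and uses it for the progress math. Rough arithmetic with made-up sizes:

    100 MB of map output, compressed to 40 MB on disk
    before: progress ~= 100 MB processed / 40 MB expected  = 2.5  (reported as more than 100%)
    after:  progress ~= 100 MB processed / 100 MB expected = 1.0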
@@ -218,6 +218,7 @@ public class Merger {
     CompressionCodec codec = null;
     long segmentOffset = 0;
     long segmentLength = -1;
+    long rawDataLength = -1;
 
     Counters.Counter mapOutputsCounter = null;
 
@@ -234,6 +235,15 @@ public class Merger {
       this(conf, fs, file, 0, fs.getFileStatus(file).getLen(), codec, preserve,
            mergedMapOutputsCounter);
     }
 
+    public Segment(Configuration conf, FileSystem fs, Path file,
+                   CompressionCodec codec, boolean preserve,
+                   Counters.Counter mergedMapOutputsCounter, long rawDataLength)
+        throws IOException {
+      this(conf, fs, file, 0, fs.getFileStatus(file).getLen(), codec, preserve,
+           mergedMapOutputsCounter);
+      this.rawDataLength = rawDataLength;
+    }
+
     public Segment(Configuration conf, FileSystem fs, Path file,
                    long segmentOffset, long segmentLength,
@@ -261,6 +271,11 @@ public class Merger {
     public Segment(Reader<K, V> reader, boolean preserve) {
       this(reader, preserve, null);
     }
+
+    public Segment(Reader<K, V> reader, boolean preserve, long rawDataLength) {
+      this(reader, preserve, null);
+      this.rawDataLength = rawDataLength;
+    }
 
     public Segment(Reader<K, V> reader, boolean preserve,
                    Counters.Counter mapOutputsCounter) {
@@ -300,6 +315,10 @@ public class Merger {
           segmentLength : reader.getLength();
     }
 
+    public long getRawDataLength() {
+      return (rawDataLength > 0) ? rawDataLength : getLength();
+    }
+
     boolean nextRawKey() throws IOException {
       return reader.nextRawKey(key);
     }
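The new constructors and getRawDataLength() let callers attach an uncompressed size to a segment and fall back to the on-disk length when none is known. A minimal usage sketch, assuming conf, fs, codec and a Counters.Counter named counter are in scope as in Merger; the paths and the 100 MB figure are illustrative only:

    long rawLen = 100L << 20;  // uncompressed bytes behind the (possibly compressed) file
    Segment<Text, Text> withRaw = new Segment<Text, Text>(
        conf, fs, new Path("map_0.out"), codec, true, counter, rawLen);
    withRaw.getRawDataLength();    // 100 MB: what the merge progress math should use

    Segment<Text, Text> withoutRaw = new Segment<Text, Text>(
        conf, fs, new Path("map_1.out"), codec, true, counter);
    withoutRaw.getRawDataLength(); // falls back to getLength(), the on-disk size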
@@ -633,7 +652,7 @@ public class Merger {
         totalBytesProcessed = 0;
         totalBytes = 0;
         for (int i = 0; i < segmentsToMerge.size(); i++) {
-          totalBytes += segmentsToMerge.get(i).getLength();
+          totalBytes += segmentsToMerge.get(i).getRawDataLength();
         }
       }
       if (totalBytes != 0) //being paranoid
@@ -702,7 +721,7 @@ public class Merger {
       // size will match(almost) if combiner is not called in merge.
       long inputBytesOfThisMerge = totalBytesProcessed -
                                    bytesProcessedInPrevMerges;
-      totalBytes -= inputBytesOfThisMerge - tempSegment.getLength();
+      totalBytes -= inputBytesOfThisMerge - tempSegment.getRawDataLength();
       if (totalBytes != 0) {
         progPerByte = 1.0f / (float)totalBytes;
       }
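These two hunks keep the progress denominator in uncompressed units: the initial totalBytes now sums getRawDataLength() over the input segments, and when an intermediate merge writes a temporary segment back into the queue, that segment re-enters the total at its raw size as well. A sketch of why the units matter, with made-up sizes (progPerByte is later multiplied by totalBytesProcessed elsewhere in Merger, which this diff does not show):

    // One intermediate pass reads 100 MB of raw input and writes it back as a
    // temporary segment holding the same 100 MB, compressed to 40 MB on disk.
    long totalBytes = 300L << 20;             // running denominator, raw bytes
    long inputBytesOfThisMerge = 100L << 20;  // raw bytes consumed by this pass

    // Old: the temp segment re-entered at its compressed size:
    //   300 - (100 - 40) = 240 MB, so the denominator mixed raw and compressed units.
    // New: it re-enters at its raw size, keeping the total consistent:
    totalBytes -= inputBytesOfThisMerge - (100L << 20);   // still 300 MB
    float progPerByte = 1.0f / (float) totalBytes;        // as in the hunk above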
@@ -768,7 +787,7 @@ public class Merger {
       for (int i = 0; i < numSegments; i++) {
         // Not handling empty segments here assuming that it would not affect
         // much in calculation of mergeProgress.
-        segmentSizes.add(segments.get(i).getLength());
+        segmentSizes.add(segments.get(i).getRawDataLength());
       }
 
       // If includeFinalMerge is true, allow the following while loop iterate
 
@@ -89,7 +89,7 @@ public class MergeManagerImpl<K, V> implements MergeManager<K, V> {
     new TreeSet<InMemoryMapOutput<K,V>>(new MapOutputComparator<K, V>());
   private final MergeThread<InMemoryMapOutput<K,V>, K,V> inMemoryMerger;
 
-  Set<Path> onDiskMapOutputs = new TreeSet<Path>();
+  Set<CompressAwarePath> onDiskMapOutputs = new TreeSet<CompressAwarePath>();
   private final OnDiskMerger onDiskMerger;
 
   private final long memoryLimit;
@@ -336,7 +336,7 @@ public class MergeManagerImpl<K, V> implements MergeManager<K, V> {
                        inMemoryMergedMapOutputs.size());
   }
 
-  public synchronized void closeOnDiskFile(Path file) {
+  public synchronized void closeOnDiskFile(CompressAwarePath file) {
     onDiskMapOutputs.add(file);
 
     if (onDiskMapOutputs.size() >= (2 * ioSortFactor - 1)) {
@@ -356,7 +356,7 @@ public class MergeManagerImpl<K, V> implements MergeManager<K, V> {
     List<InMemoryMapOutput<K, V>> memory =
       new ArrayList<InMemoryMapOutput<K, V>>(inMemoryMergedMapOutputs);
     memory.addAll(inMemoryMapOutputs);
-    List<Path> disk = new ArrayList<Path>(onDiskMapOutputs);
+    List<CompressAwarePath> disk = new ArrayList<CompressAwarePath>(onDiskMapOutputs);
     return finalMerge(jobConf, rfs, memory, disk);
   }
 
@@ -456,6 +456,7 @@ public class MergeManagerImpl<K, V> implements MergeManager<K, V> {
                                     codec, null);
 
       RawKeyValueIterator rIter = null;
+      CompressAwarePath compressAwarePath;
       try {
         LOG.info("Initiating in-memory merge with " + noInMemorySegments +
                  " segments...");
@@ -474,6 +475,8 @@ public class MergeManagerImpl<K, V> implements MergeManager<K, V> {
           combineCollector.setWriter(writer);
           combineAndSpill(rIter, reduceCombineInputCounter);
         }
+        compressAwarePath = new CompressAwarePath(outputPath,
+            writer.getRawLength());
         writer.close();
 
         LOG.info(reduceId +
@@ -489,12 +492,12 @@ public class MergeManagerImpl<K, V> implements MergeManager<K, V> {
       }
 
       // Note the output of the merge
-      closeOnDiskFile(outputPath);
+      closeOnDiskFile(compressAwarePath);
     }
 
   }
 
-  private class OnDiskMerger extends MergeThread<Path,K,V> {
+  private class OnDiskMerger extends MergeThread<CompressAwarePath,K,V> {
 
     public OnDiskMerger(MergeManagerImpl<K, V> manager) {
       super(manager, Integer.MAX_VALUE, exceptionReporter);
@@ -503,7 +506,7 @@ public class MergeManagerImpl<K, V> implements MergeManager<K, V> {
     }
 
     @Override
-    public void merge(List<Path> inputs) throws IOException {
+    public void merge(List<CompressAwarePath> inputs) throws IOException {
       // sanity check
       if (inputs == null || inputs.isEmpty()) {
         LOG.info("No ondisk files to merge...");
@@ -518,7 +521,7 @@ public class MergeManagerImpl<K, V> implements MergeManager<K, V> {
            " map outputs on disk. Triggering merge...");
 
       // 1. Prepare the list of files to be merged.
-      for (Path file : inputs) {
+      for (CompressAwarePath file : inputs) {
         approxOutputSize += localFS.getFileStatus(file).getLen();
       }
 
@@ -536,6 +539,7 @@ public class MergeManagerImpl<K, V> implements MergeManager<K, V> {
                               (Class<V>) jobConf.getMapOutputValueClass(),
                               codec, null);
       RawKeyValueIterator iter = null;
+      CompressAwarePath compressAwarePath;
       Path tmpDir = new Path(reduceId.toString());
       try {
         iter = Merger.merge(jobConf, rfs,
@@ -548,13 +552,15 @@ public class MergeManagerImpl<K, V> implements MergeManager<K, V> {
                             mergedMapOutputsCounter, null);
 
         Merger.writeFile(iter, writer, reporter, jobConf);
+        compressAwarePath = new CompressAwarePath(outputPath,
+            writer.getRawLength());
         writer.close();
       } catch (IOException e) {
         localFS.delete(outputPath, true);
         throw e;
       }
 
-      closeOnDiskFile(outputPath);
+      closeOnDiskFile(compressAwarePath);
 
       LOG.info(reduceId +
           " Finished merging " + inputs.size() +
@@ -653,7 +659,7 @@ public class MergeManagerImpl<K, V> implements MergeManager<K, V> {
 
   private RawKeyValueIterator finalMerge(JobConf job, FileSystem fs,
                                        List<InMemoryMapOutput<K,V>> inMemoryMapOutputs,
-                                       List<Path> onDiskMapOutputs
+                                       List<CompressAwarePath> onDiskMapOutputs
                                        ) throws IOException {
     LOG.info("finalMerge called with " +
              inMemoryMapOutputs.size() + " in-memory map-outputs and " +
@@ -712,7 +718,8 @@ public class MergeManagerImpl<K, V> implements MergeManager<K, V> {
       try {
         Merger.writeFile(rIter, writer, reporter, job);
         // add to list of final disk outputs.
-        onDiskMapOutputs.add(outputPath);
+        onDiskMapOutputs.add(new CompressAwarePath(outputPath,
+            writer.getRawLength()));
       } catch (IOException e) {
         if (null != outputPath) {
           try {
@@ -742,15 +749,19 @@ public class MergeManagerImpl<K, V> implements MergeManager<K, V> {
     // segments on disk
     List<Segment<K,V>> diskSegments = new ArrayList<Segment<K,V>>();
     long onDiskBytes = inMemToDiskBytes;
-    Path[] onDisk = onDiskMapOutputs.toArray(new Path[onDiskMapOutputs.size()]);
-    for (Path file : onDisk) {
-      onDiskBytes += fs.getFileStatus(file).getLen();
-      LOG.debug("Disk file: " + file + " Length is " +
-          fs.getFileStatus(file).getLen());
+    long rawBytes = inMemToDiskBytes;
+    CompressAwarePath[] onDisk = onDiskMapOutputs.toArray(
+        new CompressAwarePath[onDiskMapOutputs.size()]);
+    for (CompressAwarePath file : onDisk) {
+      long fileLength = fs.getFileStatus(file).getLen();
+      onDiskBytes += fileLength;
+      rawBytes += (file.getRawDataLength() > 0) ? file.getRawDataLength() : fileLength;
+
+      LOG.debug("Disk file: " + file + " Length is " + fileLength);
       diskSegments.add(new Segment<K, V>(job, fs, file, codec, keepInputs,
                                          (file.toString().endsWith(
                                              Task.MERGED_OUTPUT_PREFIX) ?
-                                          null : mergedMapOutputsCounter)
+                                          null : mergedMapOutputsCounter), file.getRawDataLength()
                                         ));
     }
     LOG.info("Merging " + onDisk.length + " files, " +
@@ -786,7 +797,7 @@ public class MergeManagerImpl<K, V> implements MergeManager<K, V> {
         return diskMerge;
       }
       finalSegments.add(new Segment<K,V>(
-            new RawKVIteratorReader(diskMerge, onDiskBytes), true));
+            new RawKVIteratorReader(diskMerge, onDiskBytes), true, rawBytes));
     }
     return Merger.merge(job, fs, keyClass, valueClass,
                  finalSegments, finalSegments.size(), tmpDir,
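In finalMerge the patch deliberately keeps two running totals: onDiskBytes still measures real on-disk usage (what the existing log line and the "any disk segments to merge?" check are based on), while the new rawBytes accumulates uncompressed sizes and is what the combined disk segment reports to the final merge. A one-file sketch of the accumulation, with names as in the hunk above and made-up sizes:

    long fileLength = 40L << 20;               // fs.getFileStatus(file).getLen()
    long raw = file.getRawDataLength();        // 100 MB here, or <= 0 if unknown
    onDiskBytes += fileLength;                 // disk-space bookkeeping, unchanged
    rawBytes += (raw > 0) ? raw : fileLength;  // progress bookkeeping, new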
@@ -794,4 +805,27 @@ public class MergeManagerImpl<K, V> implements MergeManager<K, V> {
                  null);
 
   }
+
+  static class CompressAwarePath extends Path {
+    private long rawDataLength;
+
+    public CompressAwarePath(Path path, long rawDataLength) {
+      super(path.toUri());
+      this.rawDataLength = rawDataLength;
+    }
+
+    public long getRawDataLength() {
+      return rawDataLength;
+    }
+
+    @Override
+    public boolean equals(Object other) {
+      return super.equals(other);
+    }
+
+    @Override
+    public int hashCode() {
+      return super.hashCode();
+    }
+  }
 }
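CompressAwarePath is a thin decorator: a Path that also remembers the uncompressed length of the data behind it, with equals() and hashCode() deliberately delegating to Path so that membership and lookups in onDiskMapOutputs behave exactly like plain paths. A small usage sketch, assuming outputPath and writer are in scope as in the hunks above:

    CompressAwarePath cap = new CompressAwarePath(outputPath, writer.getRawLength());
    cap.equals(outputPath);     // true: identity is still the underlying path
    cap.getRawDataLength();     // uncompressed bytes behind that path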
@@ -37,6 +37,7 @@ import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.MapOutputFile;
 
 import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl.CompressAwarePath;
 
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
@@ -112,7 +113,9 @@ class OnDiskMapOutput<K, V> extends MapOutput<K, V> {
   @Override
   public void commit() throws IOException {
     localFS.rename(tmpOutputPath, outputPath);
-    merger.closeOnDiskFile(outputPath);
+    CompressAwarePath compressAwarePath = new CompressAwarePath(outputPath,
+        getSize());
+    merger.closeOnDiskFile(compressAwarePath);
   }
 
   @Override
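Every call site that used to hand a plain Path to closeOnDiskFile() now wraps it first, so the merge manager always has a size to feed the progress math: merged spills use the IFile writer's getRawLength(), while outputs shuffled straight to disk (here) use the map output's reported size. The recurring pattern, with names as in the hunks above:

    merger.closeOnDiskFile(new CompressAwarePath(outputPath, uncompressedLength));
    // uncompressedLength is writer.getRawLength() for merged spills, getSize() here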
@@ -0,0 +1,272 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.task.reduce;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.doAnswer;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.Counters.Counter;
+import org.apache.hadoop.mapred.IFile.Reader;
+import org.apache.hadoop.mapred.IFile;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MROutputFiles;
+import org.apache.hadoop.mapred.Merger;
+import org.apache.hadoop.mapred.Merger.Segment;
+import org.apache.hadoop.mapred.RawKeyValueIterator;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl;
+import org.apache.hadoop.util.Progress;
+import org.apache.hadoop.util.Progressable;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+public class TestMerger {
+
+  private Configuration conf;
+  private JobConf jobConf;
+  private FileSystem fs;
+
+  @Before
+  public void setup() throws IOException {
+    conf = new Configuration();
+    jobConf = new JobConf();
+    fs = FileSystem.getLocal(conf);
+  }
+
+  @After
+  public void cleanup() throws IOException {
+    fs.delete(new Path(jobConf.getLocalDirs()[0]), true);
+  }
+
+  @Test
+  public void testInMemoryMerger() throws IOException {
+    JobID jobId = new JobID("a", 0);
+    TaskAttemptID reduceId = new TaskAttemptID(
+        new TaskID(jobId, TaskType.REDUCE, 0), 0);
+    TaskAttemptID mapId1 = new TaskAttemptID(
+        new TaskID(jobId, TaskType.MAP, 1), 0);
+    TaskAttemptID mapId2 = new TaskAttemptID(
+        new TaskID(jobId, TaskType.MAP, 2), 0);
+
+    LocalDirAllocator lda = new LocalDirAllocator(MRConfig.LOCAL_DIR);
+
+    MergeManagerImpl<Text, Text> mergeManager = new MergeManagerImpl<Text, Text>(
+        reduceId, jobConf, fs, lda, Reporter.NULL, null, null, null, null, null,
+        null, null, new Progress(), new MROutputFiles());
+
+    // write map outputs
+    Map<String, String> map1 = new TreeMap<String, String>();
+    map1.put("apple", "disgusting");
+    map1.put("carrot", "delicious");
+    Map<String, String> map2 = new TreeMap<String, String>();
+    map1.put("banana", "pretty good");
+    byte[] mapOutputBytes1 = writeMapOutput(conf, map1);
+    byte[] mapOutputBytes2 = writeMapOutput(conf, map2);
+    InMemoryMapOutput<Text, Text> mapOutput1 = new InMemoryMapOutput<Text, Text>(
+        conf, mapId1, mergeManager, mapOutputBytes1.length, null, true);
+    InMemoryMapOutput<Text, Text> mapOutput2 = new InMemoryMapOutput<Text, Text>(
+        conf, mapId2, mergeManager, mapOutputBytes2.length, null, true);
+    System.arraycopy(mapOutputBytes1, 0, mapOutput1.getMemory(), 0,
+        mapOutputBytes1.length);
+    System.arraycopy(mapOutputBytes2, 0, mapOutput2.getMemory(), 0,
+        mapOutputBytes2.length);
+
+    // create merger and run merge
+    MergeThread<InMemoryMapOutput<Text, Text>, Text, Text> inMemoryMerger =
+        mergeManager.createInMemoryMerger();
+    List<InMemoryMapOutput<Text, Text>> mapOutputs =
+        new ArrayList<InMemoryMapOutput<Text, Text>>();
+    mapOutputs.add(mapOutput1);
+    mapOutputs.add(mapOutput2);
+
+    inMemoryMerger.merge(mapOutputs);
+
+    Assert.assertEquals(1, mergeManager.onDiskMapOutputs.size());
+    Path outPath = mergeManager.onDiskMapOutputs.iterator().next();
+
+    List<String> keys = new ArrayList<String>();
+    List<String> values = new ArrayList<String>();
+    readOnDiskMapOutput(conf, fs, outPath, keys, values);
+    Assert.assertEquals(keys, Arrays.asList("apple", "banana", "carrot"));
+    Assert.assertEquals(values, Arrays.asList("disgusting", "pretty good", "delicious"));
+  }
+
+  private byte[] writeMapOutput(Configuration conf, Map<String, String> keysToValues)
+      throws IOException {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    FSDataOutputStream fsdos = new FSDataOutputStream(baos, null);
+    IFile.Writer<Text, Text> writer = new IFile.Writer<Text, Text>(conf, fsdos,
+        Text.class, Text.class, null, null);
+    for (String key : keysToValues.keySet()) {
+      String value = keysToValues.get(key);
+      writer.append(new Text(key), new Text(value));
+    }
+    writer.close();
+    return baos.toByteArray();
+  }
+
+  private void readOnDiskMapOutput(Configuration conf, FileSystem fs, Path path,
+      List<String> keys, List<String> values) throws IOException {
+    IFile.Reader<Text, Text> reader = new IFile.Reader<Text, Text>(conf, fs,
+        path, null, null);
+    DataInputBuffer keyBuff = new DataInputBuffer();
+    DataInputBuffer valueBuff = new DataInputBuffer();
+    Text key = new Text();
+    Text value = new Text();
+    while (reader.nextRawKey(keyBuff)) {
+      key.readFields(keyBuff);
+      keys.add(key.toString());
+      reader.nextRawValue(valueBuff);
+      value.readFields(valueBuff);
+      values.add(value.toString());
+    }
+  }
+
+  @Test
+  public void testCompressed() throws IOException {
+    testMergeShouldReturnProperProgress(getCompressedSegments());
+  }
+
+  @Test
+  public void testUncompressed() throws IOException {
+    testMergeShouldReturnProperProgress(getUncompressedSegments());
+  }
+
+  @SuppressWarnings( { "deprecation", "unchecked" })
+  public void testMergeShouldReturnProperProgress(
+      List<Segment<Text, Text>> segments) throws IOException {
+    Path tmpDir = new Path("localpath");
+    Class<Text> keyClass = (Class<Text>) jobConf.getMapOutputKeyClass();
+    Class<Text> valueClass = (Class<Text>) jobConf.getMapOutputValueClass();
+    RawComparator<Text> comparator = jobConf.getOutputKeyComparator();
+    Counter readsCounter = new Counter();
+    Counter writesCounter = new Counter();
+    Progress mergePhase = new Progress();
+    RawKeyValueIterator mergeQueue = Merger.merge(conf, fs, keyClass,
+        valueClass, segments, 2, tmpDir, comparator, getReporter(),
+        readsCounter, writesCounter, mergePhase);
+    Assert.assertEquals(1.0f, mergeQueue.getProgress().get());
+  }
+
+  private Progressable getReporter() {
+    Progressable reporter = new Progressable() {
+      @Override
+      public void progress() {
+      }
+    };
+    return reporter;
+  }
+
+  private List<Segment<Text, Text>> getUncompressedSegments() throws IOException {
+    List<Segment<Text, Text>> segments = new ArrayList<Segment<Text, Text>>();
+    for (int i = 1; i < 1; i++) {
+      segments.add(getUncompressedSegment(i));
+    }
+    return segments;
+  }
+
+  private List<Segment<Text, Text>> getCompressedSegments() throws IOException {
+    List<Segment<Text, Text>> segments = new ArrayList<Segment<Text, Text>>();
+    for (int i = 1; i < 1; i++) {
+      segments.add(getCompressedSegment(i));
+    }
+    return segments;
+  }
+
+  private Segment<Text, Text> getUncompressedSegment(int i) throws IOException {
+    return new Segment<Text, Text>(getReader(i), false);
+  }
+
+  private Segment<Text, Text> getCompressedSegment(int i) throws IOException {
+    return new Segment<Text, Text>(getReader(i), false, 3000l);
+  }
+
+  @SuppressWarnings("unchecked")
+  private Reader<Text, Text> getReader(int i) throws IOException {
+    Reader<Text, Text> readerMock = mock(Reader.class);
+    when(readerMock.getPosition()).thenReturn(0l).thenReturn(10l).thenReturn(
+        20l);
+    when(
+        readerMock.nextRawKey(any(DataInputBuffer.class)))
+        .thenAnswer(getKeyAnswer("Segment" + i));
+    doAnswer(getValueAnswer("Segment" + i)).when(readerMock).nextRawValue(
+        any(DataInputBuffer.class));
+
+    return readerMock;
+  }
+
+  private Answer<?> getKeyAnswer(final String segmentName) {
+    return new Answer<Object>() {
+      int i = 0;
+
+      public Boolean answer(InvocationOnMock invocation) {
+        Object[] args = invocation.getArguments();
+        DataInputBuffer key = (DataInputBuffer) args[0];
+        if (i++ == 2) {
+          return false;
+        }
+        key.reset(("Segment Key " + segmentName + i).getBytes(), 20);
+        return true;
+      }
+    };
+  }
+
+  private Answer<?> getValueAnswer(final String segmentName) {
+    return new Answer<Void>() {
+      int i = 0;
+
+      public Void answer(InvocationOnMock invocation) {
+        Object[] args = invocation.getArguments();
+        DataInputBuffer key = (DataInputBuffer) args[0];
+        if (i++ == 2) {
+          return null;
+        }
+        key.reset(("Segment Value " + segmentName + i).getBytes(), 20);
+        return null;
+      }
+    };
+  }
+}
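The new testCompressed/testUncompressed cases drive Merger.merge() with mocked IFile readers and assert that the final reported progress is exactly 1.0f. A small sketch of the property they lean on, reusing the test's getReader() helper (illustrative only, not part of the patch): a compressed-style segment constructed with an explicit raw length reports that length for progress accounting rather than its smaller compressed length.

    Segment<Text, Text> segment = new Segment<Text, Text>(getReader(1), false, 3000l);
    Assert.assertEquals(3000l, segment.getRawDataLength());  // denominator uses 3000 raw bytes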