diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index a83d9787d68..c6e75770bdc 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -19,6 +19,8 @@ Trunk (Unreleased)
MAPREDUCE-4887. Add RehashPartitioner, to smooth distributions
with poor implementations of Object#hashCode(). (Radim Kolar via cutting)
+ MAPREDUCE-4808. Refactor MapOutput and MergeManager to facilitate reuse by Shuffle implementations. (masokan via tucu)
+
IMPROVEMENTS
MAPREDUCE-3787. [Gridmix] Optimize job monitoring and STRESS mode for
diff --git a/hadoop-mapreduce-project/dev-support/findbugs-exclude.xml b/hadoop-mapreduce-project/dev-support/findbugs-exclude.xml
index 08d4c2e7f68..ecac4244c30 100644
--- a/hadoop-mapreduce-project/dev-support/findbugs-exclude.xml
+++ b/hadoop-mapreduce-project/dev-support/findbugs-exclude.xml
@@ -268,7 +268,7 @@
This class is unlikely to get subclassed, so ignore
-->
- <Class name="org.apache.hadoop.mapreduce.task.reduce.MergeManager" />
+ <Class name="org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl" />
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
index f2cbc6e4ed7..e35cb6cdcf5 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
@@ -19,8 +19,6 @@ package org.apache.hadoop.mapreduce.task.reduce;
import java.io.DataInputStream;
import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
import java.net.ConnectException;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
@@ -38,12 +36,7 @@ import javax.net.ssl.HttpsURLConnection;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.compress.CodecPool;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.Decompressor;
-import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.mapred.Counters;
-import org.apache.hadoop.mapred.IFileInputStream;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.MRConfig;
@@ -51,9 +44,6 @@ import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
import org.apache.hadoop.security.ssl.SSLFactory;
-import org.apache.hadoop.mapreduce.task.reduce.MapOutput.Type;
-import org.apache.hadoop.util.Progressable;
-import org.apache.hadoop.util.ReflectionUtils;
import com.google.common.annotations.VisibleForTesting;
@@ -70,7 +60,7 @@ class Fetcher extends Thread {
/* Default read timeout (in milliseconds) */
private final static int DEFAULT_READ_TIMEOUT = 3 * 60 * 1000;
- private final Progressable reporter;
+ private final Reporter reporter;
private static enum ShuffleErrors{IO_ERROR, WRONG_LENGTH, BAD_ID, WRONG_MAP,
CONNECTION, WRONG_REDUCE}
@@ -92,15 +82,10 @@ class Fetcher extends Thread {
private final int connectionTimeout;
private final int readTimeout;
- // Decompression of map-outputs
- private final CompressionCodec codec;
- private final Decompressor decompressor;
private final SecretKey jobTokenSecret;
private volatile boolean stopped = false;
- private JobConf job;
-
private static boolean sslShuffle;
private static SSLFactory sslFactory;
@@ -108,7 +93,6 @@ class Fetcher extends Thread {
ShuffleScheduler<K,V> scheduler, MergeManager<K,V> merger,
Reporter reporter, ShuffleClientMetrics metrics,
ExceptionReporter exceptionReporter, SecretKey jobTokenSecret) {
- this.job = job;
this.reporter = reporter;
this.scheduler = scheduler;
this.merger = merger;
@@ -130,16 +114,6 @@ class Fetcher extends Thread {
wrongReduceErrs = reporter.getCounter(SHUFFLE_ERR_GRP_NAME,
ShuffleErrors.WRONG_REDUCE.toString());
- if (job.getCompressMapOutput()) {
- Class<? extends CompressionCodec> codecClass =
- job.getMapOutputCompressorClass(DefaultCodec.class);
- codec = ReflectionUtils.newInstance(codecClass, job);
- decompressor = CodecPool.getDecompressor(codec);
- } else {
- codec = null;
- decompressor = null;
- }
-
this.connectionTimeout =
job.getInt(MRJobConfig.SHUFFLE_CONNECT_TIMEOUT,
DEFAULT_STALLED_COPY_TIMEOUT);
@@ -170,7 +144,7 @@ class Fetcher extends Thread {
MapHost host = null;
try {
// If merge is on, block
- merger.waitForInMemoryMerge();
+ merger.waitForResource();
// Get a host to shuffle from
host = scheduler.getHost();
@@ -386,8 +360,8 @@ class Fetcher extends Thread {
mapOutput = merger.reserve(mapId, decompressedLength, id);
// Check if we can shuffle *now* ...
- if (mapOutput.getType() == Type.WAIT) {
- LOG.info("fetcher#" + id + " - MergerManager returned Status.WAIT ...");
+ if (mapOutput == null) {
+ LOG.info("fetcher#" + id + " - MergeManager returned status WAIT ...");
//Not an error but wait to process data.
return EMPTY_ATTEMPT_ID_ARRAY;
}
@@ -396,13 +370,9 @@ class Fetcher extends Thread {
LOG.info("fetcher#" + id + " about to shuffle output of map " +
mapOutput.getMapId() + " decomp: " +
decompressedLength + " len: " + compressedLength + " to " +
- mapOutput.getType());
- if (mapOutput.getType() == Type.MEMORY) {
- shuffleToMemory(host, mapOutput, input,
- (int) decompressedLength, (int) compressedLength);
- } else {
- shuffleToDisk(host, mapOutput, input, compressedLength);
- }
+ mapOutput.getDescription());
+ mapOutput.shuffle(host, input, compressedLength, decompressedLength,
+ metrics, reporter);
// Inform the shuffle scheduler
long endTime = System.currentTimeMillis();
@@ -538,84 +508,4 @@ class Fetcher extends Thread {
}
}
}
-
- private void shuffleToMemory(MapHost host, MapOutput mapOutput,
- InputStream input,
- int decompressedLength,
- int compressedLength) throws IOException {
- IFileInputStream checksumIn =
- new IFileInputStream(input, compressedLength, job);
-
- input = checksumIn;
-
- // Are map-outputs compressed?
- if (codec != null) {
- decompressor.reset();
- input = codec.createInputStream(input, decompressor);
- }
-
- // Copy map-output into an in-memory buffer
- byte[] shuffleData = mapOutput.getMemory();
-
- try {
- IOUtils.readFully(input, shuffleData, 0, shuffleData.length);
- metrics.inputBytes(shuffleData.length);
- reporter.progress();
- LOG.info("Read " + shuffleData.length + " bytes from map-output for " +
- mapOutput.getMapId());
- } catch (IOException ioe) {
- // Close the streams
- IOUtils.cleanup(LOG, input);
-
- // Re-throw
- throw ioe;
- }
-
- }
-
- private void shuffleToDisk(MapHost host, MapOutput mapOutput,
- InputStream input,
- long compressedLength)
- throws IOException {
- // Copy data to local-disk
- OutputStream output = mapOutput.getDisk();
- long bytesLeft = compressedLength;
- try {
- final int BYTES_TO_READ = 64 * 1024;
- byte[] buf = new byte[BYTES_TO_READ];
- while (bytesLeft > 0) {
- int n = input.read(buf, 0, (int) Math.min(bytesLeft, BYTES_TO_READ));
- if (n < 0) {
- throw new IOException("read past end of stream reading " +
- mapOutput.getMapId());
- }
- output.write(buf, 0, n);
- bytesLeft -= n;
- metrics.inputBytes(n);
- reporter.progress();
- }
-
- LOG.info("Read " + (compressedLength - bytesLeft) +
- " bytes from map-output for " +
- mapOutput.getMapId());
-
- output.close();
- } catch (IOException ioe) {
- // Close the streams
- IOUtils.cleanup(LOG, input, output);
-
- // Re-throw
- throw ioe;
- }
-
- // Sanity check
- if (bytesLeft != 0) {
- throw new IOException("Incomplete map output received for " +
- mapOutput.getMapId() + " from " +
- host.getHostName() + " (" +
- bytesLeft + " bytes missing of " +
- compressedLength + ")"
- );
- }
- }
}
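Reviewer note (not part of the patch): after this change the Fetcher no longer inspects MapOutput.Type or copies bytes itself; it asks the MergeManager for a reservation and, if one is granted, lets the returned MapOutput perform the copy. The sketch below shows that control flow in isolation. It is illustrative only: FetchSketch, copyOne and their parameters are invented names, the class sits in the shuffle package purely so the package-private collaborators resolve, and the real Fetcher commits outputs through the ShuffleScheduler rather than calling commit() directly.

package org.apache.hadoop.mapreduce.task.reduce;

import java.io.IOException;
import java.io.InputStream;

import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.TaskAttemptID;

class FetchSketch<K, V> {
  /**
   * Copy one map output using the refactored API.
   * @return true if the output was copied, false if the caller should retry later.
   */
  boolean copyOne(MergeManager<K, V> merger, TaskAttemptID mapId, MapHost host,
                  InputStream input, long compressedLength, long decompressedLength,
                  int fetcherId, ShuffleClientMetrics metrics, Reporter reporter)
      throws IOException, InterruptedException {
    merger.waitForResource();                 // block while the merger has no room
    MapOutput<K, V> mapOutput = merger.reserve(mapId, decompressedLength, fetcherId);
    if (mapOutput == null) {
      return false;                           // WAIT: no resources right now
    }
    // The concrete MapOutput (in-memory or on-disk) owns the copy logic.
    mapOutput.shuffle(host, input, compressedLength, decompressedLength,
                      metrics, reporter);
    mapOutput.commit();                       // hand the completed output to the merger
    return true;
  }
}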
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/InMemoryMapOutput.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/InMemoryMapOutput.java
new file mode 100644
index 00000000000..87e9268c31a
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/InMemoryMapOutput.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.task.reduce;
+
+import java.io.InputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.conf.Configuration;
+
+import org.apache.hadoop.io.BoundedByteArrayOutputStream;
+import org.apache.hadoop.io.IOUtils;
+
+import org.apache.hadoop.io.compress.CodecPool;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.Decompressor;
+
+import org.apache.hadoop.mapred.IFileInputStream;
+import org.apache.hadoop.mapred.Reporter;
+
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+class InMemoryMapOutput<K, V> extends MapOutput<K, V> {
+ private static final Log LOG = LogFactory.getLog(InMemoryMapOutput.class);
+ private Configuration conf;
+ private final MergeManagerImpl<K, V> merger;
+ private final byte[] memory;
+ private BoundedByteArrayOutputStream byteStream;
+ // Decompression of map-outputs
+ private final CompressionCodec codec;
+ private final Decompressor decompressor;
+
+ public InMemoryMapOutput(Configuration conf, TaskAttemptID mapId,
+ MergeManagerImpl<K, V> merger,
+ int size, CompressionCodec codec,
+ boolean primaryMapOutput) {
+ super(mapId, (long)size, primaryMapOutput);
+ this.conf = conf;
+ this.merger = merger;
+ this.codec = codec;
+ byteStream = new BoundedByteArrayOutputStream(size);
+ memory = byteStream.getBuffer();
+ if (codec != null) {
+ decompressor = CodecPool.getDecompressor(codec);
+ } else {
+ decompressor = null;
+ }
+ }
+
+ public byte[] getMemory() {
+ return memory;
+ }
+
+ public BoundedByteArrayOutputStream getArrayStream() {
+ return byteStream;
+ }
+
+ @Override
+ public void shuffle(MapHost host, InputStream input,
+ long compressedLength, long decompressedLength,
+ ShuffleClientMetrics metrics,
+ Reporter reporter) throws IOException {
+ IFileInputStream checksumIn =
+ new IFileInputStream(input, compressedLength, conf);
+
+ input = checksumIn;
+
+ // Are map-outputs compressed?
+ if (codec != null) {
+ decompressor.reset();
+ input = codec.createInputStream(input, decompressor);
+ }
+
+ try {
+ IOUtils.readFully(input, memory, 0, memory.length);
+ metrics.inputBytes(memory.length);
+ reporter.progress();
+ LOG.info("Read " + memory.length + " bytes from map-output for " +
+ getMapId());
+ } catch (IOException ioe) {
+ // Close the streams
+ IOUtils.cleanup(LOG, input);
+
+ // Re-throw
+ throw ioe;
+ } finally {
+ CodecPool.returnDecompressor(decompressor);
+ }
+ }
+
+ @Override
+ public void commit() throws IOException {
+ merger.closeInMemoryFile(this);
+ }
+
+ @Override
+ public void abort() {
+ merger.unreserve(memory.length);
+ }
+
+ @Override
+ public String getDescription() {
+ return "MEMORY";
+ }
+}
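Reviewer note (not part of the patch): InMemoryMapOutput now borrows a Decompressor from CodecPool per map output and returns it in a finally block, instead of the Fetcher holding one decompressor for its whole lifetime. A minimal, self-contained sketch of that borrow/return pattern (DefaultCodec is chosen arbitrarily for illustration):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.util.ReflectionUtils;

public class DecompressorPoolSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    CompressionCodec codec = ReflectionUtils.newInstance(DefaultCodec.class, conf);
    Decompressor decompressor = CodecPool.getDecompressor(codec);   // borrow from the pool
    try {
      decompressor.reset();
      // ... wrap the raw stream: codec.createInputStream(rawIn, decompressor) and read ...
    } finally {
      CodecPool.returnDecompressor(decompressor);                   // always give it back
    }
  }
}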
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/InMemoryReader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/InMemoryReader.java
index 380856bced8..543ff3f9cc7 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/InMemoryReader.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/InMemoryReader.java
@@ -35,12 +35,12 @@ import org.apache.hadoop.mapreduce.TaskAttemptID;
@InterfaceStability.Unstable
public class InMemoryReader<K, V> extends Reader<K, V> {
private final TaskAttemptID taskAttemptId;
- private final MergeManager<K,V> merger;
+ private final MergeManagerImpl<K,V> merger;
DataInputBuffer memDataIn = new DataInputBuffer();
private int start;
private int length;
- public InMemoryReader(MergeManager<K,V> merger, TaskAttemptID taskAttemptId,
+ public InMemoryReader(MergeManagerImpl<K,V> merger, TaskAttemptID taskAttemptId,
byte[] data, int start, int length)
throws IOException {
super(null, null, length - start, null, null);
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MapOutput.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MapOutput.java
index fbe7096abfd..b5a8cf53999 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MapOutput.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MapOutput.java
@@ -17,119 +17,36 @@
*/
package org.apache.hadoop.mapreduce.task.reduce;
+import java.io.InputStream;
import java.io.IOException;
-import java.io.OutputStream;
+
import java.util.Comparator;
import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalDirAllocator;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BoundedByteArrayOutputStream;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MapOutputFile;
+
+import org.apache.hadoop.mapred.Reporter;
+
import org.apache.hadoop.mapreduce.TaskAttemptID;
@InterfaceAudience.LimitedPrivate({"MapReduce"})
@InterfaceStability.Unstable
-public class MapOutput<K,V> {
- private static final Log LOG = LogFactory.getLog(MapOutput.class);
+public abstract class MapOutput<K, V> {
private static AtomicInteger ID = new AtomicInteger(0);
- public static enum Type {
- WAIT,
- MEMORY,
- DISK
- }
-
private final int id;
-
- private final MergeManager<K,V> merger;
private final TaskAttemptID mapId;
-
private final long size;
-
- private final byte[] memory;
- private BoundedByteArrayOutputStream byteStream;
-
- private final FileSystem localFS;
- private final Path tmpOutputPath;
- private final Path outputPath;
- private final OutputStream disk;
-
- private final Type type;
-
private final boolean primaryMapOutput;
- public MapOutput(TaskAttemptID mapId, MergeManager<K,V> merger, long size,
- JobConf conf, LocalDirAllocator localDirAllocator,
- int fetcher, boolean primaryMapOutput, MapOutputFile mapOutputFile)
- throws IOException {
+ public MapOutput(TaskAttemptID mapId, long size, boolean primaryMapOutput) {
this.id = ID.incrementAndGet();
this.mapId = mapId;
- this.merger = merger;
-
- type = Type.DISK;
-
- memory = null;
- byteStream = null;
-
this.size = size;
-
- this.localFS = FileSystem.getLocal(conf);
- outputPath =
- mapOutputFile.getInputFileForWrite(mapId.getTaskID(),size);
- tmpOutputPath = outputPath.suffix(String.valueOf(fetcher));
-
- disk = localFS.create(tmpOutputPath);
-
this.primaryMapOutput = primaryMapOutput;
}
- public MapOutput(TaskAttemptID mapId, MergeManager<K,V> merger, int size,
- boolean primaryMapOutput) {
- this.id = ID.incrementAndGet();
- this.mapId = mapId;
- this.merger = merger;
-
- type = Type.MEMORY;
- byteStream = new BoundedByteArrayOutputStream(size);
- memory = byteStream.getBuffer();
-
- this.size = size;
-
- localFS = null;
- disk = null;
- outputPath = null;
- tmpOutputPath = null;
-
- this.primaryMapOutput = primaryMapOutput;
- }
-
- public MapOutput(TaskAttemptID mapId) {
- this.id = ID.incrementAndGet();
- this.mapId = mapId;
-
- type = Type.WAIT;
- merger = null;
- memory = null;
- byteStream = null;
-
- size = -1;
-
- localFS = null;
- disk = null;
- outputPath = null;
- tmpOutputPath = null;
-
- this.primaryMapOutput = false;
-}
-
public boolean isPrimaryMapOutput() {
return primaryMapOutput;
}
@@ -147,62 +64,28 @@ public class MapOutput {
return id;
}
- public Path getOutputPath() {
- return outputPath;
- }
-
- public byte[] getMemory() {
- return memory;
- }
-
- public BoundedByteArrayOutputStream getArrayStream() {
- return byteStream;
- }
-
- public OutputStream getDisk() {
- return disk;
- }
-
public TaskAttemptID getMapId() {
return mapId;
}
- public Type getType() {
- return type;
- }
-
public long getSize() {
return size;
}
- public void commit() throws IOException {
- if (type == Type.MEMORY) {
- merger.closeInMemoryFile(this);
- } else if (type == Type.DISK) {
- localFS.rename(tmpOutputPath, outputPath);
- merger.closeOnDiskFile(outputPath);
- } else {
- throw new IOException("Cannot commit MapOutput of type WAIT!");
- }
- }
-
- public void abort() {
- if (type == Type.MEMORY) {
- merger.unreserve(memory.length);
- } else if (type == Type.DISK) {
- try {
- localFS.delete(tmpOutputPath, false);
- } catch (IOException ie) {
- LOG.info("failure to clean up " + tmpOutputPath, ie);
- }
- } else {
- throw new IllegalArgumentException
- ("Cannot commit MapOutput with of type WAIT!");
- }
- }
+ public abstract void shuffle(MapHost host, InputStream input,
+ long compressedLength,
+ long decompressedLength,
+ ShuffleClientMetrics metrics,
+ Reporter reporter) throws IOException;
+
+ public abstract void commit() throws IOException;
+ public abstract void abort();
+
+ public abstract String getDescription();
+
public String toString() {
- return "MapOutput(" + mapId + ", " + type + ")";
+ return "MapOutput(" + mapId + ", " + getDescription() + ")";
}
public static class MapOutputComparator<K, V>
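Reviewer note (not part of the patch): MapOutput is now an extension point; a Shuffle implementation supplies subclasses that decide where the fetched bytes go by implementing shuffle(), commit(), abort() and getDescription(). Below is a hypothetical subclass, just to show the shape of the contract. DiscardingMapOutput is an invented name that simply drops the data; it is declared in the shuffle package so the package-private helper types resolve.

package org.apache.hadoop.mapreduce.task.reduce;

import java.io.IOException;
import java.io.InputStream;

import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.TaskAttemptID;

class DiscardingMapOutput<K, V> extends MapOutput<K, V> {
  public DiscardingMapOutput(TaskAttemptID mapId, long size) {
    super(mapId, size, false);
  }

  @Override
  public void shuffle(MapHost host, InputStream input, long compressedLength,
                      long decompressedLength, ShuffleClientMetrics metrics,
                      Reporter reporter) throws IOException {
    // Drain and drop the bytes; a real subclass would store them somewhere useful.
    IOUtils.skipFully(input, compressedLength);
    metrics.inputBytes(compressedLength);
    reporter.progress();
  }

  @Override
  public void commit() throws IOException {
    // Nothing to hand over to a merger.
  }

  @Override
  public void abort() {
    // Nothing was reserved, so nothing to release.
  }

  @Override
  public String getDescription() {
    return "DISCARD";
  }
}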
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java
index c75f14274dc..2ecc55ecbb1 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java
@@ -15,783 +15,56 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.hadoop.mapreduce.task.reduce;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Set;
-import java.util.TreeSet;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.fs.ChecksumFileSystem;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalDirAllocator;
-import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.DataInputBuffer;
-import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.mapred.Counters;
-import org.apache.hadoop.mapred.IFile;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapOutputFile;
-import org.apache.hadoop.mapred.Merger;
import org.apache.hadoop.mapred.RawKeyValueIterator;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.Task;
-import org.apache.hadoop.mapred.IFile.Reader;
-import org.apache.hadoop.mapred.IFile.Writer;
-import org.apache.hadoop.mapred.Merger.Segment;
import org.apache.hadoop.mapred.Task.CombineOutputCollector;
-import org.apache.hadoop.mapred.Task.CombineValuesIterator;
-import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.mapreduce.TaskID;
-import org.apache.hadoop.mapreduce.task.reduce.MapOutput.MapOutputComparator;
import org.apache.hadoop.util.Progress;
-import org.apache.hadoop.util.ReflectionUtils;
-import com.google.common.annotations.VisibleForTesting;
+import java.io.IOException;
-@SuppressWarnings(value={"unchecked"})
-@InterfaceAudience.LimitedPrivate({"MapReduce"})
+/**
+ * An interface for a reduce side merge that works with the default Shuffle
+ * implementation.
+ */
+@InterfaceAudience.Private
@InterfaceStability.Unstable
-public class MergeManager<K, V> {
-
- private static final Log LOG = LogFactory.getLog(MergeManager.class);
-
- /* Maximum percentage of the in-memory limit that a single shuffle can
- * consume*/
- private static final float DEFAULT_SHUFFLE_MEMORY_LIMIT_PERCENT
- = 0.25f;
-
- private final TaskAttemptID reduceId;
-
- private final JobConf jobConf;
- private final FileSystem localFS;
- private final FileSystem rfs;
- private final LocalDirAllocator localDirAllocator;
-
- protected MapOutputFile mapOutputFile;
-
- Set<MapOutput<K, V>> inMemoryMergedMapOutputs =
- new TreeSet<MapOutput<K,V>>(new MapOutputComparator<K, V>());
- private final IntermediateMemoryToMemoryMerger memToMemMerger;
-
- Set<MapOutput<K, V>> inMemoryMapOutputs =
- new TreeSet<MapOutput<K,V>>(new MapOutputComparator<K, V>());
- private final MergeThread<MapOutput<K,V>, K,V> inMemoryMerger;
-
- Set<Path> onDiskMapOutputs = new TreeSet<Path>();
- private final OnDiskMerger onDiskMerger;
-
- private final long memoryLimit;
- private long usedMemory;
- private long commitMemory;
- private final long maxSingleShuffleLimit;
-
- private final int memToMemMergeOutputsThreshold;
- private final long mergeThreshold;
-
- private final int ioSortFactor;
-
- private final Reporter reporter;
- private final ExceptionReporter exceptionReporter;
-
+public interface MergeManager<K, V> {
/**
- * Combiner class to run during in-memory merge, if defined.
+ * Wait until the merge has some free resources available so that it can
+ * accept shuffled data. This will be called before a network connection is
+ * established to get the map output.
*/
- private final Class<? extends Reducer> combinerClass;
+ public void waitForResource() throws InterruptedException;
/**
- * Resettable collector used for combine.
+ * Reserve resources for data to be shuffled. This will be called after
+ * a network connection is made to shuffle the data.
+ * @param mapId mapper from which data will be shuffled.
+ * @param requestedSize size in bytes of data that will be shuffled.
+ * @param fetcher id of the map output fetcher that will shuffle the data.
+ * @return a MapOutput object that can be used by shuffle to shuffle data. If
+ * the required resources cannot be reserved immediately, null may be returned.
*/
- private final CombineOutputCollector<K,V> combineCollector;
+ public MapOutput<K, V> reserve(TaskAttemptID mapId, long requestedSize,
+ int fetcher) throws IOException;
- private final Counters.Counter spilledRecordsCounter;
-
- private final Counters.Counter reduceCombineInputCounter;
-
- private final Counters.Counter mergedMapOutputsCounter;
-
- private final CompressionCodec codec;
-
- private final Progress mergePhase;
-
- public MergeManager(TaskAttemptID reduceId, JobConf jobConf,
- FileSystem localFS,
- LocalDirAllocator localDirAllocator,
- Reporter reporter,
- CompressionCodec codec,
- Class<? extends Reducer> combinerClass,
- CombineOutputCollector<K,V> combineCollector,
- Counters.Counter spilledRecordsCounter,
- Counters.Counter reduceCombineInputCounter,
- Counters.Counter mergedMapOutputsCounter,
- ExceptionReporter exceptionReporter,
- Progress mergePhase, MapOutputFile mapOutputFile) {
- this.reduceId = reduceId;
- this.jobConf = jobConf;
- this.localDirAllocator = localDirAllocator;
- this.exceptionReporter = exceptionReporter;
-
- this.reporter = reporter;
- this.codec = codec;
- this.combinerClass = combinerClass;
- this.combineCollector = combineCollector;
- this.reduceCombineInputCounter = reduceCombineInputCounter;
- this.spilledRecordsCounter = spilledRecordsCounter;
- this.mergedMapOutputsCounter = mergedMapOutputsCounter;
- this.mapOutputFile = mapOutputFile;
- this.mapOutputFile.setConf(jobConf);
-
- this.localFS = localFS;
- this.rfs = ((LocalFileSystem)localFS).getRaw();
-
- final float maxInMemCopyUse =
- jobConf.getFloat(MRJobConfig.SHUFFLE_INPUT_BUFFER_PERCENT, 0.90f);
- if (maxInMemCopyUse > 1.0 || maxInMemCopyUse < 0.0) {
- throw new IllegalArgumentException("Invalid value for " +
- MRJobConfig.SHUFFLE_INPUT_BUFFER_PERCENT + ": " +
- maxInMemCopyUse);
- }
-
- // Allow unit tests to fix Runtime memory
- this.memoryLimit =
- (long)(jobConf.getLong(MRJobConfig.REDUCE_MEMORY_TOTAL_BYTES,
- Math.min(Runtime.getRuntime().maxMemory(), Integer.MAX_VALUE))
- * maxInMemCopyUse);
-
- this.ioSortFactor = jobConf.getInt(MRJobConfig.IO_SORT_FACTOR, 100);
-
- final float singleShuffleMemoryLimitPercent =
- jobConf.getFloat(MRJobConfig.SHUFFLE_MEMORY_LIMIT_PERCENT,
- DEFAULT_SHUFFLE_MEMORY_LIMIT_PERCENT);
- if (singleShuffleMemoryLimitPercent <= 0.0f
- || singleShuffleMemoryLimitPercent > 1.0f) {
- throw new IllegalArgumentException("Invalid value for "
- + MRJobConfig.SHUFFLE_MEMORY_LIMIT_PERCENT + ": "
- + singleShuffleMemoryLimitPercent);
- }
-
- usedMemory = 0L;
- commitMemory = 0L;
- this.maxSingleShuffleLimit =
- (long)(memoryLimit * singleShuffleMemoryLimitPercent);
- this.memToMemMergeOutputsThreshold =
- jobConf.getInt(MRJobConfig.REDUCE_MEMTOMEM_THRESHOLD, ioSortFactor);
- this.mergeThreshold = (long)(this.memoryLimit *
- jobConf.getFloat(MRJobConfig.SHUFFLE_MERGE_PERCENT,
- 0.90f));
- LOG.info("MergerManager: memoryLimit=" + memoryLimit + ", " +
- "maxSingleShuffleLimit=" + maxSingleShuffleLimit + ", " +
- "mergeThreshold=" + mergeThreshold + ", " +
- "ioSortFactor=" + ioSortFactor + ", " +
- "memToMemMergeOutputsThreshold=" + memToMemMergeOutputsThreshold);
-
- if (this.maxSingleShuffleLimit >= this.mergeThreshold) {
- throw new RuntimeException("Invlaid configuration: "
- + "maxSingleShuffleLimit should be less than mergeThreshold"
- + "maxSingleShuffleLimit: " + this.maxSingleShuffleLimit
- + "mergeThreshold: " + this.mergeThreshold);
- }
-
- boolean allowMemToMemMerge =
- jobConf.getBoolean(MRJobConfig.REDUCE_MEMTOMEM_ENABLED, false);
- if (allowMemToMemMerge) {
- this.memToMemMerger =
- new IntermediateMemoryToMemoryMerger(this,
- memToMemMergeOutputsThreshold);
- this.memToMemMerger.start();
- } else {
- this.memToMemMerger = null;
- }
-
- this.inMemoryMerger = createInMemoryMerger();
- this.inMemoryMerger.start();
-
- this.onDiskMerger = new OnDiskMerger(this);
- this.onDiskMerger.start();
-
- this.mergePhase = mergePhase;
- }
-
- protected MergeThread<MapOutput<K,V>, K,V> createInMemoryMerger() {
- return new InMemoryMerger(this);
- }
-
- TaskAttemptID getReduceId() {
- return reduceId;
- }
-
- @VisibleForTesting
- ExceptionReporter getExceptionReporter() {
- return exceptionReporter;
- }
-
- public void waitForInMemoryMerge() throws InterruptedException {
- inMemoryMerger.waitForMerge();
- }
-
- private boolean canShuffleToMemory(long requestedSize) {
- return (requestedSize < maxSingleShuffleLimit);
- }
-
- final private MapOutput<K,V> stallShuffle = new MapOutput<K,V>(null);
-
- public synchronized MapOutput<K,V> reserve(TaskAttemptID mapId,
- long requestedSize,
- int fetcher
- ) throws IOException {
- if (!canShuffleToMemory(requestedSize)) {
- LOG.info(mapId + ": Shuffling to disk since " + requestedSize +
- " is greater than maxSingleShuffleLimit (" +
- maxSingleShuffleLimit + ")");
- return new MapOutput<K,V>(mapId, this, requestedSize, jobConf,
- localDirAllocator, fetcher, true,
- mapOutputFile);
- }
-
- // Stall shuffle if we are above the memory limit
-
- // It is possible that all threads could just be stalling and not make
- // progress at all. This could happen when:
- //
- // requested size is causing the used memory to go above limit &&
- // requested size < singleShuffleLimit &&
- // current used size < mergeThreshold (merge will not get triggered)
- //
- // To avoid this from happening, we allow exactly one thread to go past
- // the memory limit. We check (usedMemory > memoryLimit) and not
- // (usedMemory + requestedSize > memoryLimit). When this thread is done
- // fetching, this will automatically trigger a merge thereby unlocking
- // all the stalled threads
-
- if (usedMemory > memoryLimit) {
- LOG.debug(mapId + ": Stalling shuffle since usedMemory (" + usedMemory
- + ") is greater than memoryLimit (" + memoryLimit + ")." +
- " CommitMemory is (" + commitMemory + ")");
- return stallShuffle;
- }
-
- // Allow the in-memory shuffle to progress
- LOG.debug(mapId + ": Proceeding with shuffle since usedMemory ("
- + usedMemory + ") is lesser than memoryLimit (" + memoryLimit + ")."
- + "CommitMemory is (" + commitMemory + ")");
- return unconditionalReserve(mapId, requestedSize, true);
- }
-
/**
- * Unconditional Reserve is used by the Memory-to-Memory thread
- * @return
+ * Called at the end of shuffle.
+ * @return a key value iterator object.
*/
- private synchronized MapOutput<K, V> unconditionalReserve(
- TaskAttemptID mapId, long requestedSize, boolean primaryMapOutput) {
- usedMemory += requestedSize;
- return new MapOutput<K,V>(mapId, this, (int)requestedSize,
- primaryMapOutput);
- }
-
- synchronized void unreserve(long size) {
- usedMemory -= size;
- }
-
- public synchronized void closeInMemoryFile(MapOutput<K,V> mapOutput) {
- inMemoryMapOutputs.add(mapOutput);
- LOG.info("closeInMemoryFile -> map-output of size: " + mapOutput.getSize()
- + ", inMemoryMapOutputs.size() -> " + inMemoryMapOutputs.size()
- + ", commitMemory -> " + commitMemory + ", usedMemory ->" + usedMemory);
-
- commitMemory+= mapOutput.getSize();
-
- // Can hang if mergeThreshold is really low.
- if (commitMemory >= mergeThreshold) {
- LOG.info("Starting inMemoryMerger's merge since commitMemory=" +
- commitMemory + " > mergeThreshold=" + mergeThreshold +
- ". Current usedMemory=" + usedMemory);
- inMemoryMapOutputs.addAll(inMemoryMergedMapOutputs);
- inMemoryMergedMapOutputs.clear();
- inMemoryMerger.startMerge(inMemoryMapOutputs);
- commitMemory = 0L; // Reset commitMemory.
- }
-
- if (memToMemMerger != null) {
- if (inMemoryMapOutputs.size() >= memToMemMergeOutputsThreshold) {
- memToMemMerger.startMerge(inMemoryMapOutputs);
- }
- }
- }
-
-
- public synchronized void closeInMemoryMergedFile(MapOutput<K,V> mapOutput) {
- inMemoryMergedMapOutputs.add(mapOutput);
- LOG.info("closeInMemoryMergedFile -> size: " + mapOutput.getSize() +
- ", inMemoryMergedMapOutputs.size() -> " +
- inMemoryMergedMapOutputs.size());
- }
-
- public synchronized void closeOnDiskFile(Path file) {
- onDiskMapOutputs.add(file);
-
- if (onDiskMapOutputs.size() >= (2 * ioSortFactor - 1)) {
- onDiskMerger.startMerge(onDiskMapOutputs);
- }
- }
-
- public RawKeyValueIterator close() throws Throwable {
- // Wait for on-going merges to complete
- if (memToMemMerger != null) {
- memToMemMerger.close();
- }
- inMemoryMerger.close();
- onDiskMerger.close();
-
- List<MapOutput<K, V>> memory =
- new ArrayList<MapOutput<K, V>>(inMemoryMergedMapOutputs);
- memory.addAll(inMemoryMapOutputs);
- List<Path> disk = new ArrayList<Path>(onDiskMapOutputs);
- return finalMerge(jobConf, rfs, memory, disk);
- }
-
- private class IntermediateMemoryToMemoryMerger
- extends MergeThread<MapOutput<K, V>, K, V> {
-
- public IntermediateMemoryToMemoryMerger(MergeManager<K, V> manager,
- int mergeFactor) {
- super(manager, mergeFactor, exceptionReporter);
- setName("InMemoryMerger - Thread to do in-memory merge of in-memory " +
- "shuffled map-outputs");
- setDaemon(true);
- }
-
- @Override
- public void merge(List<MapOutput<K, V>> inputs) throws IOException {
- if (inputs == null || inputs.size() == 0) {
- return;
- }
-
- TaskAttemptID dummyMapId = inputs.get(0).getMapId();
- List<Segment<K, V>> inMemorySegments = new ArrayList<Segment<K, V>>();
- long mergeOutputSize =
- createInMemorySegments(inputs, inMemorySegments, 0);
- int noInMemorySegments = inMemorySegments.size();
-
- MapOutput<K, V> mergedMapOutputs =
- unconditionalReserve(dummyMapId, mergeOutputSize, false);
-
- Writer<K, V> writer =
- new InMemoryWriter<K, V>(mergedMapOutputs.getArrayStream());
-
- LOG.info("Initiating Memory-to-Memory merge with " + noInMemorySegments +
- " segments of total-size: " + mergeOutputSize);
-
- RawKeyValueIterator rIter =
- Merger.merge(jobConf, rfs,
- (Class<K>)jobConf.getMapOutputKeyClass(),
- (Class<V>)jobConf.getMapOutputValueClass(),
- inMemorySegments, inMemorySegments.size(),
- new Path(reduceId.toString()),
- (RawComparator<K>)jobConf.getOutputKeyComparator(),
- reporter, null, null, null);
- Merger.writeFile(rIter, writer, reporter, jobConf);
- writer.close();
-
- LOG.info(reduceId +
- " Memory-to-Memory merge of the " + noInMemorySegments +
- " files in-memory complete.");
-
- // Note the output of the merge
- closeInMemoryMergedFile(mergedMapOutputs);
- }
- }
-
- private class InMemoryMerger extends MergeThread<MapOutput<K,V>, K,V> {
-
- public InMemoryMerger(MergeManager<K,V> manager) {
- super(manager, Integer.MAX_VALUE, exceptionReporter);
- setName
- ("InMemoryMerger - Thread to merge in-memory shuffled map-outputs");
- setDaemon(true);
- }
-
- @Override
- public void merge(List<MapOutput<K,V>> inputs) throws IOException {
- if (inputs == null || inputs.size() == 0) {
- return;
- }
-
- //name this output file same as the name of the first file that is
- //there in the current list of inmem files (this is guaranteed to
- //be absent on the disk currently. So we don't overwrite a prev.
- //created spill). Also we need to create the output file now since
- //it is not guaranteed that this file will be present after merge
- //is called (we delete empty files as soon as we see them
- //in the merge method)
-
- //figure out the mapId
- TaskAttemptID mapId = inputs.get(0).getMapId();
- TaskID mapTaskId = mapId.getTaskID();
-
- List<Segment<K, V>> inMemorySegments = new ArrayList<Segment<K, V>>();
- long mergeOutputSize =
- createInMemorySegments(inputs, inMemorySegments,0);
- int noInMemorySegments = inMemorySegments.size();
-
- Path outputPath =
- mapOutputFile.getInputFileForWrite(mapTaskId,
- mergeOutputSize).suffix(
- Task.MERGED_OUTPUT_PREFIX);
-
- Writer<K,V> writer =
- new Writer<K,V>(jobConf, rfs, outputPath,
- (Class<K>) jobConf.getMapOutputKeyClass(),
- (Class<V>) jobConf.getMapOutputValueClass(),
- codec, null);
-
- RawKeyValueIterator rIter = null;
- try {
- LOG.info("Initiating in-memory merge with " + noInMemorySegments +
- " segments...");
-
- rIter = Merger.merge(jobConf, rfs,
- (Class<K>)jobConf.getMapOutputKeyClass(),
- (Class<V>)jobConf.getMapOutputValueClass(),
- inMemorySegments, inMemorySegments.size(),
- new Path(reduceId.toString()),
- (RawComparator<K>)jobConf.getOutputKeyComparator(),
- reporter, spilledRecordsCounter, null, null);
-
- if (null == combinerClass) {
- Merger.writeFile(rIter, writer, reporter, jobConf);
- } else {
- combineCollector.setWriter(writer);
- combineAndSpill(rIter, reduceCombineInputCounter);
- }
- writer.close();
-
- LOG.info(reduceId +
- " Merge of the " + noInMemorySegments +
- " files in-memory complete." +
- " Local file is " + outputPath + " of size " +
- localFS.getFileStatus(outputPath).getLen());
- } catch (IOException e) {
- //make sure that we delete the ondisk file that we created
- //earlier when we invoked cloneFileAttributes
- localFS.delete(outputPath, true);
- throw e;
- }
-
- // Note the output of the merge
- closeOnDiskFile(outputPath);
- }
-
- }
-
- private class OnDiskMerger extends MergeThread<Path,K,V> {
-
- public OnDiskMerger(MergeManager<K, V> manager) {
- super(manager, Integer.MAX_VALUE, exceptionReporter);
- setName("OnDiskMerger - Thread to merge on-disk map-outputs");
- setDaemon(true);
- }
-
- @Override
- public void merge(List<Path> inputs) throws IOException {
- // sanity check
- if (inputs == null || inputs.isEmpty()) {
- LOG.info("No ondisk files to merge...");
- return;
- }
-
- long approxOutputSize = 0;
- int bytesPerSum =
- jobConf.getInt("io.bytes.per.checksum", 512);
-
- LOG.info("OnDiskMerger: We have " + inputs.size() +
- " map outputs on disk. Triggering merge...");
-
- // 1. Prepare the list of files to be merged.
- for (Path file : inputs) {
- approxOutputSize += localFS.getFileStatus(file).getLen();
- }
-
- // add the checksum length
- approxOutputSize +=
- ChecksumFileSystem.getChecksumLength(approxOutputSize, bytesPerSum);
-
- // 2. Start the on-disk merge process
- Path outputPath =
- localDirAllocator.getLocalPathForWrite(inputs.get(0).toString(),
- approxOutputSize, jobConf).suffix(Task.MERGED_OUTPUT_PREFIX);
- Writer<K,V> writer =
- new Writer<K,V>(jobConf, rfs, outputPath,
- (Class<K>) jobConf.getMapOutputKeyClass(),
- (Class<V>) jobConf.getMapOutputValueClass(),
- codec, null);
- RawKeyValueIterator iter = null;
- Path tmpDir = new Path(reduceId.toString());
- try {
- iter = Merger.merge(jobConf, rfs,
- (Class<K>) jobConf.getMapOutputKeyClass(),
- (Class<V>) jobConf.getMapOutputValueClass(),
- codec, inputs.toArray(new Path[inputs.size()]),
- true, ioSortFactor, tmpDir,
- (RawComparator<K>) jobConf.getOutputKeyComparator(),
- reporter, spilledRecordsCounter, null,
- mergedMapOutputsCounter, null);
-
- Merger.writeFile(iter, writer, reporter, jobConf);
- writer.close();
- } catch (IOException e) {
- localFS.delete(outputPath, true);
- throw e;
- }
-
- closeOnDiskFile(outputPath);
-
- LOG.info(reduceId +
- " Finished merging " + inputs.size() +
- " map output files on disk of total-size " +
- approxOutputSize + "." +
- " Local output file is " + outputPath + " of size " +
- localFS.getFileStatus(outputPath).getLen());
- }
- }
-
- private void combineAndSpill(
- RawKeyValueIterator kvIter,
- Counters.Counter inCounter) throws IOException {
- JobConf job = jobConf;
- Reducer<K,V,K,V> combiner = ReflectionUtils.newInstance(combinerClass, job);
- Class<K> keyClass = (Class<K>) job.getMapOutputKeyClass();
- Class<V> valClass = (Class<V>) job.getMapOutputValueClass();
- RawComparator<K> comparator =
- (RawComparator<K>)job.getOutputKeyComparator();
- try {
- CombineValuesIterator values = new CombineValuesIterator(
- kvIter, comparator, keyClass, valClass, job, Reporter.NULL,
- inCounter);
- while (values.more()) {
- combiner.reduce(values.getKey(), values, combineCollector,
- Reporter.NULL);
- values.nextKey();
- }
- } finally {
- combiner.close();
- }
- }
-
- private long createInMemorySegments(List<MapOutput<K,V>> inMemoryMapOutputs,
- List<Segment<K, V>> inMemorySegments,
- long leaveBytes
- ) throws IOException {
- long totalSize = 0L;
- // We could use fullSize could come from the RamManager, but files can be
- // closed but not yet present in inMemoryMapOutputs
- long fullSize = 0L;
- for (MapOutput<K,V> mo : inMemoryMapOutputs) {
- fullSize += mo.getMemory().length;
- }
- while(fullSize > leaveBytes) {
- MapOutput<K,V> mo = inMemoryMapOutputs.remove(0);
- byte[] data = mo.getMemory();
- long size = data.length;
- totalSize += size;
- fullSize -= size;
- Reader<K,V> reader = new InMemoryReader<K,V>(MergeManager.this,
- mo.getMapId(),
- data, 0, (int)size);
- inMemorySegments.add(new Segment<K, V>(reader, true,
- (mo.isPrimaryMapOutput() ?
- mergedMapOutputsCounter : null)));
- }
- return totalSize;
- }
-
- class RawKVIteratorReader extends IFile.Reader<K,V> {
-
- private final RawKeyValueIterator kvIter;
-
- public RawKVIteratorReader(RawKeyValueIterator kvIter, long size)
- throws IOException {
- super(null, null, size, null, spilledRecordsCounter);
- this.kvIter = kvIter;
- }
- public boolean nextRawKey(DataInputBuffer key) throws IOException {
- if (kvIter.next()) {
- final DataInputBuffer kb = kvIter.getKey();
- final int kp = kb.getPosition();
- final int klen = kb.getLength() - kp;
- key.reset(kb.getData(), kp, klen);
- bytesRead += klen;
- return true;
- }
- return false;
- }
- public void nextRawValue(DataInputBuffer value) throws IOException {
- final DataInputBuffer vb = kvIter.getValue();
- final int vp = vb.getPosition();
- final int vlen = vb.getLength() - vp;
- value.reset(vb.getData(), vp, vlen);
- bytesRead += vlen;
- }
- public long getPosition() throws IOException {
- return bytesRead;
- }
-
- public void close() throws IOException {
- kvIter.close();
- }
- }
-
- private RawKeyValueIterator finalMerge(JobConf job, FileSystem fs,
- List<MapOutput<K,V>> inMemoryMapOutputs,
- List<Path> onDiskMapOutputs
- ) throws IOException {
- LOG.info("finalMerge called with " +
- inMemoryMapOutputs.size() + " in-memory map-outputs and " +
- onDiskMapOutputs.size() + " on-disk map-outputs");
-
- final float maxRedPer =
- job.getFloat(MRJobConfig.REDUCE_INPUT_BUFFER_PERCENT, 0f);
- if (maxRedPer > 1.0 || maxRedPer < 0.0) {
- throw new IOException(MRJobConfig.REDUCE_INPUT_BUFFER_PERCENT +
- maxRedPer);
- }
- int maxInMemReduce = (int)Math.min(
- Runtime.getRuntime().maxMemory() * maxRedPer, Integer.MAX_VALUE);
-
-
- // merge config params
- Class<K> keyClass = (Class<K>)job.getMapOutputKeyClass();
- Class<V> valueClass = (Class<V>)job.getMapOutputValueClass();
- boolean keepInputs = job.getKeepFailedTaskFiles();
- final Path tmpDir = new Path(reduceId.toString());
- final RawComparator<K> comparator =
- (RawComparator<K>)job.getOutputKeyComparator();
-
- // segments required to vacate memory
- List<Segment<K,V>> memDiskSegments = new ArrayList<Segment<K,V>>();
- long inMemToDiskBytes = 0;
- boolean mergePhaseFinished = false;
- if (inMemoryMapOutputs.size() > 0) {
- TaskID mapId = inMemoryMapOutputs.get(0).getMapId().getTaskID();
- inMemToDiskBytes = createInMemorySegments(inMemoryMapOutputs,
- memDiskSegments,
- maxInMemReduce);
- final int numMemDiskSegments = memDiskSegments.size();
- if (numMemDiskSegments > 0 &&
- ioSortFactor > onDiskMapOutputs.size()) {
-
- // If we reach here, it implies that we have less than io.sort.factor
- // disk segments and this will be incremented by 1 (result of the
- // memory segments merge). Since this total would still be
- // <= io.sort.factor, we will not do any more intermediate merges,
- // the merge of all these disk segments would be directly fed to the
- // reduce method
-
- mergePhaseFinished = true;
- // must spill to disk, but can't retain in-mem for intermediate merge
- final Path outputPath =
- mapOutputFile.getInputFileForWrite(mapId,
- inMemToDiskBytes).suffix(
- Task.MERGED_OUTPUT_PREFIX);
- final RawKeyValueIterator rIter = Merger.merge(job, fs,
- keyClass, valueClass, memDiskSegments, numMemDiskSegments,
- tmpDir, comparator, reporter, spilledRecordsCounter, null,
- mergePhase);
- final Writer<K,V> writer = new Writer<K,V>(job, fs, outputPath,
- keyClass, valueClass, codec, null);
- try {
- Merger.writeFile(rIter, writer, reporter, job);
- // add to list of final disk outputs.
- onDiskMapOutputs.add(outputPath);
- } catch (IOException e) {
- if (null != outputPath) {
- try {
- fs.delete(outputPath, true);
- } catch (IOException ie) {
- // NOTHING
- }
- }
- throw e;
- } finally {
- if (null != writer) {
- writer.close();
- }
- }
- LOG.info("Merged " + numMemDiskSegments + " segments, " +
- inMemToDiskBytes + " bytes to disk to satisfy " +
- "reduce memory limit");
- inMemToDiskBytes = 0;
- memDiskSegments.clear();
- } else if (inMemToDiskBytes != 0) {
- LOG.info("Keeping " + numMemDiskSegments + " segments, " +
- inMemToDiskBytes + " bytes in memory for " +
- "intermediate, on-disk merge");
- }
- }
-
- // segments on disk
- List<Segment<K,V>> diskSegments = new ArrayList<Segment<K,V>>();
- long onDiskBytes = inMemToDiskBytes;
- Path[] onDisk = onDiskMapOutputs.toArray(new Path[onDiskMapOutputs.size()]);
- for (Path file : onDisk) {
- onDiskBytes += fs.getFileStatus(file).getLen();
- LOG.debug("Disk file: " + file + " Length is " +
- fs.getFileStatus(file).getLen());
- diskSegments.add(new Segment<K, V>(job, fs, file, codec, keepInputs,
- (file.toString().endsWith(
- Task.MERGED_OUTPUT_PREFIX) ?
- null : mergedMapOutputsCounter)
- ));
- }
- LOG.info("Merging " + onDisk.length + " files, " +
- onDiskBytes + " bytes from disk");
- Collections.sort(diskSegments, new Comparator<Segment<K,V>>() {
- public int compare(Segment<K, V> o1, Segment<K, V> o2) {
- if (o1.getLength() == o2.getLength()) {
- return 0;
- }
- return o1.getLength() < o2.getLength() ? -1 : 1;
- }
- });
-
- // build final list of segments from merged backed by disk + in-mem
- List<Segment<K,V>> finalSegments = new ArrayList<Segment<K,V>>();
- long inMemBytes = createInMemorySegments(inMemoryMapOutputs,
- finalSegments, 0);
- LOG.info("Merging " + finalSegments.size() + " segments, " +
- inMemBytes + " bytes from memory into reduce");
- if (0 != onDiskBytes) {
- final int numInMemSegments = memDiskSegments.size();
- diskSegments.addAll(0, memDiskSegments);
- memDiskSegments.clear();
- // Pass mergePhase only if there is a going to be intermediate
- // merges. See comment where mergePhaseFinished is being set
- Progress thisPhase = (mergePhaseFinished) ? null : mergePhase;
- RawKeyValueIterator diskMerge = Merger.merge(
- job, fs, keyClass, valueClass, diskSegments,
- ioSortFactor, numInMemSegments, tmpDir, comparator,
- reporter, false, spilledRecordsCounter, null, thisPhase);
- diskSegments.clear();
- if (0 == finalSegments.size()) {
- return diskMerge;
- }
- finalSegments.add(new Segment<K,V>(
- new RawKVIteratorReader(diskMerge, onDiskBytes), true));
- }
- return Merger.merge(job, fs, keyClass, valueClass,
- finalSegments, finalSegments.size(), tmpDir,
- comparator, reporter, spilledRecordsCounter, null,
- null);
-
- }
+ public RawKeyValueIterator close() throws Throwable;
}
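Reviewer note (not part of the patch): MergeManager is reduced to the three operations a Shuffle implementation actually needs from the reduce-side merge: waitForResource(), reserve() and close(). The skeleton below only illustrates that contract; InMemoryOnlyMergeManager is an invented name and the bodies are stubs, not a working merger.

package org.apache.hadoop.mapreduce.task.reduce;

import java.io.IOException;

import org.apache.hadoop.mapred.RawKeyValueIterator;
import org.apache.hadoop.mapreduce.TaskAttemptID;

public class InMemoryOnlyMergeManager<K, V> implements MergeManager<K, V> {

  @Override
  public void waitForResource() throws InterruptedException {
    // Block until enough resources have been freed to accept another copy.
  }

  @Override
  public MapOutput<K, V> reserve(TaskAttemptID mapId, long requestedSize,
                                 int fetcher) throws IOException {
    // Return a MapOutput backed by whatever storage this merger manages,
    // or null to tell the fetcher to back off and retry later.
    return null;
  }

  @Override
  public RawKeyValueIterator close() throws Throwable {
    // Finish pending merges and hand the reducer a single sorted iterator.
    return null;
  }
}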
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java
new file mode 100644
index 00000000000..007897f17f0
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java
@@ -0,0 +1,797 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.task.reduce;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.ChecksumFileSystem;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.mapred.Counters;
+import org.apache.hadoop.mapred.IFile;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapOutputFile;
+import org.apache.hadoop.mapred.Merger;
+import org.apache.hadoop.mapred.RawKeyValueIterator;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.Task;
+import org.apache.hadoop.mapred.IFile.Reader;
+import org.apache.hadoop.mapred.IFile.Writer;
+import org.apache.hadoop.mapred.Merger.Segment;
+import org.apache.hadoop.mapred.Task.CombineOutputCollector;
+import org.apache.hadoop.mapred.Task.CombineValuesIterator;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.task.reduce.MapOutput.MapOutputComparator;
+import org.apache.hadoop.util.Progress;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import com.google.common.annotations.VisibleForTesting;
+
+@SuppressWarnings(value={"unchecked"})
+@InterfaceAudience.LimitedPrivate({"MapReduce"})
+@InterfaceStability.Unstable
+public class MergeManagerImpl<K, V> implements MergeManager<K, V> {
+
+ private static final Log LOG = LogFactory.getLog(MergeManagerImpl.class);
+
+ /* Maximum percentage of the in-memory limit that a single shuffle can
+ * consume*/
+ private static final float DEFAULT_SHUFFLE_MEMORY_LIMIT_PERCENT
+ = 0.25f;
+
+ private final TaskAttemptID reduceId;
+
+ private final JobConf jobConf;
+ private final FileSystem localFS;
+ private final FileSystem rfs;
+ private final LocalDirAllocator localDirAllocator;
+
+ protected MapOutputFile mapOutputFile;
+
+ Set<InMemoryMapOutput<K, V>> inMemoryMergedMapOutputs =
+ new TreeSet<InMemoryMapOutput<K,V>>(new MapOutputComparator<K, V>());
+ private IntermediateMemoryToMemoryMerger memToMemMerger;
+
+ Set<InMemoryMapOutput<K, V>> inMemoryMapOutputs =
+ new TreeSet<InMemoryMapOutput<K,V>>(new MapOutputComparator<K, V>());
+ private final MergeThread<InMemoryMapOutput<K,V>, K,V> inMemoryMerger;
+
+ Set<Path> onDiskMapOutputs = new TreeSet<Path>();
+ private final OnDiskMerger onDiskMerger;
+
+ private final long memoryLimit;
+ private long usedMemory;
+ private long commitMemory;
+ private final long maxSingleShuffleLimit;
+
+ private final int memToMemMergeOutputsThreshold;
+ private final long mergeThreshold;
+
+ private final int ioSortFactor;
+
+ private final Reporter reporter;
+ private final ExceptionReporter exceptionReporter;
+
+ /**
+ * Combiner class to run during in-memory merge, if defined.
+ */
+ private final Class<? extends Reducer> combinerClass;
+
+ /**
+ * Resettable collector used for combine.
+ */
+ private final CombineOutputCollector<K,V> combineCollector;
+
+ private final Counters.Counter spilledRecordsCounter;
+
+ private final Counters.Counter reduceCombineInputCounter;
+
+ private final Counters.Counter mergedMapOutputsCounter;
+
+ private final CompressionCodec codec;
+
+ private final Progress mergePhase;
+
+ public MergeManagerImpl(TaskAttemptID reduceId, JobConf jobConf,
+ FileSystem localFS,
+ LocalDirAllocator localDirAllocator,
+ Reporter reporter,
+ CompressionCodec codec,
+ Class<? extends Reducer> combinerClass,
+ CombineOutputCollector<K,V> combineCollector,
+ Counters.Counter spilledRecordsCounter,
+ Counters.Counter reduceCombineInputCounter,
+ Counters.Counter mergedMapOutputsCounter,
+ ExceptionReporter exceptionReporter,
+ Progress mergePhase, MapOutputFile mapOutputFile) {
+ this.reduceId = reduceId;
+ this.jobConf = jobConf;
+ this.localDirAllocator = localDirAllocator;
+ this.exceptionReporter = exceptionReporter;
+
+ this.reporter = reporter;
+ this.codec = codec;
+ this.combinerClass = combinerClass;
+ this.combineCollector = combineCollector;
+ this.reduceCombineInputCounter = reduceCombineInputCounter;
+ this.spilledRecordsCounter = spilledRecordsCounter;
+ this.mergedMapOutputsCounter = mergedMapOutputsCounter;
+ this.mapOutputFile = mapOutputFile;
+ this.mapOutputFile.setConf(jobConf);
+
+ this.localFS = localFS;
+ this.rfs = ((LocalFileSystem)localFS).getRaw();
+
+ final float maxInMemCopyUse =
+ jobConf.getFloat(MRJobConfig.SHUFFLE_INPUT_BUFFER_PERCENT, 0.90f);
+ if (maxInMemCopyUse > 1.0 || maxInMemCopyUse < 0.0) {
+ throw new IllegalArgumentException("Invalid value for " +
+ MRJobConfig.SHUFFLE_INPUT_BUFFER_PERCENT + ": " +
+ maxInMemCopyUse);
+ }
+
+ // Allow unit tests to fix Runtime memory
+ this.memoryLimit =
+ (long)(jobConf.getLong(MRJobConfig.REDUCE_MEMORY_TOTAL_BYTES,
+ Math.min(Runtime.getRuntime().maxMemory(), Integer.MAX_VALUE))
+ * maxInMemCopyUse);
+
+ this.ioSortFactor = jobConf.getInt(MRJobConfig.IO_SORT_FACTOR, 100);
+
+ final float singleShuffleMemoryLimitPercent =
+ jobConf.getFloat(MRJobConfig.SHUFFLE_MEMORY_LIMIT_PERCENT,
+ DEFAULT_SHUFFLE_MEMORY_LIMIT_PERCENT);
+ if (singleShuffleMemoryLimitPercent <= 0.0f
+ || singleShuffleMemoryLimitPercent > 1.0f) {
+ throw new IllegalArgumentException("Invalid value for "
+ + MRJobConfig.SHUFFLE_MEMORY_LIMIT_PERCENT + ": "
+ + singleShuffleMemoryLimitPercent);
+ }
+
+ usedMemory = 0L;
+ commitMemory = 0L;
+ this.maxSingleShuffleLimit =
+ (long)(memoryLimit * singleShuffleMemoryLimitPercent);
+ this.memToMemMergeOutputsThreshold =
+ jobConf.getInt(MRJobConfig.REDUCE_MEMTOMEM_THRESHOLD, ioSortFactor);
+ this.mergeThreshold = (long)(this.memoryLimit *
+ jobConf.getFloat(MRJobConfig.SHUFFLE_MERGE_PERCENT,
+ 0.90f));
+ LOG.info("MergerManager: memoryLimit=" + memoryLimit + ", " +
+ "maxSingleShuffleLimit=" + maxSingleShuffleLimit + ", " +
+ "mergeThreshold=" + mergeThreshold + ", " +
+ "ioSortFactor=" + ioSortFactor + ", " +
+ "memToMemMergeOutputsThreshold=" + memToMemMergeOutputsThreshold);
+
+ if (this.maxSingleShuffleLimit >= this.mergeThreshold) {
+ throw new RuntimeException("Invlaid configuration: "
+ + "maxSingleShuffleLimit should be less than mergeThreshold"
+ + "maxSingleShuffleLimit: " + this.maxSingleShuffleLimit
+ + "mergeThreshold: " + this.mergeThreshold);
+ }
+
+ boolean allowMemToMemMerge =
+ jobConf.getBoolean(MRJobConfig.REDUCE_MEMTOMEM_ENABLED, false);
+ if (allowMemToMemMerge) {
+ this.memToMemMerger =
+ new IntermediateMemoryToMemoryMerger(this,
+ memToMemMergeOutputsThreshold);
+ this.memToMemMerger.start();
+ } else {
+ this.memToMemMerger = null;
+ }
+
+ this.inMemoryMerger = createInMemoryMerger();
+ this.inMemoryMerger.start();
+
+ this.onDiskMerger = new OnDiskMerger(this);
+ this.onDiskMerger.start();
+
+ this.mergePhase = mergePhase;
+ }
+
+ protected MergeThread<InMemoryMapOutput<K,V>, K,V> createInMemoryMerger() {
+ return new InMemoryMerger(this);
+ }
+
+ TaskAttemptID getReduceId() {
+ return reduceId;
+ }
+
+ @VisibleForTesting
+ ExceptionReporter getExceptionReporter() {
+ return exceptionReporter;
+ }
+
+ @Override
+ public void waitForResource() throws InterruptedException {
+ inMemoryMerger.waitForMerge();
+ }
+
+ private boolean canShuffleToMemory(long requestedSize) {
+ return (requestedSize < maxSingleShuffleLimit);
+ }
+
+ @Override
+ public synchronized MapOutput<K,V> reserve(TaskAttemptID mapId,
+ long requestedSize,
+ int fetcher
+ ) throws IOException {
+ if (!canShuffleToMemory(requestedSize)) {
+ LOG.info(mapId + ": Shuffling to disk since " + requestedSize +
+ " is greater than maxSingleShuffleLimit (" +
+ maxSingleShuffleLimit + ")");
+ return new OnDiskMapOutput<K,V>(mapId, reduceId, this, requestedSize,
+ jobConf, mapOutputFile, fetcher, true);
+ }
+
+ // Stall shuffle if we are above the memory limit
+
+ // It is possible that all threads could just be stalling and not make
+ // progress at all. This could happen when:
+ //
+ // requested size is causing the used memory to go above limit &&
+ // requested size < singleShuffleLimit &&
+ // current used size < mergeThreshold (merge will not get triggered)
+ //
+ // To avoid this from happening, we allow exactly one thread to go past
+ // the memory limit. We check (usedMemory > memoryLimit) and not
+ // (usedMemory + requestedSize > memoryLimit). When this thread is done
+ // fetching, this will automatically trigger a merge thereby unlocking
+ // all the stalled threads
+
+ if (usedMemory > memoryLimit) {
+ LOG.debug(mapId + ": Stalling shuffle since usedMemory (" + usedMemory
+ + ") is greater than memoryLimit (" + memoryLimit + ")." +
+ " CommitMemory is (" + commitMemory + ")");
+ return null;
+ }
+
+ // Allow the in-memory shuffle to progress
+ LOG.debug(mapId + ": Proceeding with shuffle since usedMemory ("
+ + usedMemory + ") is lesser than memoryLimit (" + memoryLimit + ")."
+ + "CommitMemory is (" + commitMemory + ")");
+ return unconditionalReserve(mapId, requestedSize, true);
+ }
+
+ /**
+ * Unconditional Reserve is used by the Memory-to-Memory thread
+ * @return the map output reserved in memory
+ */
+ private synchronized InMemoryMapOutput<K,V> unconditionalReserve(
+ TaskAttemptID mapId, long requestedSize, boolean primaryMapOutput) {
+ usedMemory += requestedSize;
+ return new InMemoryMapOutput<K,V>(jobConf, mapId, this, (int)requestedSize,
+ codec, primaryMapOutput);
+ }
+
+ synchronized void unreserve(long size) {
+ usedMemory -= size;
+ }
+
+ public synchronized void closeInMemoryFile(InMemoryMapOutput<K,V> mapOutput) {
+ inMemoryMapOutputs.add(mapOutput);
+ LOG.info("closeInMemoryFile -> map-output of size: " + mapOutput.getSize()
+ + ", inMemoryMapOutputs.size() -> " + inMemoryMapOutputs.size()
+ + ", commitMemory -> " + commitMemory + ", usedMemory ->" + usedMemory);
+
+ commitMemory+= mapOutput.getSize();
+
+ // Can hang if mergeThreshold is really low.
+ if (commitMemory >= mergeThreshold) {
+ LOG.info("Starting inMemoryMerger's merge since commitMemory=" +
+ commitMemory + " > mergeThreshold=" + mergeThreshold +
+ ". Current usedMemory=" + usedMemory);
+ inMemoryMapOutputs.addAll(inMemoryMergedMapOutputs);
+ inMemoryMergedMapOutputs.clear();
+ inMemoryMerger.startMerge(inMemoryMapOutputs);
+ commitMemory = 0L; // Reset commitMemory.
+ }
+
+ if (memToMemMerger != null) {
+ if (inMemoryMapOutputs.size() >= memToMemMergeOutputsThreshold) {
+ memToMemMerger.startMerge(inMemoryMapOutputs);
+ }
+ }
+ }
+
+
+ public synchronized void closeInMemoryMergedFile(InMemoryMapOutput<K,V> mapOutput) {
+ inMemoryMergedMapOutputs.add(mapOutput);
+ LOG.info("closeInMemoryMergedFile -> size: " + mapOutput.getSize() +
+ ", inMemoryMergedMapOutputs.size() -> " +
+ inMemoryMergedMapOutputs.size());
+ }
+
+ public synchronized void closeOnDiskFile(Path file) {
+ onDiskMapOutputs.add(file);
+
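+ // Kick off an on-disk merge once 2 * ioSortFactor - 1 files accumulate,
+ // presumably so that merging ioSortFactor of them keeps the number of
+ // on-disk outputs bounded near ioSortFactor.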
+ if (onDiskMapOutputs.size() >= (2 * ioSortFactor - 1)) {
+ onDiskMerger.startMerge(onDiskMapOutputs);
+ }
+ }
+
+ @Override
+ public RawKeyValueIterator close() throws Throwable {
+ // Wait for on-going merges to complete
+ if (memToMemMerger != null) {
+ memToMemMerger.close();
+ }
+ inMemoryMerger.close();
+ onDiskMerger.close();
+
+ List<InMemoryMapOutput<K, V>> memory =
+ new ArrayList<InMemoryMapOutput<K, V>>(inMemoryMergedMapOutputs);
+ memory.addAll(inMemoryMapOutputs);
+ List<Path> disk = new ArrayList<Path>(onDiskMapOutputs);
+ return finalMerge(jobConf, rfs, memory, disk);
+ }
+
+ private class IntermediateMemoryToMemoryMerger
+ extends MergeThread<InMemoryMapOutput<K, V>, K, V> {
+
+ public IntermediateMemoryToMemoryMerger(MergeManagerImpl<K, V> manager,
+ int mergeFactor) {
+ super(manager, mergeFactor, exceptionReporter);
+ setName("InMemoryMerger - Thread to do in-memory merge of in-memory " +
+ "shuffled map-outputs");
+ setDaemon(true);
+ }
+
+ @Override
+ public void merge(List<InMemoryMapOutput<K, V>> inputs) throws IOException {
+ if (inputs == null || inputs.size() == 0) {
+ return;
+ }
+
+ TaskAttemptID dummyMapId = inputs.get(0).getMapId();
+ List<Segment<K, V>> inMemorySegments = new ArrayList<Segment<K, V>>();
+ long mergeOutputSize =
+ createInMemorySegments(inputs, inMemorySegments, 0);
+ int noInMemorySegments = inMemorySegments.size();
+
+ InMemoryMapOutput<K, V> mergedMapOutputs =
+ unconditionalReserve(dummyMapId, mergeOutputSize, false);
+
+ Writer<K, V> writer =
+ new InMemoryWriter<K, V>(mergedMapOutputs.getArrayStream());
+
+ LOG.info("Initiating Memory-to-Memory merge with " + noInMemorySegments +
+ " segments of total-size: " + mergeOutputSize);
+
+ RawKeyValueIterator rIter =
+ Merger.merge(jobConf, rfs,
+ (Class<K>)jobConf.getMapOutputKeyClass(),
+ (Class<V>)jobConf.getMapOutputValueClass(),
+ inMemorySegments, inMemorySegments.size(),
+ new Path(reduceId.toString()),
+ (RawComparator<K>)jobConf.getOutputKeyComparator(),
+ reporter, null, null, null);
+ Merger.writeFile(rIter, writer, reporter, jobConf);
+ writer.close();
+
+ LOG.info(reduceId +
+ " Memory-to-Memory merge of the " + noInMemorySegments +
+ " files in-memory complete.");
+
+ // Note the output of the merge
+ closeInMemoryMergedFile(mergedMapOutputs);
+ }
+ }
+
+ private class InMemoryMerger extends MergeThread<InMemoryMapOutput<K,V>, K,V> {
+
+ public InMemoryMerger(MergeManagerImpl<K,V> manager) {
+ super(manager, Integer.MAX_VALUE, exceptionReporter);
+ setName
+ ("InMemoryMerger - Thread to merge in-memory shuffled map-outputs");
+ setDaemon(true);
+ }
+
+ @Override
+ public void merge(List<InMemoryMapOutput<K,V>> inputs) throws IOException {
+ if (inputs == null || inputs.size() == 0) {
+ return;
+ }
+
+ // Name this output file the same as the first file in the current list
+ // of in-memory files (that name is guaranteed to be absent on disk, so
+ // we don't overwrite a previously created spill). We also need to create
+ // the output file now, since it is not guaranteed that this map output
+ // will still be present after merge is called (we delete empty files as
+ // soon as we see them in the merge method).
+
+ //figure out the mapId
+ TaskAttemptID mapId = inputs.get(0).getMapId();
+ TaskID mapTaskId = mapId.getTaskID();
+
+ List<Segment<K, V>> inMemorySegments = new ArrayList<Segment<K, V>>();
+ long mergeOutputSize =
+ createInMemorySegments(inputs, inMemorySegments,0);
+ int noInMemorySegments = inMemorySegments.size();
+
+ Path outputPath =
+ mapOutputFile.getInputFileForWrite(mapTaskId,
+ mergeOutputSize).suffix(
+ Task.MERGED_OUTPUT_PREFIX);
+
+ Writer<K,V> writer =
+ new Writer<K,V>(jobConf, rfs, outputPath,
+ (Class<K>) jobConf.getMapOutputKeyClass(),
+ (Class<V>) jobConf.getMapOutputValueClass(),
+ codec, null);
+
+ RawKeyValueIterator rIter = null;
+ try {
+ LOG.info("Initiating in-memory merge with " + noInMemorySegments +
+ " segments...");
+
+ rIter = Merger.merge(jobConf, rfs,
+ (Class<K>)jobConf.getMapOutputKeyClass(),
+ (Class<V>)jobConf.getMapOutputValueClass(),
+ inMemorySegments, inMemorySegments.size(),
+ new Path(reduceId.toString()),
+ (RawComparator<K>)jobConf.getOutputKeyComparator(),
+ reporter, spilledRecordsCounter, null, null);
+
+ if (null == combinerClass) {
+ Merger.writeFile(rIter, writer, reporter, jobConf);
+ } else {
+ combineCollector.setWriter(writer);
+ combineAndSpill(rIter, reduceCombineInputCounter);
+ }
+ writer.close();
+
+ LOG.info(reduceId +
+ " Merge of the " + noInMemorySegments +
+ " files in-memory complete." +
+ " Local file is " + outputPath + " of size " +
+ localFS.getFileStatus(outputPath).getLen());
+ } catch (IOException e) {
+ // make sure that we delete the on-disk file that we created earlier
+ // for this merge's output
+ localFS.delete(outputPath, true);
+ throw e;
+ }
+
+ // Note the output of the merge
+ closeOnDiskFile(outputPath);
+ }
+
+ }
+
+ private class OnDiskMerger extends MergeThread<Path,K,V> {
+
+ public OnDiskMerger(MergeManagerImpl manager) {
+ super(manager, Integer.MAX_VALUE, exceptionReporter);
+ setName("OnDiskMerger - Thread to merge on-disk map-outputs");
+ setDaemon(true);
+ }
+
+ @Override
+ public void merge(List<Path> inputs) throws IOException {
+ // sanity check
+ if (inputs == null || inputs.isEmpty()) {
+ LOG.info("No ondisk files to merge...");
+ return;
+ }
+
+ long approxOutputSize = 0;
+ int bytesPerSum =
+ jobConf.getInt("io.bytes.per.checksum", 512);
+
+ LOG.info("OnDiskMerger: We have " + inputs.size() +
+ " map outputs on disk. Triggering merge...");
+
+ // 1. Prepare the list of files to be merged.
+ for (Path file : inputs) {
+ approxOutputSize += localFS.getFileStatus(file).getLen();
+ }
+
+ // add the checksum length
+ approxOutputSize +=
+ ChecksumFileSystem.getChecksumLength(approxOutputSize, bytesPerSum);
+
+ // 2. Start the on-disk merge process
+ Path outputPath =
+ localDirAllocator.getLocalPathForWrite(inputs.get(0).toString(),
+ approxOutputSize, jobConf).suffix(Task.MERGED_OUTPUT_PREFIX);
+ Writer<K,V> writer =
+ new Writer<K,V>(jobConf, rfs, outputPath,
+ (Class<K>) jobConf.getMapOutputKeyClass(),
+ (Class<V>) jobConf.getMapOutputValueClass(),
+ codec, null);
+ RawKeyValueIterator iter = null;
+ Path tmpDir = new Path(reduceId.toString());
+ try {
+ iter = Merger.merge(jobConf, rfs,
+ (Class<K>) jobConf.getMapOutputKeyClass(),
+ (Class<V>) jobConf.getMapOutputValueClass(),
+ codec, inputs.toArray(new Path[inputs.size()]),
+ true, ioSortFactor, tmpDir,
+ (RawComparator<K>) jobConf.getOutputKeyComparator(),
+ reporter, spilledRecordsCounter, null,
+ mergedMapOutputsCounter, null);
+
+ Merger.writeFile(iter, writer, reporter, jobConf);
+ writer.close();
+ } catch (IOException e) {
+ localFS.delete(outputPath, true);
+ throw e;
+ }
+
+ closeOnDiskFile(outputPath);
+
+ LOG.info(reduceId +
+ " Finished merging " + inputs.size() +
+ " map output files on disk of total-size " +
+ approxOutputSize + "." +
+ " Local output file is " + outputPath + " of size " +
+ localFS.getFileStatus(outputPath).getLen());
+ }
+ }
+
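+ /**
+ * Run the job's combiner over the merged key/value stream, emitting the
+ * combined records through combineCollector (whose writer is set by the
+ * caller) rather than writing the raw merge output.
+ */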
+ private void combineAndSpill(
+ RawKeyValueIterator kvIter,
+ Counters.Counter inCounter) throws IOException {
+ JobConf job = jobConf;
+ Reducer<K,V,K,V> combiner = ReflectionUtils.newInstance(combinerClass, job);
+ Class<K> keyClass = (Class<K>) job.getMapOutputKeyClass();
+ Class<V> valClass = (Class<V>) job.getMapOutputValueClass();
+ RawComparator<K> comparator =
+ (RawComparator<K>)job.getOutputKeyComparator();
+ try {
+ CombineValuesIterator<K,V> values = new CombineValuesIterator<K,V>(
+ kvIter, comparator, keyClass, valClass, job, Reporter.NULL,
+ inCounter);
+ while (values.more()) {
+ combiner.reduce(values.getKey(), values, combineCollector,
+ Reporter.NULL);
+ values.nextKey();
+ }
+ } finally {
+ combiner.close();
+ }
+ }
+
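+ /**
+ * Turn closed in-memory map outputs into Segments for merging, removing
+ * them from the given list until at most leaveBytes of map output remain
+ * in memory; returns the total size of the segments created.
+ */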
+ private long createInMemorySegments(List<InMemoryMapOutput<K,V>> inMemoryMapOutputs,
+ List<Segment<K, V>> inMemorySegments,
+ long leaveBytes
+ ) throws IOException {
+ long totalSize = 0L;
+ // fullSize could come from the RamManager, but files can be
+ // closed but not yet present in inMemoryMapOutputs
+ long fullSize = 0L;
+ for (InMemoryMapOutput<K,V> mo : inMemoryMapOutputs) {
+ fullSize += mo.getMemory().length;
+ }
+ while(fullSize > leaveBytes) {
+ InMemoryMapOutput<K,V> mo = inMemoryMapOutputs.remove(0);
+ byte[] data = mo.getMemory();
+ long size = data.length;
+ totalSize += size;
+ fullSize -= size;
+ Reader<K,V> reader = new InMemoryReader<K,V>(MergeManagerImpl.this,
+ mo.getMapId(),
+ data, 0, (int)size);
+ inMemorySegments.add(new Segment<K,V>(reader, true,
+ (mo.isPrimaryMapOutput() ?
+ mergedMapOutputsCounter : null)));
+ }
+ return totalSize;
+ }
+
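+ /**
+ * Adapts a RawKeyValueIterator (typically the result of an intermediate
+ * merge) to the IFile.Reader interface so it can be wrapped in a Segment
+ * and fed into the final merge.
+ */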
+ class RawKVIteratorReader extends IFile.Reader<K,V> {
+
+ private final RawKeyValueIterator kvIter;
+
+ public RawKVIteratorReader(RawKeyValueIterator kvIter, long size)
+ throws IOException {
+ super(null, null, size, null, spilledRecordsCounter);
+ this.kvIter = kvIter;
+ }
+ public boolean nextRawKey(DataInputBuffer key) throws IOException {
+ if (kvIter.next()) {
+ final DataInputBuffer kb = kvIter.getKey();
+ final int kp = kb.getPosition();
+ final int klen = kb.getLength() - kp;
+ key.reset(kb.getData(), kp, klen);
+ bytesRead += klen;
+ return true;
+ }
+ return false;
+ }
+ public void nextRawValue(DataInputBuffer value) throws IOException {
+ final DataInputBuffer vb = kvIter.getValue();
+ final int vp = vb.getPosition();
+ final int vlen = vb.getLength() - vp;
+ value.reset(vb.getData(), vp, vlen);
+ bytesRead += vlen;
+ }
+ public long getPosition() throws IOException {
+ return bytesRead;
+ }
+
+ public void close() throws IOException {
+ kvIter.close();
+ }
+ }
+
+ private RawKeyValueIterator finalMerge(JobConf job, FileSystem fs,
+ List<InMemoryMapOutput<K,V>> inMemoryMapOutputs,
+ List<Path> onDiskMapOutputs
+ ) throws IOException {
+ LOG.info("finalMerge called with " +
+ inMemoryMapOutputs.size() + " in-memory map-outputs and " +
+ onDiskMapOutputs.size() + " on-disk map-outputs");
+
+ final float maxRedPer =
+ job.getFloat(MRJobConfig.REDUCE_INPUT_BUFFER_PERCENT, 0f);
+ if (maxRedPer > 1.0 || maxRedPer < 0.0) {
+ throw new IOException(MRJobConfig.REDUCE_INPUT_BUFFER_PERCENT + ": " +
+ maxRedPer);
+ }
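+ // maxInMemReduce is how many bytes of merged map output may be retained
+ // in memory and fed straight to the reduce, governed by
+ // REDUCE_INPUT_BUFFER_PERCENT (0f by default, i.e. normally everything
+ // is flushed to disk before the reduce starts).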
+ int maxInMemReduce = (int)Math.min(
+ Runtime.getRuntime().maxMemory() * maxRedPer, Integer.MAX_VALUE);
+
+
+ // merge config params
+ Class<K> keyClass = (Class<K>)job.getMapOutputKeyClass();
+ Class<V> valueClass = (Class<V>)job.getMapOutputValueClass();
+ boolean keepInputs = job.getKeepFailedTaskFiles();
+ final Path tmpDir = new Path(reduceId.toString());
+ final RawComparator<K> comparator =
+ (RawComparator<K>)job.getOutputKeyComparator();
+
+ // segments required to vacate memory
+ List<Segment<K,V>> memDiskSegments = new ArrayList<Segment<K,V>>();
+ long inMemToDiskBytes = 0;
+ boolean mergePhaseFinished = false;
+ if (inMemoryMapOutputs.size() > 0) {
+ TaskID mapId = inMemoryMapOutputs.get(0).getMapId().getTaskID();
+ inMemToDiskBytes = createInMemorySegments(inMemoryMapOutputs,
+ memDiskSegments,
+ maxInMemReduce);
+ final int numMemDiskSegments = memDiskSegments.size();
+ if (numMemDiskSegments > 0 &&
+ ioSortFactor > onDiskMapOutputs.size()) {
+
+ // If we reach here, it implies that we have less than io.sort.factor
+ // disk segments and this will be incremented by 1 (result of the
+ // memory segments merge). Since this total would still be
+ // <= io.sort.factor, we will not do any more intermediate merges,
+ // the merge of all these disk segments would be directly fed to the
+ // reduce method
+
+ mergePhaseFinished = true;
+ // must spill to disk, but can't retain in-mem for intermediate merge
+ final Path outputPath =
+ mapOutputFile.getInputFileForWrite(mapId,
+ inMemToDiskBytes).suffix(
+ Task.MERGED_OUTPUT_PREFIX);
+ final RawKeyValueIterator rIter = Merger.merge(job, fs,
+ keyClass, valueClass, memDiskSegments, numMemDiskSegments,
+ tmpDir, comparator, reporter, spilledRecordsCounter, null,
+ mergePhase);
+ final Writer<K,V> writer = new Writer<K,V>(job, fs, outputPath,
+ keyClass, valueClass, codec, null);
+ try {
+ Merger.writeFile(rIter, writer, reporter, job);
+ // add to list of final disk outputs.
+ onDiskMapOutputs.add(outputPath);
+ } catch (IOException e) {
+ if (null != outputPath) {
+ try {
+ fs.delete(outputPath, true);
+ } catch (IOException ie) {
+ // NOTHING
+ }
+ }
+ throw e;
+ } finally {
+ if (null != writer) {
+ writer.close();
+ }
+ }
+ LOG.info("Merged " + numMemDiskSegments + " segments, " +
+ inMemToDiskBytes + " bytes to disk to satisfy " +
+ "reduce memory limit");
+ inMemToDiskBytes = 0;
+ memDiskSegments.clear();
+ } else if (inMemToDiskBytes != 0) {
+ LOG.info("Keeping " + numMemDiskSegments + " segments, " +
+ inMemToDiskBytes + " bytes in memory for " +
+ "intermediate, on-disk merge");
+ }
+ }
+
+ // segments on disk
+ List<Segment<K,V>> diskSegments = new ArrayList<Segment<K,V>>();
+ long onDiskBytes = inMemToDiskBytes;
+ Path[] onDisk = onDiskMapOutputs.toArray(new Path[onDiskMapOutputs.size()]);
+ for (Path file : onDisk) {
+ onDiskBytes += fs.getFileStatus(file).getLen();
+ LOG.debug("Disk file: " + file + " Length is " +
+ fs.getFileStatus(file).getLen());
+ diskSegments.add(new Segment<K, V>(job, fs, file, codec, keepInputs,
+ (file.toString().endsWith(
+ Task.MERGED_OUTPUT_PREFIX) ?
+ null : mergedMapOutputsCounter)
+ ));
+ }
+ LOG.info("Merging " + onDisk.length + " files, " +
+ onDiskBytes + " bytes from disk");
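+ // Sort the on-disk segments smallest-first, presumably so that any
+ // intermediate merge passes work over the smaller files first.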
+ Collections.sort(diskSegments, new Comparator<Segment<K,V>>() {
+ public int compare(Segment<K, V> o1, Segment<K, V> o2) {
+ if (o1.getLength() == o2.getLength()) {
+ return 0;
+ }
+ return o1.getLength() < o2.getLength() ? -1 : 1;
+ }
+ });
+
+ // build final list of segments from merged backed by disk + in-mem
+ List<Segment<K,V>> finalSegments = new ArrayList<Segment<K,V>>();
+ long inMemBytes = createInMemorySegments(inMemoryMapOutputs,
+ finalSegments, 0);
+ LOG.info("Merging " + finalSegments.size() + " segments, " +
+ inMemBytes + " bytes from memory into reduce");
+ if (0 != onDiskBytes) {
+ final int numInMemSegments = memDiskSegments.size();
+ diskSegments.addAll(0, memDiskSegments);
+ memDiskSegments.clear();
+ // Pass mergePhase only if there are going to be intermediate
+ // merges. See comment where mergePhaseFinished is being set
+ Progress thisPhase = (mergePhaseFinished) ? null : mergePhase;
+ RawKeyValueIterator diskMerge = Merger.merge(
+ job, fs, keyClass, valueClass, diskSegments,
+ ioSortFactor, numInMemSegments, tmpDir, comparator,
+ reporter, false, spilledRecordsCounter, null, thisPhase);
+ diskSegments.clear();
+ if (0 == finalSegments.size()) {
+ return diskMerge;
+ }
+ finalSegments.add(new Segment<K,V>(
+ new RawKVIteratorReader(diskMerge, onDiskBytes), true));
+ }
+ return Merger.merge(job, fs, keyClass, valueClass,
+ finalSegments, finalSegments.size(), tmpDir,
+ comparator, reporter, spilledRecordsCounter, null,
+ null);
+
+ }
+}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeThread.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeThread.java
index 568f4e6ffec..5db353f99c2 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeThread.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeThread.java
@@ -34,12 +34,12 @@ abstract class MergeThread<T,K,V> extends Thread {
private AtomicInteger numPending = new AtomicInteger(0);
private LinkedList<List<T>> pendingToBeMerged;
- protected final MergeManager<K,V> manager;
+ protected final MergeManagerImpl<K,V> manager;
private final ExceptionReporter reporter;
private boolean closed = false;
private final int mergeFactor;
- public MergeThread(MergeManager<K,V> manager, int mergeFactor,
+ public MergeThread(MergeManagerImpl<K,V> manager, int mergeFactor,
ExceptionReporter reporter) {
this.pendingToBeMerged = new LinkedList<List<T>>();
this.manager = manager;
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java
new file mode 100644
index 00000000000..2cb86449e5d
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.task.reduce;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import org.apache.hadoop.io.IOUtils;
+
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.MapOutputFile;
+
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+
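+/**
+ * A MapOutput that streams the fetched map output straight to local disk:
+ * data is written to a temporary file named after the fetcher and renamed
+ * into place on commit.
+ */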
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+class OnDiskMapOutput<K, V> extends MapOutput<K, V> {
+ private static final Log LOG = LogFactory.getLog(OnDiskMapOutput.class);
+ private final FileSystem localFS;
+ private final Path tmpOutputPath;
+ private final Path outputPath;
+ private final MergeManagerImpl<K,V> merger;
+ private final OutputStream disk;
+
+ public OnDiskMapOutput(TaskAttemptID mapId, TaskAttemptID reduceId,
+ MergeManagerImpl<K,V> merger, long size,
+ JobConf conf,
+ MapOutputFile mapOutputFile,
+ int fetcher, boolean primaryMapOutput)
+ throws IOException {
+ super(mapId, size, primaryMapOutput);
+ this.merger = merger;
+ this.localFS = FileSystem.getLocal(conf);
+ outputPath =
+ mapOutputFile.getInputFileForWrite(mapId.getTaskID(),size);
+ tmpOutputPath = outputPath.suffix(String.valueOf(fetcher));
+
+ disk = localFS.create(tmpOutputPath);
+
+ }
+
+ @Override
+ public void shuffle(MapHost host, InputStream input,
+ long compressedLength, long decompressedLength,
+ ShuffleClientMetrics metrics,
+ Reporter reporter) throws IOException {
+ // Copy data to local-disk
+ long bytesLeft = compressedLength;
+ try {
+ final int BYTES_TO_READ = 64 * 1024;
+ byte[] buf = new byte[BYTES_TO_READ];
+ while (bytesLeft > 0) {
+ int n = input.read(buf, 0, (int) Math.min(bytesLeft, BYTES_TO_READ));
+ if (n < 0) {
+ throw new IOException("read past end of stream reading " +
+ getMapId());
+ }
+ disk.write(buf, 0, n);
+ bytesLeft -= n;
+ metrics.inputBytes(n);
+ reporter.progress();
+ }
+
+ LOG.info("Read " + (compressedLength - bytesLeft) +
+ " bytes from map-output for " + getMapId());
+
+ disk.close();
+ } catch (IOException ioe) {
+ // Close the streams
+ IOUtils.cleanup(LOG, input, disk);
+
+ // Re-throw
+ throw ioe;
+ }
+
+ // Sanity check
+ if (bytesLeft != 0) {
+ throw new IOException("Incomplete map output received for " +
+ getMapId() + " from " +
+ host.getHostName() + " (" +
+ bytesLeft + " bytes missing of " +
+ compressedLength + ")");
+ }
+ }
+
+ @Override
+ public void commit() throws IOException {
+ localFS.rename(tmpOutputPath, outputPath);
+ merger.closeOnDiskFile(outputPath);
+ }
+
+ @Override
+ public void abort() {
+ try {
+ localFS.delete(tmpOutputPath, false);
+ } catch (IOException ie) {
+ LOG.info("failure to clean up " + tmpOutputPath, ie);
+ }
+ }
+
+ @Override
+ public String getDescription() {
+ return "DISK";
+ }
+}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java
index 047e6435ccf..68131659607 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java
@@ -21,17 +21,10 @@ import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalDirAllocator;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MapOutputFile;
import org.apache.hadoop.mapred.RawKeyValueIterator;
-import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.Task;
-import org.apache.hadoop.mapred.Task.CombineOutputCollector;
import org.apache.hadoop.mapred.TaskStatus;
import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
import org.apache.hadoop.mapred.ShuffleConsumerPlugin;
@@ -77,17 +70,21 @@ public class Shuffle<K, V> implements ShuffleConsumerPlugin<K, V>, ExceptionRepo
this.taskStatus = context.getStatus();
this.reduceTask = context.getReduceTask();
- scheduler =
- new ShuffleScheduler<K,V>(jobConf, taskStatus, this, copyPhase,
- context.getShuffledMapsCounter(),
- context.getReduceShuffleBytes(), context.getFailedShuffleCounter());
- merger = new MergeManager<K, V>(reduceId, jobConf, context.getLocalFS(),
- context.getLocalDirAllocator(), reporter, context.getCodec(),
- context.getCombinerClass(), context.getCombineCollector(),
- context.getSpilledRecordsCounter(),
- context.getReduceCombineInputCounter(),
- context.getMergedMapOutputsCounter(),
- this, context.getMergePhase(), context.getMapOutputFile());
+ scheduler = new ShuffleScheduler<K,V>(jobConf, taskStatus, this,
+ copyPhase, context.getShuffledMapsCounter(),
+ context.getReduceShuffleBytes(), context.getFailedShuffleCounter());
+ merger = createMergeManager(context);
+ }
+
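+ /**
+ * Factory for the MergeManager used by this shuffle; protected so that
+ * ShuffleConsumerPlugin implementations extending Shuffle can plug in a
+ * different MergeManager while reusing the rest of the shuffle machinery.
+ */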
+ protected MergeManager<K, V> createMergeManager(
+ ShuffleConsumerPlugin.Context context) {
+ return new MergeManagerImpl<K, V>(reduceId, jobConf, context.getLocalFS(),
+ context.getLocalDirAllocator(), reporter, context.getCodec(),
+ context.getCombinerClass(), context.getCombineCollector(),
+ context.getSpilledRecordsCounter(),
+ context.getReduceCombineInputCounter(),
+ context.getMergedMapOutputsCounter(), this, context.getMergePhase(),
+ context.getMapOutputFile());
}
@Override
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java
index 92bdc74fb31..db4308fc995 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java
@@ -53,7 +53,7 @@ public class TestFetcher {
private HttpURLConnection connection;
public FakeFetcher(JobConf job, TaskAttemptID reduceId,
- ShuffleScheduler scheduler, MergeManager merger, Reporter reporter,
+ ShuffleScheduler scheduler, MergeManagerImpl merger, Reporter reporter,
ShuffleClientMetrics metrics, ExceptionReporter exceptionReporter,
SecretKey jobTokenSecret, HttpURLConnection connection) {
super(job, reduceId, scheduler, merger, reporter, metrics, exceptionReporter,
@@ -77,7 +77,7 @@ public class TestFetcher {
JobConf job = new JobConf();
TaskAttemptID id = TaskAttemptID.forName("attempt_0_1_r_1_1");
ShuffleScheduler ss = mock(ShuffleScheduler.class);
- MergeManager mm = mock(MergeManager.class);
+ MergeManagerImpl mm = mock(MergeManagerImpl.class);
Reporter r = mock(Reporter.class);
ShuffleClientMetrics metrics = mock(ShuffleClientMetrics.class);
ExceptionReporter except = mock(ExceptionReporter.class);
@@ -132,7 +132,7 @@ public class TestFetcher {
JobConf job = new JobConf();
TaskAttemptID id = TaskAttemptID.forName("attempt_0_1_r_1_1");
ShuffleScheduler ss = mock(ShuffleScheduler.class);
- MergeManager mm = mock(MergeManager.class);
+ MergeManagerImpl mm = mock(MergeManagerImpl.class);
Reporter r = mock(Reporter.class);
ShuffleClientMetrics metrics = mock(ShuffleClientMetrics.class);
ExceptionReporter except = mock(ExceptionReporter.class);
@@ -167,10 +167,9 @@ public class TestFetcher {
header.write(new DataOutputStream(bout));
ByteArrayInputStream in = new ByteArrayInputStream(bout.toByteArray());
when(connection.getInputStream()).thenReturn(in);
- //Defaults to WAIT, which is what we want to test
- MapOutput mapOut = new MapOutput(map1ID);
+ //Defaults to null, which is what we want to test
when(mm.reserve(any(TaskAttemptID.class), anyLong(), anyInt()))
- .thenReturn(mapOut);
+ .thenReturn(null);
underTest.copyFromHost(host);
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMergeManager.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMergeManager.java
index a8669639b2a..46d797c93d3 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMergeManager.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMergeManager.java
@@ -32,13 +32,13 @@ import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapOutputFile;
import org.apache.hadoop.mapreduce.MRJobConfig;
-import org.apache.hadoop.mapreduce.task.reduce.MapOutput.Type;
import org.junit.Assert;
import org.junit.Test;
public class TestMergeManager {
@Test(timeout=10000)
+ @SuppressWarnings("unchecked")
public void testMemoryMerge() throws Exception {
final int TOTAL_MEM_BYTES = 10000;
final int OUTPUT_SIZE = 7950;
@@ -55,45 +55,47 @@ public class TestMergeManager {
// reserve enough map output to cause a merge when it is committed
MapOutput out1 = mgr.reserve(null, OUTPUT_SIZE, 0);
- Assert.assertEquals("Should be a memory merge",
- Type.MEMORY, out1.getType());
- fillOutput(out1);
+ Assert.assertTrue("Should be a memory merge",
+ (out1 instanceof InMemoryMapOutput));
+ InMemoryMapOutput mout1 = (InMemoryMapOutput)out1;
+ fillOutput(mout1);
MapOutput out2 = mgr.reserve(null, OUTPUT_SIZE, 0);
- Assert.assertEquals("Should be a memory merge",
- Type.MEMORY, out2.getType());
- fillOutput(out2);
+ Assert.assertTrue("Should be a memory merge",
+ (out2 instanceof InMemoryMapOutput));
+ InMemoryMapOutput mout2 = (InMemoryMapOutput)out2;
+ fillOutput(mout2);
// next reservation should be a WAIT
MapOutput out3 = mgr.reserve(null, OUTPUT_SIZE, 0);
- Assert.assertEquals("Should be told to wait",
- Type.WAIT, out3.getType());
+ Assert.assertEquals("Should be told to wait", null, out3);
// trigger the first merge and wait for merge thread to start merging
// and free enough output to reserve more
- out1.commit();
- out2.commit();
+ mout1.commit();
+ mout2.commit();
mergeStart.await();
Assert.assertEquals(1, mgr.getNumMerges());
// reserve enough map output to cause another merge when committed
out1 = mgr.reserve(null, OUTPUT_SIZE, 0);
- Assert.assertEquals("Should be a memory merge",
- Type.MEMORY, out1.getType());
- fillOutput(out1);
+ Assert.assertTrue("Should be a memory merge",
+ (out1 instanceof InMemoryMapOutput));
+ mout1 = (InMemoryMapOutput)out1;
+ fillOutput(mout1);
out2 = mgr.reserve(null, OUTPUT_SIZE, 0);
- Assert.assertEquals("Should be a memory merge",
- Type.MEMORY, out2.getType());
- fillOutput(out2);
+ Assert.assertTrue("Should be a memory merge",
+ (out2 instanceof InMemoryMapOutput));
+ mout2 = (InMemoryMapOutput