MAPREDUCE-4808. Refactor MapOutput and MergeManager to facilitate reuse by Shuffle implementations. (masokan via tucu)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1438796 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
cf67cccd7c
commit
d8cdee8360
|
@ -23,6 +23,9 @@ Release 2.0.3-alpha - Unreleased
|
||||||
|
|
||||||
MAPREDUCE-4807. Allow MapOutputBuffer to be pluggable. (masokan via tucu)
|
MAPREDUCE-4807. Allow MapOutputBuffer to be pluggable. (masokan via tucu)
|
||||||
|
|
||||||
|
MAPREDUCE-4808. Refactor MapOutput and MergeManager to facilitate reuse
|
||||||
|
by Shuffle implementations. (masokan via tucu)
|
||||||
|
|
||||||
IMPROVEMENTS
|
IMPROVEMENTS
|
||||||
|
|
||||||
MAPREDUCE-3678. The Map tasks logs should have the value of input
|
MAPREDUCE-3678. The Map tasks logs should have the value of input
|
||||||
|
|
|
@ -268,7 +268,7 @@
|
||||||
This class is unlikely to get subclassed, so ignore
|
This class is unlikely to get subclassed, so ignore
|
||||||
-->
|
-->
|
||||||
<Match>
|
<Match>
|
||||||
<Class name="org.apache.hadoop.mapreduce.task.reduce.MergeManager" />
|
<Class name="org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl" />
|
||||||
<Bug pattern="SC_START_IN_CTOR" />
|
<Bug pattern="SC_START_IN_CTOR" />
|
||||||
</Match>
|
</Match>
|
||||||
|
|
||||||
|
|
|
@ -19,8 +19,6 @@ package org.apache.hadoop.mapreduce.task.reduce;
|
||||||
|
|
||||||
import java.io.DataInputStream;
|
import java.io.DataInputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStream;
|
|
||||||
import java.io.OutputStream;
|
|
||||||
import java.net.ConnectException;
|
import java.net.ConnectException;
|
||||||
import java.net.HttpURLConnection;
|
import java.net.HttpURLConnection;
|
||||||
import java.net.MalformedURLException;
|
import java.net.MalformedURLException;
|
||||||
|
@ -38,12 +36,7 @@ import javax.net.ssl.HttpsURLConnection;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.io.IOUtils;
|
import org.apache.hadoop.io.IOUtils;
|
||||||
import org.apache.hadoop.io.compress.CodecPool;
|
|
||||||
import org.apache.hadoop.io.compress.CompressionCodec;
|
|
||||||
import org.apache.hadoop.io.compress.Decompressor;
|
|
||||||
import org.apache.hadoop.io.compress.DefaultCodec;
|
|
||||||
import org.apache.hadoop.mapred.Counters;
|
import org.apache.hadoop.mapred.Counters;
|
||||||
import org.apache.hadoop.mapred.IFileInputStream;
|
|
||||||
import org.apache.hadoop.mapred.JobConf;
|
import org.apache.hadoop.mapred.JobConf;
|
||||||
import org.apache.hadoop.mapred.Reporter;
|
import org.apache.hadoop.mapred.Reporter;
|
||||||
import org.apache.hadoop.mapreduce.MRConfig;
|
import org.apache.hadoop.mapreduce.MRConfig;
|
||||||
|
@ -51,9 +44,6 @@ import org.apache.hadoop.mapreduce.MRJobConfig;
|
||||||
import org.apache.hadoop.mapreduce.TaskAttemptID;
|
import org.apache.hadoop.mapreduce.TaskAttemptID;
|
||||||
import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
|
import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
|
||||||
import org.apache.hadoop.security.ssl.SSLFactory;
|
import org.apache.hadoop.security.ssl.SSLFactory;
|
||||||
import org.apache.hadoop.mapreduce.task.reduce.MapOutput.Type;
|
|
||||||
import org.apache.hadoop.util.Progressable;
|
|
||||||
import org.apache.hadoop.util.ReflectionUtils;
|
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
|
||||||
|
@ -70,7 +60,7 @@ class Fetcher<K,V> extends Thread {
|
||||||
/* Default read timeout (in milliseconds) */
|
/* Default read timeout (in milliseconds) */
|
||||||
private final static int DEFAULT_READ_TIMEOUT = 3 * 60 * 1000;
|
private final static int DEFAULT_READ_TIMEOUT = 3 * 60 * 1000;
|
||||||
|
|
||||||
private final Progressable reporter;
|
private final Reporter reporter;
|
||||||
private static enum ShuffleErrors{IO_ERROR, WRONG_LENGTH, BAD_ID, WRONG_MAP,
|
private static enum ShuffleErrors{IO_ERROR, WRONG_LENGTH, BAD_ID, WRONG_MAP,
|
||||||
CONNECTION, WRONG_REDUCE}
|
CONNECTION, WRONG_REDUCE}
|
||||||
|
|
||||||
|
@ -92,15 +82,10 @@ class Fetcher<K,V> extends Thread {
|
||||||
private final int connectionTimeout;
|
private final int connectionTimeout;
|
||||||
private final int readTimeout;
|
private final int readTimeout;
|
||||||
|
|
||||||
// Decompression of map-outputs
|
|
||||||
private final CompressionCodec codec;
|
|
||||||
private final Decompressor decompressor;
|
|
||||||
private final SecretKey jobTokenSecret;
|
private final SecretKey jobTokenSecret;
|
||||||
|
|
||||||
private volatile boolean stopped = false;
|
private volatile boolean stopped = false;
|
||||||
|
|
||||||
private JobConf job;
|
|
||||||
|
|
||||||
private static boolean sslShuffle;
|
private static boolean sslShuffle;
|
||||||
private static SSLFactory sslFactory;
|
private static SSLFactory sslFactory;
|
||||||
|
|
||||||
|
@ -108,7 +93,6 @@ class Fetcher<K,V> extends Thread {
|
||||||
ShuffleScheduler<K,V> scheduler, MergeManager<K,V> merger,
|
ShuffleScheduler<K,V> scheduler, MergeManager<K,V> merger,
|
||||||
Reporter reporter, ShuffleClientMetrics metrics,
|
Reporter reporter, ShuffleClientMetrics metrics,
|
||||||
ExceptionReporter exceptionReporter, SecretKey jobTokenSecret) {
|
ExceptionReporter exceptionReporter, SecretKey jobTokenSecret) {
|
||||||
this.job = job;
|
|
||||||
this.reporter = reporter;
|
this.reporter = reporter;
|
||||||
this.scheduler = scheduler;
|
this.scheduler = scheduler;
|
||||||
this.merger = merger;
|
this.merger = merger;
|
||||||
|
@ -130,16 +114,6 @@ class Fetcher<K,V> extends Thread {
|
||||||
wrongReduceErrs = reporter.getCounter(SHUFFLE_ERR_GRP_NAME,
|
wrongReduceErrs = reporter.getCounter(SHUFFLE_ERR_GRP_NAME,
|
||||||
ShuffleErrors.WRONG_REDUCE.toString());
|
ShuffleErrors.WRONG_REDUCE.toString());
|
||||||
|
|
||||||
if (job.getCompressMapOutput()) {
|
|
||||||
Class<? extends CompressionCodec> codecClass =
|
|
||||||
job.getMapOutputCompressorClass(DefaultCodec.class);
|
|
||||||
codec = ReflectionUtils.newInstance(codecClass, job);
|
|
||||||
decompressor = CodecPool.getDecompressor(codec);
|
|
||||||
} else {
|
|
||||||
codec = null;
|
|
||||||
decompressor = null;
|
|
||||||
}
|
|
||||||
|
|
||||||
this.connectionTimeout =
|
this.connectionTimeout =
|
||||||
job.getInt(MRJobConfig.SHUFFLE_CONNECT_TIMEOUT,
|
job.getInt(MRJobConfig.SHUFFLE_CONNECT_TIMEOUT,
|
||||||
DEFAULT_STALLED_COPY_TIMEOUT);
|
DEFAULT_STALLED_COPY_TIMEOUT);
|
||||||
|
@ -170,7 +144,7 @@ class Fetcher<K,V> extends Thread {
|
||||||
MapHost host = null;
|
MapHost host = null;
|
||||||
try {
|
try {
|
||||||
// If merge is on, block
|
// If merge is on, block
|
||||||
merger.waitForInMemoryMerge();
|
merger.waitForResource();
|
||||||
|
|
||||||
// Get a host to shuffle from
|
// Get a host to shuffle from
|
||||||
host = scheduler.getHost();
|
host = scheduler.getHost();
|
||||||
|
@ -386,8 +360,8 @@ class Fetcher<K,V> extends Thread {
|
||||||
mapOutput = merger.reserve(mapId, decompressedLength, id);
|
mapOutput = merger.reserve(mapId, decompressedLength, id);
|
||||||
|
|
||||||
// Check if we can shuffle *now* ...
|
// Check if we can shuffle *now* ...
|
||||||
if (mapOutput.getType() == Type.WAIT) {
|
if (mapOutput == null) {
|
||||||
LOG.info("fetcher#" + id + " - MergerManager returned Status.WAIT ...");
|
LOG.info("fetcher#" + id + " - MergeManager returned status WAIT ...");
|
||||||
//Not an error but wait to process data.
|
//Not an error but wait to process data.
|
||||||
return EMPTY_ATTEMPT_ID_ARRAY;
|
return EMPTY_ATTEMPT_ID_ARRAY;
|
||||||
}
|
}
|
||||||
|
@ -396,13 +370,9 @@ class Fetcher<K,V> extends Thread {
|
||||||
LOG.info("fetcher#" + id + " about to shuffle output of map " +
|
LOG.info("fetcher#" + id + " about to shuffle output of map " +
|
||||||
mapOutput.getMapId() + " decomp: " +
|
mapOutput.getMapId() + " decomp: " +
|
||||||
decompressedLength + " len: " + compressedLength + " to " +
|
decompressedLength + " len: " + compressedLength + " to " +
|
||||||
mapOutput.getType());
|
mapOutput.getDescription());
|
||||||
if (mapOutput.getType() == Type.MEMORY) {
|
mapOutput.shuffle(host, input, compressedLength, decompressedLength,
|
||||||
shuffleToMemory(host, mapOutput, input,
|
metrics, reporter);
|
||||||
(int) decompressedLength, (int) compressedLength);
|
|
||||||
} else {
|
|
||||||
shuffleToDisk(host, mapOutput, input, compressedLength);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Inform the shuffle scheduler
|
// Inform the shuffle scheduler
|
||||||
long endTime = System.currentTimeMillis();
|
long endTime = System.currentTimeMillis();
|
||||||
|
@ -538,84 +508,4 @@ class Fetcher<K,V> extends Thread {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void shuffleToMemory(MapHost host, MapOutput<K,V> mapOutput,
|
|
||||||
InputStream input,
|
|
||||||
int decompressedLength,
|
|
||||||
int compressedLength) throws IOException {
|
|
||||||
IFileInputStream checksumIn =
|
|
||||||
new IFileInputStream(input, compressedLength, job);
|
|
||||||
|
|
||||||
input = checksumIn;
|
|
||||||
|
|
||||||
// Are map-outputs compressed?
|
|
||||||
if (codec != null) {
|
|
||||||
decompressor.reset();
|
|
||||||
input = codec.createInputStream(input, decompressor);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Copy map-output into an in-memory buffer
|
|
||||||
byte[] shuffleData = mapOutput.getMemory();
|
|
||||||
|
|
||||||
try {
|
|
||||||
IOUtils.readFully(input, shuffleData, 0, shuffleData.length);
|
|
||||||
metrics.inputBytes(shuffleData.length);
|
|
||||||
reporter.progress();
|
|
||||||
LOG.info("Read " + shuffleData.length + " bytes from map-output for " +
|
|
||||||
mapOutput.getMapId());
|
|
||||||
} catch (IOException ioe) {
|
|
||||||
// Close the streams
|
|
||||||
IOUtils.cleanup(LOG, input);
|
|
||||||
|
|
||||||
// Re-throw
|
|
||||||
throw ioe;
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
private void shuffleToDisk(MapHost host, MapOutput<K,V> mapOutput,
|
|
||||||
InputStream input,
|
|
||||||
long compressedLength)
|
|
||||||
throws IOException {
|
|
||||||
// Copy data to local-disk
|
|
||||||
OutputStream output = mapOutput.getDisk();
|
|
||||||
long bytesLeft = compressedLength;
|
|
||||||
try {
|
|
||||||
final int BYTES_TO_READ = 64 * 1024;
|
|
||||||
byte[] buf = new byte[BYTES_TO_READ];
|
|
||||||
while (bytesLeft > 0) {
|
|
||||||
int n = input.read(buf, 0, (int) Math.min(bytesLeft, BYTES_TO_READ));
|
|
||||||
if (n < 0) {
|
|
||||||
throw new IOException("read past end of stream reading " +
|
|
||||||
mapOutput.getMapId());
|
|
||||||
}
|
|
||||||
output.write(buf, 0, n);
|
|
||||||
bytesLeft -= n;
|
|
||||||
metrics.inputBytes(n);
|
|
||||||
reporter.progress();
|
|
||||||
}
|
|
||||||
|
|
||||||
LOG.info("Read " + (compressedLength - bytesLeft) +
|
|
||||||
" bytes from map-output for " +
|
|
||||||
mapOutput.getMapId());
|
|
||||||
|
|
||||||
output.close();
|
|
||||||
} catch (IOException ioe) {
|
|
||||||
// Close the streams
|
|
||||||
IOUtils.cleanup(LOG, input, output);
|
|
||||||
|
|
||||||
// Re-throw
|
|
||||||
throw ioe;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Sanity check
|
|
||||||
if (bytesLeft != 0) {
|
|
||||||
throw new IOException("Incomplete map output received for " +
|
|
||||||
mapOutput.getMapId() + " from " +
|
|
||||||
host.getHostName() + " (" +
|
|
||||||
bytesLeft + " bytes missing of " +
|
|
||||||
compressedLength + ")"
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,127 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.mapreduce.task.reduce;
|
||||||
|
|
||||||
|
import java.io.InputStream;
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
|
||||||
|
import org.apache.hadoop.io.BoundedByteArrayOutputStream;
|
||||||
|
import org.apache.hadoop.io.IOUtils;
|
||||||
|
|
||||||
|
import org.apache.hadoop.io.compress.CodecPool;
|
||||||
|
import org.apache.hadoop.io.compress.CompressionCodec;
|
||||||
|
import org.apache.hadoop.io.compress.Decompressor;
|
||||||
|
|
||||||
|
import org.apache.hadoop.mapred.IFileInputStream;
|
||||||
|
import org.apache.hadoop.mapred.Reporter;
|
||||||
|
|
||||||
|
import org.apache.hadoop.mapreduce.TaskAttemptID;
|
||||||
|
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
@InterfaceStability.Unstable
|
||||||
|
class InMemoryMapOutput<K, V> extends MapOutput<K, V> {
|
||||||
|
private static final Log LOG = LogFactory.getLog(InMemoryMapOutput.class);
|
||||||
|
private Configuration conf;
|
||||||
|
private final MergeManagerImpl<K, V> merger;
|
||||||
|
private final byte[] memory;
|
||||||
|
private BoundedByteArrayOutputStream byteStream;
|
||||||
|
// Decompression of map-outputs
|
||||||
|
private final CompressionCodec codec;
|
||||||
|
private final Decompressor decompressor;
|
||||||
|
|
||||||
|
public InMemoryMapOutput(Configuration conf, TaskAttemptID mapId,
|
||||||
|
MergeManagerImpl<K, V> merger,
|
||||||
|
int size, CompressionCodec codec,
|
||||||
|
boolean primaryMapOutput) {
|
||||||
|
super(mapId, (long)size, primaryMapOutput);
|
||||||
|
this.conf = conf;
|
||||||
|
this.merger = merger;
|
||||||
|
this.codec = codec;
|
||||||
|
byteStream = new BoundedByteArrayOutputStream(size);
|
||||||
|
memory = byteStream.getBuffer();
|
||||||
|
if (codec != null) {
|
||||||
|
decompressor = CodecPool.getDecompressor(codec);
|
||||||
|
} else {
|
||||||
|
decompressor = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public byte[] getMemory() {
|
||||||
|
return memory;
|
||||||
|
}
|
||||||
|
|
||||||
|
public BoundedByteArrayOutputStream getArrayStream() {
|
||||||
|
return byteStream;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void shuffle(MapHost host, InputStream input,
|
||||||
|
long compressedLength, long decompressedLength,
|
||||||
|
ShuffleClientMetrics metrics,
|
||||||
|
Reporter reporter) throws IOException {
|
||||||
|
IFileInputStream checksumIn =
|
||||||
|
new IFileInputStream(input, compressedLength, conf);
|
||||||
|
|
||||||
|
input = checksumIn;
|
||||||
|
|
||||||
|
// Are map-outputs compressed?
|
||||||
|
if (codec != null) {
|
||||||
|
decompressor.reset();
|
||||||
|
input = codec.createInputStream(input, decompressor);
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
IOUtils.readFully(input, memory, 0, memory.length);
|
||||||
|
metrics.inputBytes(memory.length);
|
||||||
|
reporter.progress();
|
||||||
|
LOG.info("Read " + memory.length + " bytes from map-output for " +
|
||||||
|
getMapId());
|
||||||
|
} catch (IOException ioe) {
|
||||||
|
// Close the streams
|
||||||
|
IOUtils.cleanup(LOG, input);
|
||||||
|
|
||||||
|
// Re-throw
|
||||||
|
throw ioe;
|
||||||
|
} finally {
|
||||||
|
CodecPool.returnDecompressor(decompressor);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void commit() throws IOException {
|
||||||
|
merger.closeInMemoryFile(this);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void abort() {
|
||||||
|
merger.unreserve(memory.length);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getDescription() {
|
||||||
|
return "MEMORY";
|
||||||
|
}
|
||||||
|
}
|
|
@ -35,12 +35,12 @@ import org.apache.hadoop.mapreduce.TaskAttemptID;
|
||||||
@InterfaceStability.Unstable
|
@InterfaceStability.Unstable
|
||||||
public class InMemoryReader<K, V> extends Reader<K, V> {
|
public class InMemoryReader<K, V> extends Reader<K, V> {
|
||||||
private final TaskAttemptID taskAttemptId;
|
private final TaskAttemptID taskAttemptId;
|
||||||
private final MergeManager<K,V> merger;
|
private final MergeManagerImpl<K,V> merger;
|
||||||
DataInputBuffer memDataIn = new DataInputBuffer();
|
DataInputBuffer memDataIn = new DataInputBuffer();
|
||||||
private int start;
|
private int start;
|
||||||
private int length;
|
private int length;
|
||||||
|
|
||||||
public InMemoryReader(MergeManager<K,V> merger, TaskAttemptID taskAttemptId,
|
public InMemoryReader(MergeManagerImpl<K,V> merger, TaskAttemptID taskAttemptId,
|
||||||
byte[] data, int start, int length)
|
byte[] data, int start, int length)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
super(null, null, length - start, null, null);
|
super(null, null, length - start, null, null);
|
||||||
|
|
|
@ -17,119 +17,36 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.mapreduce.task.reduce;
|
package org.apache.hadoop.mapreduce.task.reduce;
|
||||||
|
|
||||||
|
import java.io.InputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.OutputStream;
|
|
||||||
import java.util.Comparator;
|
import java.util.Comparator;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
|
||||||
import org.apache.commons.logging.LogFactory;
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.classification.InterfaceStability;
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.mapred.Reporter;
|
||||||
import org.apache.hadoop.fs.LocalDirAllocator;
|
|
||||||
import org.apache.hadoop.fs.Path;
|
|
||||||
import org.apache.hadoop.io.BoundedByteArrayOutputStream;
|
|
||||||
import org.apache.hadoop.mapred.JobConf;
|
|
||||||
import org.apache.hadoop.mapred.MapOutputFile;
|
|
||||||
import org.apache.hadoop.mapreduce.TaskAttemptID;
|
import org.apache.hadoop.mapreduce.TaskAttemptID;
|
||||||
|
|
||||||
@InterfaceAudience.LimitedPrivate({"MapReduce"})
|
@InterfaceAudience.LimitedPrivate({"MapReduce"})
|
||||||
@InterfaceStability.Unstable
|
@InterfaceStability.Unstable
|
||||||
public class MapOutput<K,V> {
|
public abstract class MapOutput<K, V> {
|
||||||
private static final Log LOG = LogFactory.getLog(MapOutput.class);
|
|
||||||
private static AtomicInteger ID = new AtomicInteger(0);
|
private static AtomicInteger ID = new AtomicInteger(0);
|
||||||
|
|
||||||
public static enum Type {
|
|
||||||
WAIT,
|
|
||||||
MEMORY,
|
|
||||||
DISK
|
|
||||||
}
|
|
||||||
|
|
||||||
private final int id;
|
private final int id;
|
||||||
|
|
||||||
private final MergeManager<K,V> merger;
|
|
||||||
private final TaskAttemptID mapId;
|
private final TaskAttemptID mapId;
|
||||||
|
|
||||||
private final long size;
|
private final long size;
|
||||||
|
|
||||||
private final byte[] memory;
|
|
||||||
private BoundedByteArrayOutputStream byteStream;
|
|
||||||
|
|
||||||
private final FileSystem localFS;
|
|
||||||
private final Path tmpOutputPath;
|
|
||||||
private final Path outputPath;
|
|
||||||
private final OutputStream disk;
|
|
||||||
|
|
||||||
private final Type type;
|
|
||||||
|
|
||||||
private final boolean primaryMapOutput;
|
private final boolean primaryMapOutput;
|
||||||
|
|
||||||
public MapOutput(TaskAttemptID mapId, MergeManager<K,V> merger, long size,
|
public MapOutput(TaskAttemptID mapId, long size, boolean primaryMapOutput) {
|
||||||
JobConf conf, LocalDirAllocator localDirAllocator,
|
|
||||||
int fetcher, boolean primaryMapOutput, MapOutputFile mapOutputFile)
|
|
||||||
throws IOException {
|
|
||||||
this.id = ID.incrementAndGet();
|
this.id = ID.incrementAndGet();
|
||||||
this.mapId = mapId;
|
this.mapId = mapId;
|
||||||
this.merger = merger;
|
|
||||||
|
|
||||||
type = Type.DISK;
|
|
||||||
|
|
||||||
memory = null;
|
|
||||||
byteStream = null;
|
|
||||||
|
|
||||||
this.size = size;
|
this.size = size;
|
||||||
|
|
||||||
this.localFS = FileSystem.getLocal(conf);
|
|
||||||
outputPath =
|
|
||||||
mapOutputFile.getInputFileForWrite(mapId.getTaskID(),size);
|
|
||||||
tmpOutputPath = outputPath.suffix(String.valueOf(fetcher));
|
|
||||||
|
|
||||||
disk = localFS.create(tmpOutputPath);
|
|
||||||
|
|
||||||
this.primaryMapOutput = primaryMapOutput;
|
this.primaryMapOutput = primaryMapOutput;
|
||||||
}
|
}
|
||||||
|
|
||||||
public MapOutput(TaskAttemptID mapId, MergeManager<K,V> merger, int size,
|
|
||||||
boolean primaryMapOutput) {
|
|
||||||
this.id = ID.incrementAndGet();
|
|
||||||
this.mapId = mapId;
|
|
||||||
this.merger = merger;
|
|
||||||
|
|
||||||
type = Type.MEMORY;
|
|
||||||
byteStream = new BoundedByteArrayOutputStream(size);
|
|
||||||
memory = byteStream.getBuffer();
|
|
||||||
|
|
||||||
this.size = size;
|
|
||||||
|
|
||||||
localFS = null;
|
|
||||||
disk = null;
|
|
||||||
outputPath = null;
|
|
||||||
tmpOutputPath = null;
|
|
||||||
|
|
||||||
this.primaryMapOutput = primaryMapOutput;
|
|
||||||
}
|
|
||||||
|
|
||||||
public MapOutput(TaskAttemptID mapId) {
|
|
||||||
this.id = ID.incrementAndGet();
|
|
||||||
this.mapId = mapId;
|
|
||||||
|
|
||||||
type = Type.WAIT;
|
|
||||||
merger = null;
|
|
||||||
memory = null;
|
|
||||||
byteStream = null;
|
|
||||||
|
|
||||||
size = -1;
|
|
||||||
|
|
||||||
localFS = null;
|
|
||||||
disk = null;
|
|
||||||
outputPath = null;
|
|
||||||
tmpOutputPath = null;
|
|
||||||
|
|
||||||
this.primaryMapOutput = false;
|
|
||||||
}
|
|
||||||
|
|
||||||
public boolean isPrimaryMapOutput() {
|
public boolean isPrimaryMapOutput() {
|
||||||
return primaryMapOutput;
|
return primaryMapOutput;
|
||||||
}
|
}
|
||||||
|
@ -147,62 +64,28 @@ public class MapOutput<K,V> {
|
||||||
return id;
|
return id;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Path getOutputPath() {
|
|
||||||
return outputPath;
|
|
||||||
}
|
|
||||||
|
|
||||||
public byte[] getMemory() {
|
|
||||||
return memory;
|
|
||||||
}
|
|
||||||
|
|
||||||
public BoundedByteArrayOutputStream getArrayStream() {
|
|
||||||
return byteStream;
|
|
||||||
}
|
|
||||||
|
|
||||||
public OutputStream getDisk() {
|
|
||||||
return disk;
|
|
||||||
}
|
|
||||||
|
|
||||||
public TaskAttemptID getMapId() {
|
public TaskAttemptID getMapId() {
|
||||||
return mapId;
|
return mapId;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Type getType() {
|
|
||||||
return type;
|
|
||||||
}
|
|
||||||
|
|
||||||
public long getSize() {
|
public long getSize() {
|
||||||
return size;
|
return size;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void commit() throws IOException {
|
public abstract void shuffle(MapHost host, InputStream input,
|
||||||
if (type == Type.MEMORY) {
|
long compressedLength,
|
||||||
merger.closeInMemoryFile(this);
|
long decompressedLength,
|
||||||
} else if (type == Type.DISK) {
|
ShuffleClientMetrics metrics,
|
||||||
localFS.rename(tmpOutputPath, outputPath);
|
Reporter reporter) throws IOException;
|
||||||
merger.closeOnDiskFile(outputPath);
|
|
||||||
} else {
|
public abstract void commit() throws IOException;
|
||||||
throw new IOException("Cannot commit MapOutput of type WAIT!");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public void abort() {
|
|
||||||
if (type == Type.MEMORY) {
|
|
||||||
merger.unreserve(memory.length);
|
|
||||||
} else if (type == Type.DISK) {
|
|
||||||
try {
|
|
||||||
localFS.delete(tmpOutputPath, false);
|
|
||||||
} catch (IOException ie) {
|
|
||||||
LOG.info("failure to clean up " + tmpOutputPath, ie);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
throw new IllegalArgumentException
|
|
||||||
("Cannot commit MapOutput with of type WAIT!");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
public abstract void abort();
|
||||||
|
|
||||||
|
public abstract String getDescription();
|
||||||
|
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return "MapOutput(" + mapId + ", " + type + ")";
|
return "MapOutput(" + mapId + ", " + getDescription() + ")";
|
||||||
}
|
}
|
||||||
|
|
||||||
public static class MapOutputComparator<K, V>
|
public static class MapOutputComparator<K, V>
|
||||||
|
|
|
@ -15,783 +15,56 @@
|
||||||
* See the License for the specific language governing permissions and
|
* See the License for the specific language governing permissions and
|
||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
package org.apache.hadoop.mapreduce.task.reduce;
|
package org.apache.hadoop.mapreduce.task.reduce;
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.Collections;
|
|
||||||
import java.util.Comparator;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Set;
|
|
||||||
import java.util.TreeSet;
|
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
|
||||||
import org.apache.commons.logging.LogFactory;
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.classification.InterfaceStability;
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
import org.apache.hadoop.fs.ChecksumFileSystem;
|
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.LocalDirAllocator;
|
import org.apache.hadoop.fs.LocalDirAllocator;
|
||||||
import org.apache.hadoop.fs.LocalFileSystem;
|
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.io.DataInputBuffer;
|
|
||||||
import org.apache.hadoop.io.RawComparator;
|
|
||||||
import org.apache.hadoop.io.compress.CompressionCodec;
|
import org.apache.hadoop.io.compress.CompressionCodec;
|
||||||
import org.apache.hadoop.mapred.Counters;
|
import org.apache.hadoop.mapred.Counters;
|
||||||
import org.apache.hadoop.mapred.IFile;
|
|
||||||
import org.apache.hadoop.mapred.JobConf;
|
import org.apache.hadoop.mapred.JobConf;
|
||||||
import org.apache.hadoop.mapred.MapOutputFile;
|
import org.apache.hadoop.mapred.MapOutputFile;
|
||||||
import org.apache.hadoop.mapred.Merger;
|
|
||||||
import org.apache.hadoop.mapred.RawKeyValueIterator;
|
import org.apache.hadoop.mapred.RawKeyValueIterator;
|
||||||
import org.apache.hadoop.mapred.Reducer;
|
import org.apache.hadoop.mapred.Reducer;
|
||||||
import org.apache.hadoop.mapred.Reporter;
|
import org.apache.hadoop.mapred.Reporter;
|
||||||
import org.apache.hadoop.mapred.Task;
|
|
||||||
import org.apache.hadoop.mapred.IFile.Reader;
|
|
||||||
import org.apache.hadoop.mapred.IFile.Writer;
|
|
||||||
import org.apache.hadoop.mapred.Merger.Segment;
|
|
||||||
import org.apache.hadoop.mapred.Task.CombineOutputCollector;
|
import org.apache.hadoop.mapred.Task.CombineOutputCollector;
|
||||||
import org.apache.hadoop.mapred.Task.CombineValuesIterator;
|
|
||||||
import org.apache.hadoop.mapreduce.MRJobConfig;
|
|
||||||
import org.apache.hadoop.mapreduce.TaskAttemptID;
|
import org.apache.hadoop.mapreduce.TaskAttemptID;
|
||||||
import org.apache.hadoop.mapreduce.TaskID;
|
|
||||||
import org.apache.hadoop.mapreduce.task.reduce.MapOutput.MapOutputComparator;
|
|
||||||
import org.apache.hadoop.util.Progress;
|
import org.apache.hadoop.util.Progress;
|
||||||
import org.apache.hadoop.util.ReflectionUtils;
|
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import java.io.IOException;
|
||||||
|
|
||||||
@SuppressWarnings(value={"unchecked", "deprecation"})
|
/**
|
||||||
@InterfaceAudience.LimitedPrivate({"MapReduce"})
|
* An interface for a reduce side merge that works with the default Shuffle
|
||||||
|
* implementation.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
@InterfaceStability.Unstable
|
@InterfaceStability.Unstable
|
||||||
public class MergeManager<K, V> {
|
public interface MergeManager<K, V> {
|
||||||
|
|
||||||
private static final Log LOG = LogFactory.getLog(MergeManager.class);
|
|
||||||
|
|
||||||
/* Maximum percentage of the in-memory limit that a single shuffle can
|
|
||||||
* consume*/
|
|
||||||
private static final float DEFAULT_SHUFFLE_MEMORY_LIMIT_PERCENT
|
|
||||||
= 0.25f;
|
|
||||||
|
|
||||||
private final TaskAttemptID reduceId;
|
|
||||||
|
|
||||||
private final JobConf jobConf;
|
|
||||||
private final FileSystem localFS;
|
|
||||||
private final FileSystem rfs;
|
|
||||||
private final LocalDirAllocator localDirAllocator;
|
|
||||||
|
|
||||||
protected MapOutputFile mapOutputFile;
|
|
||||||
|
|
||||||
Set<MapOutput<K, V>> inMemoryMergedMapOutputs =
|
|
||||||
new TreeSet<MapOutput<K,V>>(new MapOutputComparator<K, V>());
|
|
||||||
private final IntermediateMemoryToMemoryMerger memToMemMerger;
|
|
||||||
|
|
||||||
Set<MapOutput<K, V>> inMemoryMapOutputs =
|
|
||||||
new TreeSet<MapOutput<K,V>>(new MapOutputComparator<K, V>());
|
|
||||||
private final MergeThread<MapOutput<K,V>, K,V> inMemoryMerger;
|
|
||||||
|
|
||||||
Set<Path> onDiskMapOutputs = new TreeSet<Path>();
|
|
||||||
private final OnDiskMerger onDiskMerger;
|
|
||||||
|
|
||||||
private final long memoryLimit;
|
|
||||||
private long usedMemory;
|
|
||||||
private long commitMemory;
|
|
||||||
private final long maxSingleShuffleLimit;
|
|
||||||
|
|
||||||
private final int memToMemMergeOutputsThreshold;
|
|
||||||
private final long mergeThreshold;
|
|
||||||
|
|
||||||
private final int ioSortFactor;
|
|
||||||
|
|
||||||
private final Reporter reporter;
|
|
||||||
private final ExceptionReporter exceptionReporter;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Combiner class to run during in-memory merge, if defined.
|
* To wait until merge has some freed resources available so that it can
|
||||||
|
* accept shuffled data. This will be called before a network connection is
|
||||||
|
* established to get the map output.
|
||||||
*/
|
*/
|
||||||
private final Class<? extends Reducer> combinerClass;
|
public void waitForResource() throws InterruptedException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Resettable collector used for combine.
|
* To reserve resources for data to be shuffled. This will be called after
|
||||||
|
* a network connection is made to shuffle the data.
|
||||||
|
* @param mapId mapper from which data will be shuffled.
|
||||||
|
* @param requestedSize size in bytes of data that will be shuffled.
|
||||||
|
* @param fetcher id of the map output fetcher that will shuffle the data.
|
||||||
|
* @return a MapOutput object that can be used by shuffle to shuffle data. If
|
||||||
|
* required resources cannot be reserved immediately, a null can be returned.
|
||||||
*/
|
*/
|
||||||
private final CombineOutputCollector<K,V> combineCollector;
|
public MapOutput<K, V> reserve(TaskAttemptID mapId, long requestedSize,
|
||||||
|
int fetcher) throws IOException;
|
||||||
|
|
||||||
private final Counters.Counter spilledRecordsCounter;
|
|
||||||
|
|
||||||
private final Counters.Counter reduceCombineInputCounter;
|
|
||||||
|
|
||||||
private final Counters.Counter mergedMapOutputsCounter;
|
|
||||||
|
|
||||||
private final CompressionCodec codec;
|
|
||||||
|
|
||||||
private final Progress mergePhase;
|
|
||||||
|
|
||||||
public MergeManager(TaskAttemptID reduceId, JobConf jobConf,
|
|
||||||
FileSystem localFS,
|
|
||||||
LocalDirAllocator localDirAllocator,
|
|
||||||
Reporter reporter,
|
|
||||||
CompressionCodec codec,
|
|
||||||
Class<? extends Reducer> combinerClass,
|
|
||||||
CombineOutputCollector<K,V> combineCollector,
|
|
||||||
Counters.Counter spilledRecordsCounter,
|
|
||||||
Counters.Counter reduceCombineInputCounter,
|
|
||||||
Counters.Counter mergedMapOutputsCounter,
|
|
||||||
ExceptionReporter exceptionReporter,
|
|
||||||
Progress mergePhase, MapOutputFile mapOutputFile) {
|
|
||||||
this.reduceId = reduceId;
|
|
||||||
this.jobConf = jobConf;
|
|
||||||
this.localDirAllocator = localDirAllocator;
|
|
||||||
this.exceptionReporter = exceptionReporter;
|
|
||||||
|
|
||||||
this.reporter = reporter;
|
|
||||||
this.codec = codec;
|
|
||||||
this.combinerClass = combinerClass;
|
|
||||||
this.combineCollector = combineCollector;
|
|
||||||
this.reduceCombineInputCounter = reduceCombineInputCounter;
|
|
||||||
this.spilledRecordsCounter = spilledRecordsCounter;
|
|
||||||
this.mergedMapOutputsCounter = mergedMapOutputsCounter;
|
|
||||||
this.mapOutputFile = mapOutputFile;
|
|
||||||
this.mapOutputFile.setConf(jobConf);
|
|
||||||
|
|
||||||
this.localFS = localFS;
|
|
||||||
this.rfs = ((LocalFileSystem)localFS).getRaw();
|
|
||||||
|
|
||||||
final float maxInMemCopyUse =
|
|
||||||
jobConf.getFloat(MRJobConfig.SHUFFLE_INPUT_BUFFER_PERCENT, 0.90f);
|
|
||||||
if (maxInMemCopyUse > 1.0 || maxInMemCopyUse < 0.0) {
|
|
||||||
throw new IllegalArgumentException("Invalid value for " +
|
|
||||||
MRJobConfig.SHUFFLE_INPUT_BUFFER_PERCENT + ": " +
|
|
||||||
maxInMemCopyUse);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Allow unit tests to fix Runtime memory
|
|
||||||
this.memoryLimit =
|
|
||||||
(long)(jobConf.getLong(MRJobConfig.REDUCE_MEMORY_TOTAL_BYTES,
|
|
||||||
Math.min(Runtime.getRuntime().maxMemory(), Integer.MAX_VALUE))
|
|
||||||
* maxInMemCopyUse);
|
|
||||||
|
|
||||||
this.ioSortFactor = jobConf.getInt(MRJobConfig.IO_SORT_FACTOR, 100);
|
|
||||||
|
|
||||||
final float singleShuffleMemoryLimitPercent =
|
|
||||||
jobConf.getFloat(MRJobConfig.SHUFFLE_MEMORY_LIMIT_PERCENT,
|
|
||||||
DEFAULT_SHUFFLE_MEMORY_LIMIT_PERCENT);
|
|
||||||
if (singleShuffleMemoryLimitPercent <= 0.0f
|
|
||||||
|| singleShuffleMemoryLimitPercent > 1.0f) {
|
|
||||||
throw new IllegalArgumentException("Invalid value for "
|
|
||||||
+ MRJobConfig.SHUFFLE_MEMORY_LIMIT_PERCENT + ": "
|
|
||||||
+ singleShuffleMemoryLimitPercent);
|
|
||||||
}
|
|
||||||
|
|
||||||
usedMemory = 0L;
|
|
||||||
commitMemory = 0L;
|
|
||||||
this.maxSingleShuffleLimit =
|
|
||||||
(long)(memoryLimit * singleShuffleMemoryLimitPercent);
|
|
||||||
this.memToMemMergeOutputsThreshold =
|
|
||||||
jobConf.getInt(MRJobConfig.REDUCE_MEMTOMEM_THRESHOLD, ioSortFactor);
|
|
||||||
this.mergeThreshold = (long)(this.memoryLimit *
|
|
||||||
jobConf.getFloat(MRJobConfig.SHUFFLE_MERGE_PERCENT,
|
|
||||||
0.90f));
|
|
||||||
LOG.info("MergerManager: memoryLimit=" + memoryLimit + ", " +
|
|
||||||
"maxSingleShuffleLimit=" + maxSingleShuffleLimit + ", " +
|
|
||||||
"mergeThreshold=" + mergeThreshold + ", " +
|
|
||||||
"ioSortFactor=" + ioSortFactor + ", " +
|
|
||||||
"memToMemMergeOutputsThreshold=" + memToMemMergeOutputsThreshold);
|
|
||||||
|
|
||||||
if (this.maxSingleShuffleLimit >= this.mergeThreshold) {
|
|
||||||
throw new RuntimeException("Invlaid configuration: "
|
|
||||||
+ "maxSingleShuffleLimit should be less than mergeThreshold"
|
|
||||||
+ "maxSingleShuffleLimit: " + this.maxSingleShuffleLimit
|
|
||||||
+ "mergeThreshold: " + this.mergeThreshold);
|
|
||||||
}
|
|
||||||
|
|
||||||
boolean allowMemToMemMerge =
|
|
||||||
jobConf.getBoolean(MRJobConfig.REDUCE_MEMTOMEM_ENABLED, false);
|
|
||||||
if (allowMemToMemMerge) {
|
|
||||||
this.memToMemMerger =
|
|
||||||
new IntermediateMemoryToMemoryMerger(this,
|
|
||||||
memToMemMergeOutputsThreshold);
|
|
||||||
this.memToMemMerger.start();
|
|
||||||
} else {
|
|
||||||
this.memToMemMerger = null;
|
|
||||||
}
|
|
||||||
|
|
||||||
this.inMemoryMerger = createInMemoryMerger();
|
|
||||||
this.inMemoryMerger.start();
|
|
||||||
|
|
||||||
this.onDiskMerger = new OnDiskMerger(this);
|
|
||||||
this.onDiskMerger.start();
|
|
||||||
|
|
||||||
this.mergePhase = mergePhase;
|
|
||||||
}
|
|
||||||
|
|
||||||
protected MergeThread<MapOutput<K,V>, K,V> createInMemoryMerger() {
|
|
||||||
return new InMemoryMerger(this);
|
|
||||||
}
|
|
||||||
|
|
||||||
TaskAttemptID getReduceId() {
|
|
||||||
return reduceId;
|
|
||||||
}
|
|
||||||
|
|
||||||
@VisibleForTesting
|
|
||||||
ExceptionReporter getExceptionReporter() {
|
|
||||||
return exceptionReporter;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void waitForInMemoryMerge() throws InterruptedException {
|
|
||||||
inMemoryMerger.waitForMerge();
|
|
||||||
}
|
|
||||||
|
|
||||||
private boolean canShuffleToMemory(long requestedSize) {
|
|
||||||
return (requestedSize < maxSingleShuffleLimit);
|
|
||||||
}
|
|
||||||
|
|
||||||
final private MapOutput<K,V> stallShuffle = new MapOutput<K,V>(null);
|
|
||||||
|
|
||||||
public synchronized MapOutput<K,V> reserve(TaskAttemptID mapId,
|
|
||||||
long requestedSize,
|
|
||||||
int fetcher
|
|
||||||
) throws IOException {
|
|
||||||
if (!canShuffleToMemory(requestedSize)) {
|
|
||||||
LOG.info(mapId + ": Shuffling to disk since " + requestedSize +
|
|
||||||
" is greater than maxSingleShuffleLimit (" +
|
|
||||||
maxSingleShuffleLimit + ")");
|
|
||||||
return new MapOutput<K,V>(mapId, this, requestedSize, jobConf,
|
|
||||||
localDirAllocator, fetcher, true,
|
|
||||||
mapOutputFile);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Stall shuffle if we are above the memory limit
|
|
||||||
|
|
||||||
// It is possible that all threads could just be stalling and not make
|
|
||||||
// progress at all. This could happen when:
|
|
||||||
//
|
|
||||||
// requested size is causing the used memory to go above limit &&
|
|
||||||
// requested size < singleShuffleLimit &&
|
|
||||||
// current used size < mergeThreshold (merge will not get triggered)
|
|
||||||
//
|
|
||||||
// To avoid this from happening, we allow exactly one thread to go past
|
|
||||||
// the memory limit. We check (usedMemory > memoryLimit) and not
|
|
||||||
// (usedMemory + requestedSize > memoryLimit). When this thread is done
|
|
||||||
// fetching, this will automatically trigger a merge thereby unlocking
|
|
||||||
// all the stalled threads
|
|
||||||
|
|
||||||
if (usedMemory > memoryLimit) {
|
|
||||||
LOG.debug(mapId + ": Stalling shuffle since usedMemory (" + usedMemory
|
|
||||||
+ ") is greater than memoryLimit (" + memoryLimit + ")." +
|
|
||||||
" CommitMemory is (" + commitMemory + ")");
|
|
||||||
return stallShuffle;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Allow the in-memory shuffle to progress
|
|
||||||
LOG.debug(mapId + ": Proceeding with shuffle since usedMemory ("
|
|
||||||
+ usedMemory + ") is lesser than memoryLimit (" + memoryLimit + ")."
|
|
||||||
+ "CommitMemory is (" + commitMemory + ")");
|
|
||||||
return unconditionalReserve(mapId, requestedSize, true);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Unconditional Reserve is used by the Memory-to-Memory thread
|
* Called at the end of shuffle.
|
||||||
* @return
|
* @return a key value iterator object.
|
||||||
*/
|
*/
|
||||||
private synchronized MapOutput<K, V> unconditionalReserve(
|
public RawKeyValueIterator close() throws Throwable;
|
||||||
TaskAttemptID mapId, long requestedSize, boolean primaryMapOutput) {
|
|
||||||
usedMemory += requestedSize;
|
|
||||||
return new MapOutput<K,V>(mapId, this, (int)requestedSize,
|
|
||||||
primaryMapOutput);
|
|
||||||
}
|
|
||||||
|
|
||||||
synchronized void unreserve(long size) {
|
|
||||||
usedMemory -= size;
|
|
||||||
}
|
|
||||||
|
|
||||||
public synchronized void closeInMemoryFile(MapOutput<K,V> mapOutput) {
|
|
||||||
inMemoryMapOutputs.add(mapOutput);
|
|
||||||
LOG.info("closeInMemoryFile -> map-output of size: " + mapOutput.getSize()
|
|
||||||
+ ", inMemoryMapOutputs.size() -> " + inMemoryMapOutputs.size()
|
|
||||||
+ ", commitMemory -> " + commitMemory + ", usedMemory ->" + usedMemory);
|
|
||||||
|
|
||||||
commitMemory+= mapOutput.getSize();
|
|
||||||
|
|
||||||
// Can hang if mergeThreshold is really low.
|
|
||||||
if (commitMemory >= mergeThreshold) {
|
|
||||||
LOG.info("Starting inMemoryMerger's merge since commitMemory=" +
|
|
||||||
commitMemory + " > mergeThreshold=" + mergeThreshold +
|
|
||||||
". Current usedMemory=" + usedMemory);
|
|
||||||
inMemoryMapOutputs.addAll(inMemoryMergedMapOutputs);
|
|
||||||
inMemoryMergedMapOutputs.clear();
|
|
||||||
inMemoryMerger.startMerge(inMemoryMapOutputs);
|
|
||||||
commitMemory = 0L; // Reset commitMemory.
|
|
||||||
}
|
|
||||||
|
|
||||||
if (memToMemMerger != null) {
|
|
||||||
if (inMemoryMapOutputs.size() >= memToMemMergeOutputsThreshold) {
|
|
||||||
memToMemMerger.startMerge(inMemoryMapOutputs);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
public synchronized void closeInMemoryMergedFile(MapOutput<K,V> mapOutput) {
|
|
||||||
inMemoryMergedMapOutputs.add(mapOutput);
|
|
||||||
LOG.info("closeInMemoryMergedFile -> size: " + mapOutput.getSize() +
|
|
||||||
", inMemoryMergedMapOutputs.size() -> " +
|
|
||||||
inMemoryMergedMapOutputs.size());
|
|
||||||
}
|
|
||||||
|
|
||||||
public synchronized void closeOnDiskFile(Path file) {
|
|
||||||
onDiskMapOutputs.add(file);
|
|
||||||
|
|
||||||
if (onDiskMapOutputs.size() >= (2 * ioSortFactor - 1)) {
|
|
||||||
onDiskMerger.startMerge(onDiskMapOutputs);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public RawKeyValueIterator close() throws Throwable {
|
|
||||||
// Wait for on-going merges to complete
|
|
||||||
if (memToMemMerger != null) {
|
|
||||||
memToMemMerger.close();
|
|
||||||
}
|
|
||||||
inMemoryMerger.close();
|
|
||||||
onDiskMerger.close();
|
|
||||||
|
|
||||||
List<MapOutput<K, V>> memory =
|
|
||||||
new ArrayList<MapOutput<K, V>>(inMemoryMergedMapOutputs);
|
|
||||||
memory.addAll(inMemoryMapOutputs);
|
|
||||||
List<Path> disk = new ArrayList<Path>(onDiskMapOutputs);
|
|
||||||
return finalMerge(jobConf, rfs, memory, disk);
|
|
||||||
}
|
|
||||||
|
|
||||||
private class IntermediateMemoryToMemoryMerger
|
|
||||||
extends MergeThread<MapOutput<K, V>, K, V> {
|
|
||||||
|
|
||||||
public IntermediateMemoryToMemoryMerger(MergeManager<K, V> manager,
|
|
||||||
int mergeFactor) {
|
|
||||||
super(manager, mergeFactor, exceptionReporter);
|
|
||||||
setName("InMemoryMerger - Thread to do in-memory merge of in-memory " +
|
|
||||||
"shuffled map-outputs");
|
|
||||||
setDaemon(true);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void merge(List<MapOutput<K, V>> inputs) throws IOException {
|
|
||||||
if (inputs == null || inputs.size() == 0) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
TaskAttemptID dummyMapId = inputs.get(0).getMapId();
|
|
||||||
List<Segment<K, V>> inMemorySegments = new ArrayList<Segment<K, V>>();
|
|
||||||
long mergeOutputSize =
|
|
||||||
createInMemorySegments(inputs, inMemorySegments, 0);
|
|
||||||
int noInMemorySegments = inMemorySegments.size();
|
|
||||||
|
|
||||||
MapOutput<K, V> mergedMapOutputs =
|
|
||||||
unconditionalReserve(dummyMapId, mergeOutputSize, false);
|
|
||||||
|
|
||||||
Writer<K, V> writer =
|
|
||||||
new InMemoryWriter<K, V>(mergedMapOutputs.getArrayStream());
|
|
||||||
|
|
||||||
LOG.info("Initiating Memory-to-Memory merge with " + noInMemorySegments +
|
|
||||||
" segments of total-size: " + mergeOutputSize);
|
|
||||||
|
|
||||||
RawKeyValueIterator rIter =
|
|
||||||
Merger.merge(jobConf, rfs,
|
|
||||||
(Class<K>)jobConf.getMapOutputKeyClass(),
|
|
||||||
(Class<V>)jobConf.getMapOutputValueClass(),
|
|
||||||
inMemorySegments, inMemorySegments.size(),
|
|
||||||
new Path(reduceId.toString()),
|
|
||||||
(RawComparator<K>)jobConf.getOutputKeyComparator(),
|
|
||||||
reporter, null, null, null);
|
|
||||||
Merger.writeFile(rIter, writer, reporter, jobConf);
|
|
||||||
writer.close();
|
|
||||||
|
|
||||||
LOG.info(reduceId +
|
|
||||||
" Memory-to-Memory merge of the " + noInMemorySegments +
|
|
||||||
" files in-memory complete.");
|
|
||||||
|
|
||||||
// Note the output of the merge
|
|
||||||
closeInMemoryMergedFile(mergedMapOutputs);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private class InMemoryMerger extends MergeThread<MapOutput<K,V>, K,V> {
|
|
||||||
|
|
||||||
public InMemoryMerger(MergeManager<K, V> manager) {
|
|
||||||
super(manager, Integer.MAX_VALUE, exceptionReporter);
|
|
||||||
setName
|
|
||||||
("InMemoryMerger - Thread to merge in-memory shuffled map-outputs");
|
|
||||||
setDaemon(true);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void merge(List<MapOutput<K,V>> inputs) throws IOException {
|
|
||||||
if (inputs == null || inputs.size() == 0) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
//name this output file same as the name of the first file that is
|
|
||||||
//there in the current list of inmem files (this is guaranteed to
|
|
||||||
//be absent on the disk currently. So we don't overwrite a prev.
|
|
||||||
//created spill). Also we need to create the output file now since
|
|
||||||
//it is not guaranteed that this file will be present after merge
|
|
||||||
//is called (we delete empty files as soon as we see them
|
|
||||||
//in the merge method)
|
|
||||||
|
|
||||||
//figure out the mapId
|
|
||||||
TaskAttemptID mapId = inputs.get(0).getMapId();
|
|
||||||
TaskID mapTaskId = mapId.getTaskID();
|
|
||||||
|
|
||||||
List<Segment<K, V>> inMemorySegments = new ArrayList<Segment<K, V>>();
|
|
||||||
long mergeOutputSize =
|
|
||||||
createInMemorySegments(inputs, inMemorySegments,0);
|
|
||||||
int noInMemorySegments = inMemorySegments.size();
|
|
||||||
|
|
||||||
Path outputPath =
|
|
||||||
mapOutputFile.getInputFileForWrite(mapTaskId,
|
|
||||||
mergeOutputSize).suffix(
|
|
||||||
Task.MERGED_OUTPUT_PREFIX);
|
|
||||||
|
|
||||||
Writer<K,V> writer =
|
|
||||||
new Writer<K,V>(jobConf, rfs, outputPath,
|
|
||||||
(Class<K>) jobConf.getMapOutputKeyClass(),
|
|
||||||
(Class<V>) jobConf.getMapOutputValueClass(),
|
|
||||||
codec, null);
|
|
||||||
|
|
||||||
RawKeyValueIterator rIter = null;
|
|
||||||
try {
|
|
||||||
LOG.info("Initiating in-memory merge with " + noInMemorySegments +
|
|
||||||
" segments...");
|
|
||||||
|
|
||||||
rIter = Merger.merge(jobConf, rfs,
|
|
||||||
(Class<K>)jobConf.getMapOutputKeyClass(),
|
|
||||||
(Class<V>)jobConf.getMapOutputValueClass(),
|
|
||||||
inMemorySegments, inMemorySegments.size(),
|
|
||||||
new Path(reduceId.toString()),
|
|
||||||
(RawComparator<K>)jobConf.getOutputKeyComparator(),
|
|
||||||
reporter, spilledRecordsCounter, null, null);
|
|
||||||
|
|
||||||
if (null == combinerClass) {
|
|
||||||
Merger.writeFile(rIter, writer, reporter, jobConf);
|
|
||||||
} else {
|
|
||||||
combineCollector.setWriter(writer);
|
|
||||||
combineAndSpill(rIter, reduceCombineInputCounter);
|
|
||||||
}
|
|
||||||
writer.close();
|
|
||||||
|
|
||||||
LOG.info(reduceId +
|
|
||||||
" Merge of the " + noInMemorySegments +
|
|
||||||
" files in-memory complete." +
|
|
||||||
" Local file is " + outputPath + " of size " +
|
|
||||||
localFS.getFileStatus(outputPath).getLen());
|
|
||||||
} catch (IOException e) {
|
|
||||||
//make sure that we delete the ondisk file that we created
|
|
||||||
//earlier when we invoked cloneFileAttributes
|
|
||||||
localFS.delete(outputPath, true);
|
|
||||||
throw e;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Note the output of the merge
|
|
||||||
closeOnDiskFile(outputPath);
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
private class OnDiskMerger extends MergeThread<Path,K,V> {
|
|
||||||
|
|
||||||
public OnDiskMerger(MergeManager<K, V> manager) {
|
|
||||||
super(manager, Integer.MAX_VALUE, exceptionReporter);
|
|
||||||
setName("OnDiskMerger - Thread to merge on-disk map-outputs");
|
|
||||||
setDaemon(true);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void merge(List<Path> inputs) throws IOException {
|
|
||||||
// sanity check
|
|
||||||
if (inputs == null || inputs.isEmpty()) {
|
|
||||||
LOG.info("No ondisk files to merge...");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
long approxOutputSize = 0;
|
|
||||||
int bytesPerSum =
|
|
||||||
jobConf.getInt("io.bytes.per.checksum", 512);
|
|
||||||
|
|
||||||
LOG.info("OnDiskMerger: We have " + inputs.size() +
|
|
||||||
" map outputs on disk. Triggering merge...");
|
|
||||||
|
|
||||||
// 1. Prepare the list of files to be merged.
|
|
||||||
for (Path file : inputs) {
|
|
||||||
approxOutputSize += localFS.getFileStatus(file).getLen();
|
|
||||||
}
|
|
||||||
|
|
||||||
// add the checksum length
|
|
||||||
approxOutputSize +=
|
|
||||||
ChecksumFileSystem.getChecksumLength(approxOutputSize, bytesPerSum);
|
|
||||||
|
|
||||||
// 2. Start the on-disk merge process
|
|
||||||
Path outputPath =
|
|
||||||
localDirAllocator.getLocalPathForWrite(inputs.get(0).toString(),
|
|
||||||
approxOutputSize, jobConf).suffix(Task.MERGED_OUTPUT_PREFIX);
|
|
||||||
Writer<K,V> writer =
|
|
||||||
new Writer<K,V>(jobConf, rfs, outputPath,
|
|
||||||
(Class<K>) jobConf.getMapOutputKeyClass(),
|
|
||||||
(Class<V>) jobConf.getMapOutputValueClass(),
|
|
||||||
codec, null);
|
|
||||||
RawKeyValueIterator iter = null;
|
|
||||||
Path tmpDir = new Path(reduceId.toString());
|
|
||||||
try {
|
|
||||||
iter = Merger.merge(jobConf, rfs,
|
|
||||||
(Class<K>) jobConf.getMapOutputKeyClass(),
|
|
||||||
(Class<V>) jobConf.getMapOutputValueClass(),
|
|
||||||
codec, inputs.toArray(new Path[inputs.size()]),
|
|
||||||
true, ioSortFactor, tmpDir,
|
|
||||||
(RawComparator<K>) jobConf.getOutputKeyComparator(),
|
|
||||||
reporter, spilledRecordsCounter, null,
|
|
||||||
mergedMapOutputsCounter, null);
|
|
||||||
|
|
||||||
Merger.writeFile(iter, writer, reporter, jobConf);
|
|
||||||
writer.close();
|
|
||||||
} catch (IOException e) {
|
|
||||||
localFS.delete(outputPath, true);
|
|
||||||
throw e;
|
|
||||||
}
|
|
||||||
|
|
||||||
closeOnDiskFile(outputPath);
|
|
||||||
|
|
||||||
LOG.info(reduceId +
|
|
||||||
" Finished merging " + inputs.size() +
|
|
||||||
" map output files on disk of total-size " +
|
|
||||||
approxOutputSize + "." +
|
|
||||||
" Local output file is " + outputPath + " of size " +
|
|
||||||
localFS.getFileStatus(outputPath).getLen());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void combineAndSpill(
|
|
||||||
RawKeyValueIterator kvIter,
|
|
||||||
Counters.Counter inCounter) throws IOException {
|
|
||||||
JobConf job = jobConf;
|
|
||||||
Reducer combiner = ReflectionUtils.newInstance(combinerClass, job);
|
|
||||||
Class<K> keyClass = (Class<K>) job.getMapOutputKeyClass();
|
|
||||||
Class<V> valClass = (Class<V>) job.getMapOutputValueClass();
|
|
||||||
RawComparator<K> comparator =
|
|
||||||
(RawComparator<K>)job.getOutputKeyComparator();
|
|
||||||
try {
|
|
||||||
CombineValuesIterator values = new CombineValuesIterator(
|
|
||||||
kvIter, comparator, keyClass, valClass, job, Reporter.NULL,
|
|
||||||
inCounter);
|
|
||||||
while (values.more()) {
|
|
||||||
combiner.reduce(values.getKey(), values, combineCollector,
|
|
||||||
Reporter.NULL);
|
|
||||||
values.nextKey();
|
|
||||||
}
|
|
||||||
} finally {
|
|
||||||
combiner.close();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private long createInMemorySegments(List<MapOutput<K,V>> inMemoryMapOutputs,
|
|
||||||
List<Segment<K, V>> inMemorySegments,
|
|
||||||
long leaveBytes
|
|
||||||
) throws IOException {
|
|
||||||
long totalSize = 0L;
|
|
||||||
// We could use fullSize could come from the RamManager, but files can be
|
|
||||||
// closed but not yet present in inMemoryMapOutputs
|
|
||||||
long fullSize = 0L;
|
|
||||||
for (MapOutput<K,V> mo : inMemoryMapOutputs) {
|
|
||||||
fullSize += mo.getMemory().length;
|
|
||||||
}
|
|
||||||
while(fullSize > leaveBytes) {
|
|
||||||
MapOutput<K,V> mo = inMemoryMapOutputs.remove(0);
|
|
||||||
byte[] data = mo.getMemory();
|
|
||||||
long size = data.length;
|
|
||||||
totalSize += size;
|
|
||||||
fullSize -= size;
|
|
||||||
Reader<K,V> reader = new InMemoryReader<K,V>(MergeManager.this,
|
|
||||||
mo.getMapId(),
|
|
||||||
data, 0, (int)size);
|
|
||||||
inMemorySegments.add(new Segment<K,V>(reader, true,
|
|
||||||
(mo.isPrimaryMapOutput() ?
|
|
||||||
mergedMapOutputsCounter : null)));
|
|
||||||
}
|
|
||||||
return totalSize;
|
|
||||||
}
|
|
||||||
|
|
||||||
class RawKVIteratorReader extends IFile.Reader<K,V> {
|
|
||||||
|
|
||||||
private final RawKeyValueIterator kvIter;
|
|
||||||
|
|
||||||
public RawKVIteratorReader(RawKeyValueIterator kvIter, long size)
|
|
||||||
throws IOException {
|
|
||||||
super(null, null, size, null, spilledRecordsCounter);
|
|
||||||
this.kvIter = kvIter;
|
|
||||||
}
|
|
||||||
public boolean nextRawKey(DataInputBuffer key) throws IOException {
|
|
||||||
if (kvIter.next()) {
|
|
||||||
final DataInputBuffer kb = kvIter.getKey();
|
|
||||||
final int kp = kb.getPosition();
|
|
||||||
final int klen = kb.getLength() - kp;
|
|
||||||
key.reset(kb.getData(), kp, klen);
|
|
||||||
bytesRead += klen;
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
public void nextRawValue(DataInputBuffer value) throws IOException {
|
|
||||||
final DataInputBuffer vb = kvIter.getValue();
|
|
||||||
final int vp = vb.getPosition();
|
|
||||||
final int vlen = vb.getLength() - vp;
|
|
||||||
value.reset(vb.getData(), vp, vlen);
|
|
||||||
bytesRead += vlen;
|
|
||||||
}
|
|
||||||
public long getPosition() throws IOException {
|
|
||||||
return bytesRead;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void close() throws IOException {
|
|
||||||
kvIter.close();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private RawKeyValueIterator finalMerge(JobConf job, FileSystem fs,
|
|
||||||
List<MapOutput<K,V>> inMemoryMapOutputs,
|
|
||||||
List<Path> onDiskMapOutputs
|
|
||||||
) throws IOException {
|
|
||||||
LOG.info("finalMerge called with " +
|
|
||||||
inMemoryMapOutputs.size() + " in-memory map-outputs and " +
|
|
||||||
onDiskMapOutputs.size() + " on-disk map-outputs");
|
|
||||||
|
|
||||||
final float maxRedPer =
|
|
||||||
job.getFloat(MRJobConfig.REDUCE_INPUT_BUFFER_PERCENT, 0f);
|
|
||||||
if (maxRedPer > 1.0 || maxRedPer < 0.0) {
|
|
||||||
throw new IOException(MRJobConfig.REDUCE_INPUT_BUFFER_PERCENT +
|
|
||||||
maxRedPer);
|
|
||||||
}
|
|
||||||
int maxInMemReduce = (int)Math.min(
|
|
||||||
Runtime.getRuntime().maxMemory() * maxRedPer, Integer.MAX_VALUE);
|
|
||||||
|
|
||||||
|
|
||||||
// merge config params
|
|
||||||
Class<K> keyClass = (Class<K>)job.getMapOutputKeyClass();
|
|
||||||
Class<V> valueClass = (Class<V>)job.getMapOutputValueClass();
|
|
||||||
boolean keepInputs = job.getKeepFailedTaskFiles();
|
|
||||||
final Path tmpDir = new Path(reduceId.toString());
|
|
||||||
final RawComparator<K> comparator =
|
|
||||||
(RawComparator<K>)job.getOutputKeyComparator();
|
|
||||||
|
|
||||||
// segments required to vacate memory
|
|
||||||
List<Segment<K,V>> memDiskSegments = new ArrayList<Segment<K,V>>();
|
|
||||||
long inMemToDiskBytes = 0;
|
|
||||||
boolean mergePhaseFinished = false;
|
|
||||||
if (inMemoryMapOutputs.size() > 0) {
|
|
||||||
TaskID mapId = inMemoryMapOutputs.get(0).getMapId().getTaskID();
|
|
||||||
inMemToDiskBytes = createInMemorySegments(inMemoryMapOutputs,
|
|
||||||
memDiskSegments,
|
|
||||||
maxInMemReduce);
|
|
||||||
final int numMemDiskSegments = memDiskSegments.size();
|
|
||||||
if (numMemDiskSegments > 0 &&
|
|
||||||
ioSortFactor > onDiskMapOutputs.size()) {
|
|
||||||
|
|
||||||
// If we reach here, it implies that we have less than io.sort.factor
|
|
||||||
// disk segments and this will be incremented by 1 (result of the
|
|
||||||
// memory segments merge). Since this total would still be
|
|
||||||
// <= io.sort.factor, we will not do any more intermediate merges,
|
|
||||||
// the merge of all these disk segments would be directly fed to the
|
|
||||||
// reduce method
|
|
||||||
|
|
||||||
mergePhaseFinished = true;
|
|
||||||
// must spill to disk, but can't retain in-mem for intermediate merge
|
|
||||||
final Path outputPath =
|
|
||||||
mapOutputFile.getInputFileForWrite(mapId,
|
|
||||||
inMemToDiskBytes).suffix(
|
|
||||||
Task.MERGED_OUTPUT_PREFIX);
|
|
||||||
final RawKeyValueIterator rIter = Merger.merge(job, fs,
|
|
||||||
keyClass, valueClass, memDiskSegments, numMemDiskSegments,
|
|
||||||
tmpDir, comparator, reporter, spilledRecordsCounter, null,
|
|
||||||
mergePhase);
|
|
||||||
final Writer<K,V> writer = new Writer<K,V>(job, fs, outputPath,
|
|
||||||
keyClass, valueClass, codec, null);
|
|
||||||
try {
|
|
||||||
Merger.writeFile(rIter, writer, reporter, job);
|
|
||||||
// add to list of final disk outputs.
|
|
||||||
onDiskMapOutputs.add(outputPath);
|
|
||||||
} catch (IOException e) {
|
|
||||||
if (null != outputPath) {
|
|
||||||
try {
|
|
||||||
fs.delete(outputPath, true);
|
|
||||||
} catch (IOException ie) {
|
|
||||||
// NOTHING
|
|
||||||
}
|
|
||||||
}
|
|
||||||
throw e;
|
|
||||||
} finally {
|
|
||||||
if (null != writer) {
|
|
||||||
writer.close();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
LOG.info("Merged " + numMemDiskSegments + " segments, " +
|
|
||||||
inMemToDiskBytes + " bytes to disk to satisfy " +
|
|
||||||
"reduce memory limit");
|
|
||||||
inMemToDiskBytes = 0;
|
|
||||||
memDiskSegments.clear();
|
|
||||||
} else if (inMemToDiskBytes != 0) {
|
|
||||||
LOG.info("Keeping " + numMemDiskSegments + " segments, " +
|
|
||||||
inMemToDiskBytes + " bytes in memory for " +
|
|
||||||
"intermediate, on-disk merge");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// segments on disk
|
|
||||||
List<Segment<K,V>> diskSegments = new ArrayList<Segment<K,V>>();
|
|
||||||
long onDiskBytes = inMemToDiskBytes;
|
|
||||||
Path[] onDisk = onDiskMapOutputs.toArray(new Path[onDiskMapOutputs.size()]);
|
|
||||||
for (Path file : onDisk) {
|
|
||||||
onDiskBytes += fs.getFileStatus(file).getLen();
|
|
||||||
LOG.debug("Disk file: " + file + " Length is " +
|
|
||||||
fs.getFileStatus(file).getLen());
|
|
||||||
diskSegments.add(new Segment<K, V>(job, fs, file, codec, keepInputs,
|
|
||||||
(file.toString().endsWith(
|
|
||||||
Task.MERGED_OUTPUT_PREFIX) ?
|
|
||||||
null : mergedMapOutputsCounter)
|
|
||||||
));
|
|
||||||
}
|
|
||||||
LOG.info("Merging " + onDisk.length + " files, " +
|
|
||||||
onDiskBytes + " bytes from disk");
|
|
||||||
Collections.sort(diskSegments, new Comparator<Segment<K,V>>() {
|
|
||||||
public int compare(Segment<K, V> o1, Segment<K, V> o2) {
|
|
||||||
if (o1.getLength() == o2.getLength()) {
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
return o1.getLength() < o2.getLength() ? -1 : 1;
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
// build final list of segments from merged backed by disk + in-mem
|
|
||||||
List<Segment<K,V>> finalSegments = new ArrayList<Segment<K,V>>();
|
|
||||||
long inMemBytes = createInMemorySegments(inMemoryMapOutputs,
|
|
||||||
finalSegments, 0);
|
|
||||||
LOG.info("Merging " + finalSegments.size() + " segments, " +
|
|
||||||
inMemBytes + " bytes from memory into reduce");
|
|
||||||
if (0 != onDiskBytes) {
|
|
||||||
final int numInMemSegments = memDiskSegments.size();
|
|
||||||
diskSegments.addAll(0, memDiskSegments);
|
|
||||||
memDiskSegments.clear();
|
|
||||||
// Pass mergePhase only if there is a going to be intermediate
|
|
||||||
// merges. See comment where mergePhaseFinished is being set
|
|
||||||
Progress thisPhase = (mergePhaseFinished) ? null : mergePhase;
|
|
||||||
RawKeyValueIterator diskMerge = Merger.merge(
|
|
||||||
job, fs, keyClass, valueClass, diskSegments,
|
|
||||||
ioSortFactor, numInMemSegments, tmpDir, comparator,
|
|
||||||
reporter, false, spilledRecordsCounter, null, thisPhase);
|
|
||||||
diskSegments.clear();
|
|
||||||
if (0 == finalSegments.size()) {
|
|
||||||
return diskMerge;
|
|
||||||
}
|
|
||||||
finalSegments.add(new Segment<K,V>(
|
|
||||||
new RawKVIteratorReader(diskMerge, onDiskBytes), true));
|
|
||||||
}
|
|
||||||
return Merger.merge(job, fs, keyClass, valueClass,
|
|
||||||
finalSegments, finalSegments.size(), tmpDir,
|
|
||||||
comparator, reporter, spilledRecordsCounter, null,
|
|
||||||
null);
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,797 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.mapreduce.task.reduce;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.Comparator;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.TreeSet;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
|
import org.apache.hadoop.fs.ChecksumFileSystem;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.LocalDirAllocator;
|
||||||
|
import org.apache.hadoop.fs.LocalFileSystem;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.io.DataInputBuffer;
|
||||||
|
import org.apache.hadoop.io.RawComparator;
|
||||||
|
import org.apache.hadoop.io.compress.CompressionCodec;
|
||||||
|
import org.apache.hadoop.mapred.Counters;
|
||||||
|
import org.apache.hadoop.mapred.IFile;
|
||||||
|
import org.apache.hadoop.mapred.JobConf;
|
||||||
|
import org.apache.hadoop.mapred.MapOutputFile;
|
||||||
|
import org.apache.hadoop.mapred.Merger;
|
||||||
|
import org.apache.hadoop.mapred.RawKeyValueIterator;
|
||||||
|
import org.apache.hadoop.mapred.Reducer;
|
||||||
|
import org.apache.hadoop.mapred.Reporter;
|
||||||
|
import org.apache.hadoop.mapred.Task;
|
||||||
|
import org.apache.hadoop.mapred.IFile.Reader;
|
||||||
|
import org.apache.hadoop.mapred.IFile.Writer;
|
||||||
|
import org.apache.hadoop.mapred.Merger.Segment;
|
||||||
|
import org.apache.hadoop.mapred.Task.CombineOutputCollector;
|
||||||
|
import org.apache.hadoop.mapred.Task.CombineValuesIterator;
|
||||||
|
import org.apache.hadoop.mapreduce.MRJobConfig;
|
||||||
|
import org.apache.hadoop.mapreduce.TaskAttemptID;
|
||||||
|
import org.apache.hadoop.mapreduce.TaskID;
|
||||||
|
import org.apache.hadoop.mapreduce.task.reduce.MapOutput.MapOutputComparator;
|
||||||
|
import org.apache.hadoop.util.Progress;
|
||||||
|
import org.apache.hadoop.util.ReflectionUtils;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
|
||||||
|
@SuppressWarnings(value={"unchecked"})
|
||||||
|
@InterfaceAudience.LimitedPrivate({"MapReduce"})
|
||||||
|
@InterfaceStability.Unstable
|
||||||
|
public class MergeManagerImpl<K, V> implements MergeManager<K, V> {
|
||||||
|
|
||||||
|
private static final Log LOG = LogFactory.getLog(MergeManagerImpl.class);
|
||||||
|
|
||||||
|
/* Maximum percentage of the in-memory limit that a single shuffle can
|
||||||
|
* consume*/
|
||||||
|
private static final float DEFAULT_SHUFFLE_MEMORY_LIMIT_PERCENT
|
||||||
|
= 0.25f;
|
||||||
|
|
||||||
|
private final TaskAttemptID reduceId;
|
||||||
|
|
||||||
|
private final JobConf jobConf;
|
||||||
|
private final FileSystem localFS;
|
||||||
|
private final FileSystem rfs;
|
||||||
|
private final LocalDirAllocator localDirAllocator;
|
||||||
|
|
||||||
|
protected MapOutputFile mapOutputFile;
|
||||||
|
|
||||||
|
Set<InMemoryMapOutput<K, V>> inMemoryMergedMapOutputs =
|
||||||
|
new TreeSet<InMemoryMapOutput<K,V>>(new MapOutputComparator<K, V>());
|
||||||
|
private IntermediateMemoryToMemoryMerger memToMemMerger;
|
||||||
|
|
||||||
|
Set<InMemoryMapOutput<K, V>> inMemoryMapOutputs =
|
||||||
|
new TreeSet<InMemoryMapOutput<K,V>>(new MapOutputComparator<K, V>());
|
||||||
|
private final MergeThread<InMemoryMapOutput<K,V>, K,V> inMemoryMerger;
|
||||||
|
|
||||||
|
Set<Path> onDiskMapOutputs = new TreeSet<Path>();
|
||||||
|
private final OnDiskMerger onDiskMerger;
|
||||||
|
|
||||||
|
private final long memoryLimit;
|
||||||
|
private long usedMemory;
|
||||||
|
private long commitMemory;
|
||||||
|
private final long maxSingleShuffleLimit;
|
||||||
|
|
||||||
|
private final int memToMemMergeOutputsThreshold;
|
||||||
|
private final long mergeThreshold;
|
||||||
|
|
||||||
|
private final int ioSortFactor;
|
||||||
|
|
||||||
|
private final Reporter reporter;
|
||||||
|
private final ExceptionReporter exceptionReporter;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Combiner class to run during in-memory merge, if defined.
|
||||||
|
*/
|
||||||
|
private final Class<? extends Reducer> combinerClass;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Resettable collector used for combine.
|
||||||
|
*/
|
||||||
|
private final CombineOutputCollector<K,V> combineCollector;
|
||||||
|
|
||||||
|
private final Counters.Counter spilledRecordsCounter;
|
||||||
|
|
||||||
|
private final Counters.Counter reduceCombineInputCounter;
|
||||||
|
|
||||||
|
private final Counters.Counter mergedMapOutputsCounter;
|
||||||
|
|
||||||
|
private final CompressionCodec codec;
|
||||||
|
|
||||||
|
private final Progress mergePhase;
|
||||||
|
|
||||||
|
public MergeManagerImpl(TaskAttemptID reduceId, JobConf jobConf,
|
||||||
|
FileSystem localFS,
|
||||||
|
LocalDirAllocator localDirAllocator,
|
||||||
|
Reporter reporter,
|
||||||
|
CompressionCodec codec,
|
||||||
|
Class<? extends Reducer> combinerClass,
|
||||||
|
CombineOutputCollector<K,V> combineCollector,
|
||||||
|
Counters.Counter spilledRecordsCounter,
|
||||||
|
Counters.Counter reduceCombineInputCounter,
|
||||||
|
Counters.Counter mergedMapOutputsCounter,
|
||||||
|
ExceptionReporter exceptionReporter,
|
||||||
|
Progress mergePhase, MapOutputFile mapOutputFile) {
|
||||||
|
this.reduceId = reduceId;
|
||||||
|
this.jobConf = jobConf;
|
||||||
|
this.localDirAllocator = localDirAllocator;
|
||||||
|
this.exceptionReporter = exceptionReporter;
|
||||||
|
|
||||||
|
this.reporter = reporter;
|
||||||
|
this.codec = codec;
|
||||||
|
this.combinerClass = combinerClass;
|
||||||
|
this.combineCollector = combineCollector;
|
||||||
|
this.reduceCombineInputCounter = reduceCombineInputCounter;
|
||||||
|
this.spilledRecordsCounter = spilledRecordsCounter;
|
||||||
|
this.mergedMapOutputsCounter = mergedMapOutputsCounter;
|
||||||
|
this.mapOutputFile = mapOutputFile;
|
||||||
|
this.mapOutputFile.setConf(jobConf);
|
||||||
|
|
||||||
|
this.localFS = localFS;
|
||||||
|
this.rfs = ((LocalFileSystem)localFS).getRaw();
|
||||||
|
|
||||||
|
final float maxInMemCopyUse =
|
||||||
|
jobConf.getFloat(MRJobConfig.SHUFFLE_INPUT_BUFFER_PERCENT, 0.90f);
|
||||||
|
if (maxInMemCopyUse > 1.0 || maxInMemCopyUse < 0.0) {
|
||||||
|
throw new IllegalArgumentException("Invalid value for " +
|
||||||
|
MRJobConfig.SHUFFLE_INPUT_BUFFER_PERCENT + ": " +
|
||||||
|
maxInMemCopyUse);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Allow unit tests to fix Runtime memory
|
||||||
|
this.memoryLimit =
|
||||||
|
(long)(jobConf.getLong(MRJobConfig.REDUCE_MEMORY_TOTAL_BYTES,
|
||||||
|
Math.min(Runtime.getRuntime().maxMemory(), Integer.MAX_VALUE))
|
||||||
|
* maxInMemCopyUse);
|
||||||
|
|
||||||
|
this.ioSortFactor = jobConf.getInt(MRJobConfig.IO_SORT_FACTOR, 100);
|
||||||
|
|
||||||
|
final float singleShuffleMemoryLimitPercent =
|
||||||
|
jobConf.getFloat(MRJobConfig.SHUFFLE_MEMORY_LIMIT_PERCENT,
|
||||||
|
DEFAULT_SHUFFLE_MEMORY_LIMIT_PERCENT);
|
||||||
|
if (singleShuffleMemoryLimitPercent <= 0.0f
|
||||||
|
|| singleShuffleMemoryLimitPercent > 1.0f) {
|
||||||
|
throw new IllegalArgumentException("Invalid value for "
|
||||||
|
+ MRJobConfig.SHUFFLE_MEMORY_LIMIT_PERCENT + ": "
|
||||||
|
+ singleShuffleMemoryLimitPercent);
|
||||||
|
}
|
||||||
|
|
||||||
|
usedMemory = 0L;
|
||||||
|
commitMemory = 0L;
|
||||||
|
this.maxSingleShuffleLimit =
|
||||||
|
(long)(memoryLimit * singleShuffleMemoryLimitPercent);
|
||||||
|
this.memToMemMergeOutputsThreshold =
|
||||||
|
jobConf.getInt(MRJobConfig.REDUCE_MEMTOMEM_THRESHOLD, ioSortFactor);
|
||||||
|
this.mergeThreshold = (long)(this.memoryLimit *
|
||||||
|
jobConf.getFloat(MRJobConfig.SHUFFLE_MERGE_PERCENT,
|
||||||
|
0.90f));
|
||||||
|
LOG.info("MergerManager: memoryLimit=" + memoryLimit + ", " +
|
||||||
|
"maxSingleShuffleLimit=" + maxSingleShuffleLimit + ", " +
|
||||||
|
"mergeThreshold=" + mergeThreshold + ", " +
|
||||||
|
"ioSortFactor=" + ioSortFactor + ", " +
|
||||||
|
"memToMemMergeOutputsThreshold=" + memToMemMergeOutputsThreshold);
|
||||||
|
|
||||||
|
if (this.maxSingleShuffleLimit >= this.mergeThreshold) {
|
||||||
|
throw new RuntimeException("Invlaid configuration: "
|
||||||
|
+ "maxSingleShuffleLimit should be less than mergeThreshold"
|
||||||
|
+ "maxSingleShuffleLimit: " + this.maxSingleShuffleLimit
|
||||||
|
+ "mergeThreshold: " + this.mergeThreshold);
|
||||||
|
}
|
||||||
|
|
||||||
|
boolean allowMemToMemMerge =
|
||||||
|
jobConf.getBoolean(MRJobConfig.REDUCE_MEMTOMEM_ENABLED, false);
|
||||||
|
if (allowMemToMemMerge) {
|
||||||
|
this.memToMemMerger =
|
||||||
|
new IntermediateMemoryToMemoryMerger(this,
|
||||||
|
memToMemMergeOutputsThreshold);
|
||||||
|
this.memToMemMerger.start();
|
||||||
|
} else {
|
||||||
|
this.memToMemMerger = null;
|
||||||
|
}
|
||||||
|
|
||||||
|
this.inMemoryMerger = createInMemoryMerger();
|
||||||
|
this.inMemoryMerger.start();
|
||||||
|
|
||||||
|
this.onDiskMerger = new OnDiskMerger(this);
|
||||||
|
this.onDiskMerger.start();
|
||||||
|
|
||||||
|
this.mergePhase = mergePhase;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected MergeThread<InMemoryMapOutput<K,V>, K,V> createInMemoryMerger() {
|
||||||
|
return new InMemoryMerger(this);
|
||||||
|
}
|
||||||
|
|
||||||
|
TaskAttemptID getReduceId() {
|
||||||
|
return reduceId;
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
ExceptionReporter getExceptionReporter() {
|
||||||
|
return exceptionReporter;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void waitForResource() throws InterruptedException {
|
||||||
|
inMemoryMerger.waitForMerge();
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean canShuffleToMemory(long requestedSize) {
|
||||||
|
return (requestedSize < maxSingleShuffleLimit);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized MapOutput<K,V> reserve(TaskAttemptID mapId,
|
||||||
|
long requestedSize,
|
||||||
|
int fetcher
|
||||||
|
) throws IOException {
|
||||||
|
if (!canShuffleToMemory(requestedSize)) {
|
||||||
|
LOG.info(mapId + ": Shuffling to disk since " + requestedSize +
|
||||||
|
" is greater than maxSingleShuffleLimit (" +
|
||||||
|
maxSingleShuffleLimit + ")");
|
||||||
|
return new OnDiskMapOutput<K,V>(mapId, reduceId, this, requestedSize,
|
||||||
|
jobConf, mapOutputFile, fetcher, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Stall shuffle if we are above the memory limit
|
||||||
|
|
||||||
|
// It is possible that all threads could just be stalling and not make
|
||||||
|
// progress at all. This could happen when:
|
||||||
|
//
|
||||||
|
// requested size is causing the used memory to go above limit &&
|
||||||
|
// requested size < singleShuffleLimit &&
|
||||||
|
// current used size < mergeThreshold (merge will not get triggered)
|
||||||
|
//
|
||||||
|
// To avoid this from happening, we allow exactly one thread to go past
|
||||||
|
// the memory limit. We check (usedMemory > memoryLimit) and not
|
||||||
|
// (usedMemory + requestedSize > memoryLimit). When this thread is done
|
||||||
|
// fetching, this will automatically trigger a merge thereby unlocking
|
||||||
|
// all the stalled threads
|
||||||
|
|
||||||
|
if (usedMemory > memoryLimit) {
|
||||||
|
LOG.debug(mapId + ": Stalling shuffle since usedMemory (" + usedMemory
|
||||||
|
+ ") is greater than memoryLimit (" + memoryLimit + ")." +
|
||||||
|
" CommitMemory is (" + commitMemory + ")");
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Allow the in-memory shuffle to progress
|
||||||
|
LOG.debug(mapId + ": Proceeding with shuffle since usedMemory ("
|
||||||
|
+ usedMemory + ") is lesser than memoryLimit (" + memoryLimit + ")."
|
||||||
|
+ "CommitMemory is (" + commitMemory + ")");
|
||||||
|
return unconditionalReserve(mapId, requestedSize, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Unconditional Reserve is used by the Memory-to-Memory thread
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
private synchronized InMemoryMapOutput<K, V> unconditionalReserve(
|
||||||
|
TaskAttemptID mapId, long requestedSize, boolean primaryMapOutput) {
|
||||||
|
usedMemory += requestedSize;
|
||||||
|
return new InMemoryMapOutput<K,V>(jobConf, mapId, this, (int)requestedSize,
|
||||||
|
codec, primaryMapOutput);
|
||||||
|
}
|
||||||
|
|
||||||
|
synchronized void unreserve(long size) {
|
||||||
|
usedMemory -= size;
|
||||||
|
}
|
||||||
|
|
||||||
|
public synchronized void closeInMemoryFile(InMemoryMapOutput<K,V> mapOutput) {
|
||||||
|
inMemoryMapOutputs.add(mapOutput);
|
||||||
|
LOG.info("closeInMemoryFile -> map-output of size: " + mapOutput.getSize()
|
||||||
|
+ ", inMemoryMapOutputs.size() -> " + inMemoryMapOutputs.size()
|
||||||
|
+ ", commitMemory -> " + commitMemory + ", usedMemory ->" + usedMemory);
|
||||||
|
|
||||||
|
commitMemory+= mapOutput.getSize();
|
||||||
|
|
||||||
|
// Can hang if mergeThreshold is really low.
|
||||||
|
if (commitMemory >= mergeThreshold) {
|
||||||
|
LOG.info("Starting inMemoryMerger's merge since commitMemory=" +
|
||||||
|
commitMemory + " > mergeThreshold=" + mergeThreshold +
|
||||||
|
". Current usedMemory=" + usedMemory);
|
||||||
|
inMemoryMapOutputs.addAll(inMemoryMergedMapOutputs);
|
||||||
|
inMemoryMergedMapOutputs.clear();
|
||||||
|
inMemoryMerger.startMerge(inMemoryMapOutputs);
|
||||||
|
commitMemory = 0L; // Reset commitMemory.
|
||||||
|
}
|
||||||
|
|
||||||
|
if (memToMemMerger != null) {
|
||||||
|
if (inMemoryMapOutputs.size() >= memToMemMergeOutputsThreshold) {
|
||||||
|
memToMemMerger.startMerge(inMemoryMapOutputs);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public synchronized void closeInMemoryMergedFile(InMemoryMapOutput<K,V> mapOutput) {
|
||||||
|
inMemoryMergedMapOutputs.add(mapOutput);
|
||||||
|
LOG.info("closeInMemoryMergedFile -> size: " + mapOutput.getSize() +
|
||||||
|
", inMemoryMergedMapOutputs.size() -> " +
|
||||||
|
inMemoryMergedMapOutputs.size());
|
||||||
|
}
|
||||||
|
|
||||||
|
public synchronized void closeOnDiskFile(Path file) {
|
||||||
|
onDiskMapOutputs.add(file);
|
||||||
|
|
||||||
|
if (onDiskMapOutputs.size() >= (2 * ioSortFactor - 1)) {
|
||||||
|
onDiskMerger.startMerge(onDiskMapOutputs);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public RawKeyValueIterator close() throws Throwable {
|
||||||
|
// Wait for on-going merges to complete
|
||||||
|
if (memToMemMerger != null) {
|
||||||
|
memToMemMerger.close();
|
||||||
|
}
|
||||||
|
inMemoryMerger.close();
|
||||||
|
onDiskMerger.close();
|
||||||
|
|
||||||
|
List<InMemoryMapOutput<K, V>> memory =
|
||||||
|
new ArrayList<InMemoryMapOutput<K, V>>(inMemoryMergedMapOutputs);
|
||||||
|
memory.addAll(inMemoryMapOutputs);
|
||||||
|
List<Path> disk = new ArrayList<Path>(onDiskMapOutputs);
|
||||||
|
return finalMerge(jobConf, rfs, memory, disk);
|
||||||
|
}
|
||||||
|
|
||||||
|
private class IntermediateMemoryToMemoryMerger
|
||||||
|
extends MergeThread<InMemoryMapOutput<K, V>, K, V> {
|
||||||
|
|
||||||
|
public IntermediateMemoryToMemoryMerger(MergeManagerImpl<K, V> manager,
|
||||||
|
int mergeFactor) {
|
||||||
|
super(manager, mergeFactor, exceptionReporter);
|
||||||
|
setName("InMemoryMerger - Thread to do in-memory merge of in-memory " +
|
||||||
|
"shuffled map-outputs");
|
||||||
|
setDaemon(true);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void merge(List<InMemoryMapOutput<K, V>> inputs) throws IOException {
|
||||||
|
if (inputs == null || inputs.size() == 0) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
TaskAttemptID dummyMapId = inputs.get(0).getMapId();
|
||||||
|
List<Segment<K, V>> inMemorySegments = new ArrayList<Segment<K, V>>();
|
||||||
|
long mergeOutputSize =
|
||||||
|
createInMemorySegments(inputs, inMemorySegments, 0);
|
||||||
|
int noInMemorySegments = inMemorySegments.size();
|
||||||
|
|
||||||
|
InMemoryMapOutput<K, V> mergedMapOutputs =
|
||||||
|
unconditionalReserve(dummyMapId, mergeOutputSize, false);
|
||||||
|
|
||||||
|
Writer<K, V> writer =
|
||||||
|
new InMemoryWriter<K, V>(mergedMapOutputs.getArrayStream());
|
||||||
|
|
||||||
|
LOG.info("Initiating Memory-to-Memory merge with " + noInMemorySegments +
|
||||||
|
" segments of total-size: " + mergeOutputSize);
|
||||||
|
|
||||||
|
RawKeyValueIterator rIter =
|
||||||
|
Merger.merge(jobConf, rfs,
|
||||||
|
(Class<K>)jobConf.getMapOutputKeyClass(),
|
||||||
|
(Class<V>)jobConf.getMapOutputValueClass(),
|
||||||
|
inMemorySegments, inMemorySegments.size(),
|
||||||
|
new Path(reduceId.toString()),
|
||||||
|
(RawComparator<K>)jobConf.getOutputKeyComparator(),
|
||||||
|
reporter, null, null, null);
|
||||||
|
Merger.writeFile(rIter, writer, reporter, jobConf);
|
||||||
|
writer.close();
|
||||||
|
|
||||||
|
LOG.info(reduceId +
|
||||||
|
" Memory-to-Memory merge of the " + noInMemorySegments +
|
||||||
|
" files in-memory complete.");
|
||||||
|
|
||||||
|
// Note the output of the merge
|
||||||
|
closeInMemoryMergedFile(mergedMapOutputs);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private class InMemoryMerger extends MergeThread<InMemoryMapOutput<K,V>, K,V> {
|
||||||
|
|
||||||
|
public InMemoryMerger(MergeManagerImpl<K, V> manager) {
|
||||||
|
super(manager, Integer.MAX_VALUE, exceptionReporter);
|
||||||
|
setName
|
||||||
|
("InMemoryMerger - Thread to merge in-memory shuffled map-outputs");
|
||||||
|
setDaemon(true);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void merge(List<InMemoryMapOutput<K,V>> inputs) throws IOException {
|
||||||
|
if (inputs == null || inputs.size() == 0) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
//name this output file same as the name of the first file that is
|
||||||
|
//there in the current list of inmem files (this is guaranteed to
|
||||||
|
//be absent on the disk currently. So we don't overwrite a prev.
|
||||||
|
//created spill). Also we need to create the output file now since
|
||||||
|
//it is not guaranteed that this file will be present after merge
|
||||||
|
//is called (we delete empty files as soon as we see them
|
||||||
|
//in the merge method)
|
||||||
|
|
||||||
|
//figure out the mapId
|
||||||
|
TaskAttemptID mapId = inputs.get(0).getMapId();
|
||||||
|
TaskID mapTaskId = mapId.getTaskID();
|
||||||
|
|
||||||
|
List<Segment<K, V>> inMemorySegments = new ArrayList<Segment<K, V>>();
|
||||||
|
long mergeOutputSize =
|
||||||
|
createInMemorySegments(inputs, inMemorySegments,0);
|
||||||
|
int noInMemorySegments = inMemorySegments.size();
|
||||||
|
|
||||||
|
Path outputPath =
|
||||||
|
mapOutputFile.getInputFileForWrite(mapTaskId,
|
||||||
|
mergeOutputSize).suffix(
|
||||||
|
Task.MERGED_OUTPUT_PREFIX);
|
||||||
|
|
||||||
|
Writer<K,V> writer =
|
||||||
|
new Writer<K,V>(jobConf, rfs, outputPath,
|
||||||
|
(Class<K>) jobConf.getMapOutputKeyClass(),
|
||||||
|
(Class<V>) jobConf.getMapOutputValueClass(),
|
||||||
|
codec, null);
|
||||||
|
|
||||||
|
RawKeyValueIterator rIter = null;
|
||||||
|
try {
|
||||||
|
LOG.info("Initiating in-memory merge with " + noInMemorySegments +
|
||||||
|
" segments...");
|
||||||
|
|
||||||
|
rIter = Merger.merge(jobConf, rfs,
|
||||||
|
(Class<K>)jobConf.getMapOutputKeyClass(),
|
||||||
|
(Class<V>)jobConf.getMapOutputValueClass(),
|
||||||
|
inMemorySegments, inMemorySegments.size(),
|
||||||
|
new Path(reduceId.toString()),
|
||||||
|
(RawComparator<K>)jobConf.getOutputKeyComparator(),
|
||||||
|
reporter, spilledRecordsCounter, null, null);
|
||||||
|
|
||||||
|
if (null == combinerClass) {
|
||||||
|
Merger.writeFile(rIter, writer, reporter, jobConf);
|
||||||
|
} else {
|
||||||
|
combineCollector.setWriter(writer);
|
||||||
|
combineAndSpill(rIter, reduceCombineInputCounter);
|
||||||
|
}
|
||||||
|
writer.close();
|
||||||
|
|
||||||
|
LOG.info(reduceId +
|
||||||
|
" Merge of the " + noInMemorySegments +
|
||||||
|
" files in-memory complete." +
|
||||||
|
" Local file is " + outputPath + " of size " +
|
||||||
|
localFS.getFileStatus(outputPath).getLen());
|
||||||
|
} catch (IOException e) {
|
||||||
|
//make sure that we delete the ondisk file that we created
|
||||||
|
//earlier when we invoked cloneFileAttributes
|
||||||
|
localFS.delete(outputPath, true);
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Note the output of the merge
|
||||||
|
closeOnDiskFile(outputPath);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
private class OnDiskMerger extends MergeThread<Path,K,V> {
|
||||||
|
|
||||||
|
public OnDiskMerger(MergeManagerImpl<K, V> manager) {
|
||||||
|
super(manager, Integer.MAX_VALUE, exceptionReporter);
|
||||||
|
setName("OnDiskMerger - Thread to merge on-disk map-outputs");
|
||||||
|
setDaemon(true);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void merge(List<Path> inputs) throws IOException {
|
||||||
|
// sanity check
|
||||||
|
if (inputs == null || inputs.isEmpty()) {
|
||||||
|
LOG.info("No ondisk files to merge...");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
long approxOutputSize = 0;
|
||||||
|
int bytesPerSum =
|
||||||
|
jobConf.getInt("io.bytes.per.checksum", 512);
|
||||||
|
|
||||||
|
LOG.info("OnDiskMerger: We have " + inputs.size() +
|
||||||
|
" map outputs on disk. Triggering merge...");
|
||||||
|
|
||||||
|
// 1. Prepare the list of files to be merged.
|
||||||
|
for (Path file : inputs) {
|
||||||
|
approxOutputSize += localFS.getFileStatus(file).getLen();
|
||||||
|
}
|
||||||
|
|
||||||
|
// add the checksum length
|
||||||
|
approxOutputSize +=
|
||||||
|
ChecksumFileSystem.getChecksumLength(approxOutputSize, bytesPerSum);
|
||||||
|
|
||||||
|
// 2. Start the on-disk merge process
|
||||||
|
Path outputPath =
|
||||||
|
localDirAllocator.getLocalPathForWrite(inputs.get(0).toString(),
|
||||||
|
approxOutputSize, jobConf).suffix(Task.MERGED_OUTPUT_PREFIX);
|
||||||
|
Writer<K,V> writer =
|
||||||
|
new Writer<K,V>(jobConf, rfs, outputPath,
|
||||||
|
(Class<K>) jobConf.getMapOutputKeyClass(),
|
||||||
|
(Class<V>) jobConf.getMapOutputValueClass(),
|
||||||
|
codec, null);
|
||||||
|
RawKeyValueIterator iter = null;
|
||||||
|
Path tmpDir = new Path(reduceId.toString());
|
||||||
|
try {
|
||||||
|
iter = Merger.merge(jobConf, rfs,
|
||||||
|
(Class<K>) jobConf.getMapOutputKeyClass(),
|
||||||
|
(Class<V>) jobConf.getMapOutputValueClass(),
|
||||||
|
codec, inputs.toArray(new Path[inputs.size()]),
|
||||||
|
true, ioSortFactor, tmpDir,
|
||||||
|
(RawComparator<K>) jobConf.getOutputKeyComparator(),
|
||||||
|
reporter, spilledRecordsCounter, null,
|
||||||
|
mergedMapOutputsCounter, null);
|
||||||
|
|
||||||
|
Merger.writeFile(iter, writer, reporter, jobConf);
|
||||||
|
writer.close();
|
||||||
|
} catch (IOException e) {
|
||||||
|
localFS.delete(outputPath, true);
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
|
||||||
|
closeOnDiskFile(outputPath);
|
||||||
|
|
||||||
|
LOG.info(reduceId +
|
||||||
|
" Finished merging " + inputs.size() +
|
||||||
|
" map output files on disk of total-size " +
|
||||||
|
approxOutputSize + "." +
|
||||||
|
" Local output file is " + outputPath + " of size " +
|
||||||
|
localFS.getFileStatus(outputPath).getLen());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void combineAndSpill(
|
||||||
|
RawKeyValueIterator kvIter,
|
||||||
|
Counters.Counter inCounter) throws IOException {
|
||||||
|
JobConf job = jobConf;
|
||||||
|
Reducer combiner = ReflectionUtils.newInstance(combinerClass, job);
|
||||||
|
Class<K> keyClass = (Class<K>) job.getMapOutputKeyClass();
|
||||||
|
Class<V> valClass = (Class<V>) job.getMapOutputValueClass();
|
||||||
|
RawComparator<K> comparator =
|
||||||
|
(RawComparator<K>)job.getOutputKeyComparator();
|
||||||
|
try {
|
||||||
|
CombineValuesIterator values = new CombineValuesIterator(
|
||||||
|
kvIter, comparator, keyClass, valClass, job, Reporter.NULL,
|
||||||
|
inCounter);
|
||||||
|
while (values.more()) {
|
||||||
|
combiner.reduce(values.getKey(), values, combineCollector,
|
||||||
|
Reporter.NULL);
|
||||||
|
values.nextKey();
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
combiner.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private long createInMemorySegments(List<InMemoryMapOutput<K,V>> inMemoryMapOutputs,
|
||||||
|
List<Segment<K, V>> inMemorySegments,
|
||||||
|
long leaveBytes
|
||||||
|
) throws IOException {
|
||||||
|
long totalSize = 0L;
|
||||||
|
// We could use fullSize could come from the RamManager, but files can be
|
||||||
|
// closed but not yet present in inMemoryMapOutputs
|
||||||
|
long fullSize = 0L;
|
||||||
|
for (InMemoryMapOutput<K,V> mo : inMemoryMapOutputs) {
|
||||||
|
fullSize += mo.getMemory().length;
|
||||||
|
}
|
||||||
|
while(fullSize > leaveBytes) {
|
||||||
|
InMemoryMapOutput<K,V> mo = inMemoryMapOutputs.remove(0);
|
||||||
|
byte[] data = mo.getMemory();
|
||||||
|
long size = data.length;
|
||||||
|
totalSize += size;
|
||||||
|
fullSize -= size;
|
||||||
|
Reader<K,V> reader = new InMemoryReader<K,V>(MergeManagerImpl.this,
|
||||||
|
mo.getMapId(),
|
||||||
|
data, 0, (int)size);
|
||||||
|
inMemorySegments.add(new Segment<K,V>(reader, true,
|
||||||
|
(mo.isPrimaryMapOutput() ?
|
||||||
|
mergedMapOutputsCounter : null)));
|
||||||
|
}
|
||||||
|
return totalSize;
|
||||||
|
}
|
||||||
|
|
||||||
|
class RawKVIteratorReader extends IFile.Reader<K,V> {
|
||||||
|
|
||||||
|
private final RawKeyValueIterator kvIter;
|
||||||
|
|
||||||
|
public RawKVIteratorReader(RawKeyValueIterator kvIter, long size)
|
||||||
|
throws IOException {
|
||||||
|
super(null, null, size, null, spilledRecordsCounter);
|
||||||
|
this.kvIter = kvIter;
|
||||||
|
}
|
||||||
|
public boolean nextRawKey(DataInputBuffer key) throws IOException {
|
||||||
|
if (kvIter.next()) {
|
||||||
|
final DataInputBuffer kb = kvIter.getKey();
|
||||||
|
final int kp = kb.getPosition();
|
||||||
|
final int klen = kb.getLength() - kp;
|
||||||
|
key.reset(kb.getData(), kp, klen);
|
||||||
|
bytesRead += klen;
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
public void nextRawValue(DataInputBuffer value) throws IOException {
|
||||||
|
final DataInputBuffer vb = kvIter.getValue();
|
||||||
|
final int vp = vb.getPosition();
|
||||||
|
final int vlen = vb.getLength() - vp;
|
||||||
|
value.reset(vb.getData(), vp, vlen);
|
||||||
|
bytesRead += vlen;
|
||||||
|
}
|
||||||
|
public long getPosition() throws IOException {
|
||||||
|
return bytesRead;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void close() throws IOException {
|
||||||
|
kvIter.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private RawKeyValueIterator finalMerge(JobConf job, FileSystem fs,
|
||||||
|
List<InMemoryMapOutput<K,V>> inMemoryMapOutputs,
|
||||||
|
List<Path> onDiskMapOutputs
|
||||||
|
) throws IOException {
|
||||||
|
LOG.info("finalMerge called with " +
|
||||||
|
inMemoryMapOutputs.size() + " in-memory map-outputs and " +
|
||||||
|
onDiskMapOutputs.size() + " on-disk map-outputs");
|
||||||
|
|
||||||
|
final float maxRedPer =
|
||||||
|
job.getFloat(MRJobConfig.REDUCE_INPUT_BUFFER_PERCENT, 0f);
|
||||||
|
if (maxRedPer > 1.0 || maxRedPer < 0.0) {
|
||||||
|
throw new IOException(MRJobConfig.REDUCE_INPUT_BUFFER_PERCENT +
|
||||||
|
maxRedPer);
|
||||||
|
}
|
||||||
|
int maxInMemReduce = (int)Math.min(
|
||||||
|
Runtime.getRuntime().maxMemory() * maxRedPer, Integer.MAX_VALUE);
|
||||||
|
|
||||||
|
|
||||||
|
// merge config params
|
||||||
|
Class<K> keyClass = (Class<K>)job.getMapOutputKeyClass();
|
||||||
|
Class<V> valueClass = (Class<V>)job.getMapOutputValueClass();
|
||||||
|
boolean keepInputs = job.getKeepFailedTaskFiles();
|
||||||
|
final Path tmpDir = new Path(reduceId.toString());
|
||||||
|
final RawComparator<K> comparator =
|
||||||
|
(RawComparator<K>)job.getOutputKeyComparator();
|
||||||
|
|
||||||
|
// segments required to vacate memory
|
||||||
|
List<Segment<K,V>> memDiskSegments = new ArrayList<Segment<K,V>>();
|
||||||
|
long inMemToDiskBytes = 0;
|
||||||
|
boolean mergePhaseFinished = false;
|
||||||
|
if (inMemoryMapOutputs.size() > 0) {
|
||||||
|
TaskID mapId = inMemoryMapOutputs.get(0).getMapId().getTaskID();
|
||||||
|
inMemToDiskBytes = createInMemorySegments(inMemoryMapOutputs,
|
||||||
|
memDiskSegments,
|
||||||
|
maxInMemReduce);
|
||||||
|
final int numMemDiskSegments = memDiskSegments.size();
|
||||||
|
if (numMemDiskSegments > 0 &&
|
||||||
|
ioSortFactor > onDiskMapOutputs.size()) {
|
||||||
|
|
||||||
|
// If we reach here, it implies that we have less than io.sort.factor
|
||||||
|
// disk segments and this will be incremented by 1 (result of the
|
||||||
|
// memory segments merge). Since this total would still be
|
||||||
|
// <= io.sort.factor, we will not do any more intermediate merges,
|
||||||
|
// the merge of all these disk segments would be directly fed to the
|
||||||
|
// reduce method
|
||||||
|
|
||||||
|
mergePhaseFinished = true;
|
||||||
|
// must spill to disk, but can't retain in-mem for intermediate merge
|
||||||
|
final Path outputPath =
|
||||||
|
mapOutputFile.getInputFileForWrite(mapId,
|
||||||
|
inMemToDiskBytes).suffix(
|
||||||
|
Task.MERGED_OUTPUT_PREFIX);
|
||||||
|
final RawKeyValueIterator rIter = Merger.merge(job, fs,
|
||||||
|
keyClass, valueClass, memDiskSegments, numMemDiskSegments,
|
||||||
|
tmpDir, comparator, reporter, spilledRecordsCounter, null,
|
||||||
|
mergePhase);
|
||||||
|
final Writer<K,V> writer = new Writer<K,V>(job, fs, outputPath,
|
||||||
|
keyClass, valueClass, codec, null);
|
||||||
|
try {
|
||||||
|
Merger.writeFile(rIter, writer, reporter, job);
|
||||||
|
// add to list of final disk outputs.
|
||||||
|
onDiskMapOutputs.add(outputPath);
|
||||||
|
} catch (IOException e) {
|
||||||
|
if (null != outputPath) {
|
||||||
|
try {
|
||||||
|
fs.delete(outputPath, true);
|
||||||
|
} catch (IOException ie) {
|
||||||
|
// NOTHING
|
||||||
|
}
|
||||||
|
}
|
||||||
|
throw e;
|
||||||
|
} finally {
|
||||||
|
if (null != writer) {
|
||||||
|
writer.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
LOG.info("Merged " + numMemDiskSegments + " segments, " +
|
||||||
|
inMemToDiskBytes + " bytes to disk to satisfy " +
|
||||||
|
"reduce memory limit");
|
||||||
|
inMemToDiskBytes = 0;
|
||||||
|
memDiskSegments.clear();
|
||||||
|
} else if (inMemToDiskBytes != 0) {
|
||||||
|
LOG.info("Keeping " + numMemDiskSegments + " segments, " +
|
||||||
|
inMemToDiskBytes + " bytes in memory for " +
|
||||||
|
"intermediate, on-disk merge");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// segments on disk
|
||||||
|
List<Segment<K,V>> diskSegments = new ArrayList<Segment<K,V>>();
|
||||||
|
long onDiskBytes = inMemToDiskBytes;
|
||||||
|
Path[] onDisk = onDiskMapOutputs.toArray(new Path[onDiskMapOutputs.size()]);
|
||||||
|
for (Path file : onDisk) {
|
||||||
|
onDiskBytes += fs.getFileStatus(file).getLen();
|
||||||
|
LOG.debug("Disk file: " + file + " Length is " +
|
||||||
|
fs.getFileStatus(file).getLen());
|
||||||
|
diskSegments.add(new Segment<K, V>(job, fs, file, codec, keepInputs,
|
||||||
|
(file.toString().endsWith(
|
||||||
|
Task.MERGED_OUTPUT_PREFIX) ?
|
||||||
|
null : mergedMapOutputsCounter)
|
||||||
|
));
|
||||||
|
}
|
||||||
|
LOG.info("Merging " + onDisk.length + " files, " +
|
||||||
|
onDiskBytes + " bytes from disk");
|
||||||
|
Collections.sort(diskSegments, new Comparator<Segment<K,V>>() {
|
||||||
|
public int compare(Segment<K, V> o1, Segment<K, V> o2) {
|
||||||
|
if (o1.getLength() == o2.getLength()) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
return o1.getLength() < o2.getLength() ? -1 : 1;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// build final list of segments from merged backed by disk + in-mem
|
||||||
|
List<Segment<K,V>> finalSegments = new ArrayList<Segment<K,V>>();
|
||||||
|
long inMemBytes = createInMemorySegments(inMemoryMapOutputs,
|
||||||
|
finalSegments, 0);
|
||||||
|
LOG.info("Merging " + finalSegments.size() + " segments, " +
|
||||||
|
inMemBytes + " bytes from memory into reduce");
|
||||||
|
if (0 != onDiskBytes) {
|
||||||
|
final int numInMemSegments = memDiskSegments.size();
|
||||||
|
diskSegments.addAll(0, memDiskSegments);
|
||||||
|
memDiskSegments.clear();
|
||||||
|
// Pass mergePhase only if there is a going to be intermediate
|
||||||
|
// merges. See comment where mergePhaseFinished is being set
|
||||||
|
Progress thisPhase = (mergePhaseFinished) ? null : mergePhase;
|
||||||
|
RawKeyValueIterator diskMerge = Merger.merge(
|
||||||
|
job, fs, keyClass, valueClass, diskSegments,
|
||||||
|
ioSortFactor, numInMemSegments, tmpDir, comparator,
|
||||||
|
reporter, false, spilledRecordsCounter, null, thisPhase);
|
||||||
|
diskSegments.clear();
|
||||||
|
if (0 == finalSegments.size()) {
|
||||||
|
return diskMerge;
|
||||||
|
}
|
||||||
|
finalSegments.add(new Segment<K,V>(
|
||||||
|
new RawKVIteratorReader(diskMerge, onDiskBytes), true));
|
||||||
|
}
|
||||||
|
return Merger.merge(job, fs, keyClass, valueClass,
|
||||||
|
finalSegments, finalSegments.size(), tmpDir,
|
||||||
|
comparator, reporter, spilledRecordsCounter, null,
|
||||||
|
null);
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
|
@ -34,12 +34,12 @@ abstract class MergeThread<T,K,V> extends Thread {
|
||||||
|
|
||||||
private AtomicInteger numPending = new AtomicInteger(0);
|
private AtomicInteger numPending = new AtomicInteger(0);
|
||||||
private LinkedList<List<T>> pendingToBeMerged;
|
private LinkedList<List<T>> pendingToBeMerged;
|
||||||
protected final MergeManager<K,V> manager;
|
protected final MergeManagerImpl<K,V> manager;
|
||||||
private final ExceptionReporter reporter;
|
private final ExceptionReporter reporter;
|
||||||
private boolean closed = false;
|
private boolean closed = false;
|
||||||
private final int mergeFactor;
|
private final int mergeFactor;
|
||||||
|
|
||||||
public MergeThread(MergeManager<K,V> manager, int mergeFactor,
|
public MergeThread(MergeManagerImpl<K,V> manager, int mergeFactor,
|
||||||
ExceptionReporter reporter) {
|
ExceptionReporter reporter) {
|
||||||
this.pendingToBeMerged = new LinkedList<List<T>>();
|
this.pendingToBeMerged = new LinkedList<List<T>>();
|
||||||
this.manager = manager;
|
this.manager = manager;
|
||||||
|
|
|
@ -0,0 +1,131 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.mapreduce.task.reduce;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.InputStream;
|
||||||
|
import java.io.OutputStream;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
|
||||||
|
import org.apache.hadoop.io.IOUtils;
|
||||||
|
|
||||||
|
import org.apache.hadoop.mapred.JobConf;
|
||||||
|
import org.apache.hadoop.mapred.Reporter;
|
||||||
|
import org.apache.hadoop.mapred.MapOutputFile;
|
||||||
|
|
||||||
|
import org.apache.hadoop.mapreduce.TaskAttemptID;
|
||||||
|
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
@InterfaceStability.Unstable
|
||||||
|
class OnDiskMapOutput<K, V> extends MapOutput<K, V> {
|
||||||
|
private static final Log LOG = LogFactory.getLog(OnDiskMapOutput.class);
|
||||||
|
private final FileSystem localFS;
|
||||||
|
private final Path tmpOutputPath;
|
||||||
|
private final Path outputPath;
|
||||||
|
private final MergeManagerImpl<K, V> merger;
|
||||||
|
private final OutputStream disk;
|
||||||
|
|
||||||
|
public OnDiskMapOutput(TaskAttemptID mapId, TaskAttemptID reduceId,
|
||||||
|
MergeManagerImpl<K, V> merger, long size,
|
||||||
|
JobConf conf,
|
||||||
|
MapOutputFile mapOutputFile,
|
||||||
|
int fetcher, boolean primaryMapOutput)
|
||||||
|
throws IOException {
|
||||||
|
super(mapId, size, primaryMapOutput);
|
||||||
|
this.merger = merger;
|
||||||
|
this.localFS = FileSystem.getLocal(conf);
|
||||||
|
outputPath =
|
||||||
|
mapOutputFile.getInputFileForWrite(mapId.getTaskID(),size);
|
||||||
|
tmpOutputPath = outputPath.suffix(String.valueOf(fetcher));
|
||||||
|
|
||||||
|
disk = localFS.create(tmpOutputPath);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void shuffle(MapHost host, InputStream input,
|
||||||
|
long compressedLength, long decompressedLength,
|
||||||
|
ShuffleClientMetrics metrics,
|
||||||
|
Reporter reporter) throws IOException {
|
||||||
|
// Copy data to local-disk
|
||||||
|
long bytesLeft = compressedLength;
|
||||||
|
try {
|
||||||
|
final int BYTES_TO_READ = 64 * 1024;
|
||||||
|
byte[] buf = new byte[BYTES_TO_READ];
|
||||||
|
while (bytesLeft > 0) {
|
||||||
|
int n = input.read(buf, 0, (int) Math.min(bytesLeft, BYTES_TO_READ));
|
||||||
|
if (n < 0) {
|
||||||
|
throw new IOException("read past end of stream reading " +
|
||||||
|
getMapId());
|
||||||
|
}
|
||||||
|
disk.write(buf, 0, n);
|
||||||
|
bytesLeft -= n;
|
||||||
|
metrics.inputBytes(n);
|
||||||
|
reporter.progress();
|
||||||
|
}
|
||||||
|
|
||||||
|
LOG.info("Read " + (compressedLength - bytesLeft) +
|
||||||
|
" bytes from map-output for " + getMapId());
|
||||||
|
|
||||||
|
disk.close();
|
||||||
|
} catch (IOException ioe) {
|
||||||
|
// Close the streams
|
||||||
|
IOUtils.cleanup(LOG, input, disk);
|
||||||
|
|
||||||
|
// Re-throw
|
||||||
|
throw ioe;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Sanity check
|
||||||
|
if (bytesLeft != 0) {
|
||||||
|
throw new IOException("Incomplete map output received for " +
|
||||||
|
getMapId() + " from " +
|
||||||
|
host.getHostName() + " (" +
|
||||||
|
bytesLeft + " bytes missing of " +
|
||||||
|
compressedLength + ")");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void commit() throws IOException {
|
||||||
|
localFS.rename(tmpOutputPath, outputPath);
|
||||||
|
merger.closeOnDiskFile(outputPath);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void abort() {
|
||||||
|
try {
|
||||||
|
localFS.delete(tmpOutputPath, false);
|
||||||
|
} catch (IOException ie) {
|
||||||
|
LOG.info("failure to clean up " + tmpOutputPath, ie);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getDescription() {
|
||||||
|
return "DISK";
|
||||||
|
}
|
||||||
|
}
|
|
@ -21,17 +21,10 @@ import java.io.IOException;
|
||||||
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.classification.InterfaceStability;
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
|
||||||
import org.apache.hadoop.fs.LocalDirAllocator;
|
|
||||||
import org.apache.hadoop.io.compress.CompressionCodec;
|
|
||||||
import org.apache.hadoop.mapred.Counters;
|
|
||||||
import org.apache.hadoop.mapred.JobConf;
|
import org.apache.hadoop.mapred.JobConf;
|
||||||
import org.apache.hadoop.mapred.MapOutputFile;
|
|
||||||
import org.apache.hadoop.mapred.RawKeyValueIterator;
|
import org.apache.hadoop.mapred.RawKeyValueIterator;
|
||||||
import org.apache.hadoop.mapred.Reducer;
|
|
||||||
import org.apache.hadoop.mapred.Reporter;
|
import org.apache.hadoop.mapred.Reporter;
|
||||||
import org.apache.hadoop.mapred.Task;
|
import org.apache.hadoop.mapred.Task;
|
||||||
import org.apache.hadoop.mapred.Task.CombineOutputCollector;
|
|
||||||
import org.apache.hadoop.mapred.TaskStatus;
|
import org.apache.hadoop.mapred.TaskStatus;
|
||||||
import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
|
import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
|
||||||
import org.apache.hadoop.mapred.ShuffleConsumerPlugin;
|
import org.apache.hadoop.mapred.ShuffleConsumerPlugin;
|
||||||
|
@ -77,17 +70,21 @@ public class Shuffle<K, V> implements ShuffleConsumerPlugin<K, V>, ExceptionRepo
|
||||||
this.taskStatus = context.getStatus();
|
this.taskStatus = context.getStatus();
|
||||||
this.reduceTask = context.getReduceTask();
|
this.reduceTask = context.getReduceTask();
|
||||||
|
|
||||||
scheduler =
|
scheduler = new ShuffleScheduler<K,V>(jobConf, taskStatus, this,
|
||||||
new ShuffleScheduler<K,V>(jobConf, taskStatus, this, copyPhase,
|
copyPhase, context.getShuffledMapsCounter(),
|
||||||
context.getShuffledMapsCounter(),
|
context.getReduceShuffleBytes(), context.getFailedShuffleCounter());
|
||||||
context.getReduceShuffleBytes(), context.getFailedShuffleCounter());
|
merger = createMergeManager(context);
|
||||||
merger = new MergeManager<K, V>(reduceId, jobConf, context.getLocalFS(),
|
}
|
||||||
context.getLocalDirAllocator(), reporter, context.getCodec(),
|
|
||||||
context.getCombinerClass(), context.getCombineCollector(),
|
protected MergeManager<K, V> createMergeManager(
|
||||||
context.getSpilledRecordsCounter(),
|
ShuffleConsumerPlugin.Context context) {
|
||||||
context.getReduceCombineInputCounter(),
|
return new MergeManagerImpl<K, V>(reduceId, jobConf, context.getLocalFS(),
|
||||||
context.getMergedMapOutputsCounter(),
|
context.getLocalDirAllocator(), reporter, context.getCodec(),
|
||||||
this, context.getMergePhase(), context.getMapOutputFile());
|
context.getCombinerClass(), context.getCombineCollector(),
|
||||||
|
context.getSpilledRecordsCounter(),
|
||||||
|
context.getReduceCombineInputCounter(),
|
||||||
|
context.getMergedMapOutputsCounter(), this, context.getMergePhase(),
|
||||||
|
context.getMapOutputFile());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -53,7 +53,7 @@ public class TestFetcher {
|
||||||
private HttpURLConnection connection;
|
private HttpURLConnection connection;
|
||||||
|
|
||||||
public FakeFetcher(JobConf job, TaskAttemptID reduceId,
|
public FakeFetcher(JobConf job, TaskAttemptID reduceId,
|
||||||
ShuffleScheduler<K,V> scheduler, MergeManager<K,V> merger, Reporter reporter,
|
ShuffleScheduler<K,V> scheduler, MergeManagerImpl<K,V> merger, Reporter reporter,
|
||||||
ShuffleClientMetrics metrics, ExceptionReporter exceptionReporter,
|
ShuffleClientMetrics metrics, ExceptionReporter exceptionReporter,
|
||||||
SecretKey jobTokenSecret, HttpURLConnection connection) {
|
SecretKey jobTokenSecret, HttpURLConnection connection) {
|
||||||
super(job, reduceId, scheduler, merger, reporter, metrics, exceptionReporter,
|
super(job, reduceId, scheduler, merger, reporter, metrics, exceptionReporter,
|
||||||
|
@ -77,7 +77,7 @@ public class TestFetcher {
|
||||||
JobConf job = new JobConf();
|
JobConf job = new JobConf();
|
||||||
TaskAttemptID id = TaskAttemptID.forName("attempt_0_1_r_1_1");
|
TaskAttemptID id = TaskAttemptID.forName("attempt_0_1_r_1_1");
|
||||||
ShuffleScheduler<Text, Text> ss = mock(ShuffleScheduler.class);
|
ShuffleScheduler<Text, Text> ss = mock(ShuffleScheduler.class);
|
||||||
MergeManager<Text, Text> mm = mock(MergeManager.class);
|
MergeManagerImpl<Text, Text> mm = mock(MergeManagerImpl.class);
|
||||||
Reporter r = mock(Reporter.class);
|
Reporter r = mock(Reporter.class);
|
||||||
ShuffleClientMetrics metrics = mock(ShuffleClientMetrics.class);
|
ShuffleClientMetrics metrics = mock(ShuffleClientMetrics.class);
|
||||||
ExceptionReporter except = mock(ExceptionReporter.class);
|
ExceptionReporter except = mock(ExceptionReporter.class);
|
||||||
|
@ -132,7 +132,7 @@ public class TestFetcher {
|
||||||
JobConf job = new JobConf();
|
JobConf job = new JobConf();
|
||||||
TaskAttemptID id = TaskAttemptID.forName("attempt_0_1_r_1_1");
|
TaskAttemptID id = TaskAttemptID.forName("attempt_0_1_r_1_1");
|
||||||
ShuffleScheduler<Text, Text> ss = mock(ShuffleScheduler.class);
|
ShuffleScheduler<Text, Text> ss = mock(ShuffleScheduler.class);
|
||||||
MergeManager<Text, Text> mm = mock(MergeManager.class);
|
MergeManagerImpl<Text, Text> mm = mock(MergeManagerImpl.class);
|
||||||
Reporter r = mock(Reporter.class);
|
Reporter r = mock(Reporter.class);
|
||||||
ShuffleClientMetrics metrics = mock(ShuffleClientMetrics.class);
|
ShuffleClientMetrics metrics = mock(ShuffleClientMetrics.class);
|
||||||
ExceptionReporter except = mock(ExceptionReporter.class);
|
ExceptionReporter except = mock(ExceptionReporter.class);
|
||||||
|
@ -167,10 +167,9 @@ public class TestFetcher {
|
||||||
header.write(new DataOutputStream(bout));
|
header.write(new DataOutputStream(bout));
|
||||||
ByteArrayInputStream in = new ByteArrayInputStream(bout.toByteArray());
|
ByteArrayInputStream in = new ByteArrayInputStream(bout.toByteArray());
|
||||||
when(connection.getInputStream()).thenReturn(in);
|
when(connection.getInputStream()).thenReturn(in);
|
||||||
//Defaults to WAIT, which is what we want to test
|
//Defaults to null, which is what we want to test
|
||||||
MapOutput<Text,Text> mapOut = new MapOutput<Text, Text>(map1ID);
|
|
||||||
when(mm.reserve(any(TaskAttemptID.class), anyLong(), anyInt()))
|
when(mm.reserve(any(TaskAttemptID.class), anyLong(), anyInt()))
|
||||||
.thenReturn(mapOut);
|
.thenReturn(null);
|
||||||
|
|
||||||
underTest.copyFromHost(host);
|
underTest.copyFromHost(host);
|
||||||
|
|
||||||
|
|
|
@ -32,13 +32,13 @@ import org.apache.hadoop.io.Text;
|
||||||
import org.apache.hadoop.mapred.JobConf;
|
import org.apache.hadoop.mapred.JobConf;
|
||||||
import org.apache.hadoop.mapred.MapOutputFile;
|
import org.apache.hadoop.mapred.MapOutputFile;
|
||||||
import org.apache.hadoop.mapreduce.MRJobConfig;
|
import org.apache.hadoop.mapreduce.MRJobConfig;
|
||||||
import org.apache.hadoop.mapreduce.task.reduce.MapOutput.Type;
|
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
public class TestMergeManager {
|
public class TestMergeManager {
|
||||||
|
|
||||||
@Test(timeout=10000)
|
@Test(timeout=10000)
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
public void testMemoryMerge() throws Exception {
|
public void testMemoryMerge() throws Exception {
|
||||||
final int TOTAL_MEM_BYTES = 10000;
|
final int TOTAL_MEM_BYTES = 10000;
|
||||||
final int OUTPUT_SIZE = 7950;
|
final int OUTPUT_SIZE = 7950;
|
||||||
|
@ -55,45 +55,47 @@ public class TestMergeManager {
|
||||||
|
|
||||||
// reserve enough map output to cause a merge when it is committed
|
// reserve enough map output to cause a merge when it is committed
|
||||||
MapOutput<Text, Text> out1 = mgr.reserve(null, OUTPUT_SIZE, 0);
|
MapOutput<Text, Text> out1 = mgr.reserve(null, OUTPUT_SIZE, 0);
|
||||||
Assert.assertEquals("Should be a memory merge",
|
Assert.assertTrue("Should be a memory merge",
|
||||||
Type.MEMORY, out1.getType());
|
(out1 instanceof InMemoryMapOutput));
|
||||||
fillOutput(out1);
|
InMemoryMapOutput<Text, Text> mout1 = (InMemoryMapOutput<Text, Text>)out1;
|
||||||
|
fillOutput(mout1);
|
||||||
MapOutput<Text, Text> out2 = mgr.reserve(null, OUTPUT_SIZE, 0);
|
MapOutput<Text, Text> out2 = mgr.reserve(null, OUTPUT_SIZE, 0);
|
||||||
Assert.assertEquals("Should be a memory merge",
|
Assert.assertTrue("Should be a memory merge",
|
||||||
Type.MEMORY, out2.getType());
|
(out2 instanceof InMemoryMapOutput));
|
||||||
fillOutput(out2);
|
InMemoryMapOutput<Text, Text> mout2 = (InMemoryMapOutput<Text, Text>)out2;
|
||||||
|
fillOutput(mout2);
|
||||||
|
|
||||||
// next reservation should be a WAIT
|
// next reservation should be a WAIT
|
||||||
MapOutput<Text, Text> out3 = mgr.reserve(null, OUTPUT_SIZE, 0);
|
MapOutput<Text, Text> out3 = mgr.reserve(null, OUTPUT_SIZE, 0);
|
||||||
Assert.assertEquals("Should be told to wait",
|
Assert.assertEquals("Should be told to wait", null, out3);
|
||||||
Type.WAIT, out3.getType());
|
|
||||||
|
|
||||||
// trigger the first merge and wait for merge thread to start merging
|
// trigger the first merge and wait for merge thread to start merging
|
||||||
// and free enough output to reserve more
|
// and free enough output to reserve more
|
||||||
out1.commit();
|
mout1.commit();
|
||||||
out2.commit();
|
mout2.commit();
|
||||||
mergeStart.await();
|
mergeStart.await();
|
||||||
|
|
||||||
Assert.assertEquals(1, mgr.getNumMerges());
|
Assert.assertEquals(1, mgr.getNumMerges());
|
||||||
|
|
||||||
// reserve enough map output to cause another merge when committed
|
// reserve enough map output to cause another merge when committed
|
||||||
out1 = mgr.reserve(null, OUTPUT_SIZE, 0);
|
out1 = mgr.reserve(null, OUTPUT_SIZE, 0);
|
||||||
Assert.assertEquals("Should be a memory merge",
|
Assert.assertTrue("Should be a memory merge",
|
||||||
Type.MEMORY, out1.getType());
|
(out1 instanceof InMemoryMapOutput));
|
||||||
fillOutput(out1);
|
mout1 = (InMemoryMapOutput<Text, Text>)out1;
|
||||||
|
fillOutput(mout1);
|
||||||
out2 = mgr.reserve(null, OUTPUT_SIZE, 0);
|
out2 = mgr.reserve(null, OUTPUT_SIZE, 0);
|
||||||
Assert.assertEquals("Should be a memory merge",
|
Assert.assertTrue("Should be a memory merge",
|
||||||
Type.MEMORY, out2.getType());
|
(out2 instanceof InMemoryMapOutput));
|
||||||
fillOutput(out2);
|
mout2 = (InMemoryMapOutput<Text, Text>)out2;
|
||||||
|
fillOutput(mout2);
|
||||||
|
|
||||||
// next reservation should be a WAIT
|
// next reservation should be null
|
||||||
out3 = mgr.reserve(null, OUTPUT_SIZE, 0);
|
out3 = mgr.reserve(null, OUTPUT_SIZE, 0);
|
||||||
Assert.assertEquals("Should be told to wait",
|
Assert.assertEquals("Should be told to wait", null, out3);
|
||||||
Type.WAIT, out3.getType());
|
|
||||||
|
|
||||||
// commit output *before* merge thread completes
|
// commit output *before* merge thread completes
|
||||||
out1.commit();
|
mout1.commit();
|
||||||
out2.commit();
|
mout2.commit();
|
||||||
|
|
||||||
// allow the first merge to complete
|
// allow the first merge to complete
|
||||||
mergeComplete.await();
|
mergeComplete.await();
|
||||||
|
@ -110,7 +112,7 @@ public class TestMergeManager {
|
||||||
0, reporter.getNumExceptions());
|
0, reporter.getNumExceptions());
|
||||||
}
|
}
|
||||||
|
|
||||||
private void fillOutput(MapOutput<Text, Text> output) throws IOException {
|
private void fillOutput(InMemoryMapOutput<Text, Text> output) throws IOException {
|
||||||
BoundedByteArrayOutputStream stream = output.getArrayStream();
|
BoundedByteArrayOutputStream stream = output.getArrayStream();
|
||||||
int count = stream.getLimit();
|
int count = stream.getLimit();
|
||||||
for (int i=0; i < count; ++i) {
|
for (int i=0; i < count; ++i) {
|
||||||
|
@ -118,7 +120,7 @@ public class TestMergeManager {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static class StubbedMergeManager extends MergeManager<Text, Text> {
|
private static class StubbedMergeManager extends MergeManagerImpl<Text, Text> {
|
||||||
private TestMergeThread mergeThread;
|
private TestMergeThread mergeThread;
|
||||||
|
|
||||||
public StubbedMergeManager(JobConf conf, ExceptionReporter reporter,
|
public StubbedMergeManager(JobConf conf, ExceptionReporter reporter,
|
||||||
|
@ -129,7 +131,7 @@ public class TestMergeManager {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected MergeThread<MapOutput<Text, Text>, Text, Text> createInMemoryMerger() {
|
protected MergeThread<InMemoryMapOutput<Text, Text>, Text, Text> createInMemoryMerger() {
|
||||||
mergeThread = new TestMergeThread(this, getExceptionReporter());
|
mergeThread = new TestMergeThread(this, getExceptionReporter());
|
||||||
return mergeThread;
|
return mergeThread;
|
||||||
}
|
}
|
||||||
|
@ -140,12 +142,12 @@ public class TestMergeManager {
|
||||||
}
|
}
|
||||||
|
|
||||||
private static class TestMergeThread
|
private static class TestMergeThread
|
||||||
extends MergeThread<MapOutput<Text,Text>, Text, Text> {
|
extends MergeThread<InMemoryMapOutput<Text,Text>, Text, Text> {
|
||||||
private AtomicInteger numMerges;
|
private AtomicInteger numMerges;
|
||||||
private CyclicBarrier mergeStart;
|
private CyclicBarrier mergeStart;
|
||||||
private CyclicBarrier mergeComplete;
|
private CyclicBarrier mergeComplete;
|
||||||
|
|
||||||
public TestMergeThread(MergeManager<Text, Text> mergeManager,
|
public TestMergeThread(MergeManagerImpl<Text, Text> mergeManager,
|
||||||
ExceptionReporter reporter) {
|
ExceptionReporter reporter) {
|
||||||
super(mergeManager, Integer.MAX_VALUE, reporter);
|
super(mergeManager, Integer.MAX_VALUE, reporter);
|
||||||
numMerges = new AtomicInteger(0);
|
numMerges = new AtomicInteger(0);
|
||||||
|
@ -162,11 +164,11 @@ public class TestMergeManager {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void merge(List<MapOutput<Text, Text>> inputs)
|
public void merge(List<InMemoryMapOutput<Text, Text>> inputs)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
synchronized (this) {
|
synchronized (this) {
|
||||||
numMerges.incrementAndGet();
|
numMerges.incrementAndGet();
|
||||||
for (MapOutput<Text, Text> input : inputs) {
|
for (InMemoryMapOutput<Text, Text> input : inputs) {
|
||||||
manager.unreserve(input.getSize());
|
manager.unreserve(input.getSize());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue