MAPREDUCE-434. LocalJobRunner limited to single reducer (Sandy Ryza and Aaron Kimball via Sandy Ryza)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1510867 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Sanford Ryza 2013-08-06 06:40:42 +00:00
parent afa778b8f9
commit 2a7293a254
14 changed files with 696 additions and 248 deletions

View File

@ -20,6 +20,9 @@ Release 2.3.0 - UNRELEASED
IMPROVEMENTS
MAPREDUCE-434. LocalJobRunner limited to single reducer (Sandy Ryza and
Aaron Kimball via Sandy Ryza)
OPTIMIZATIONS
MAPREDUCE-1981. Improve getSplits performance by using listLocatedStatus

View File

@ -79,11 +79,15 @@ public class LocalJobRunner implements ClientProtocol {
public static final String LOCAL_MAX_MAPS =
"mapreduce.local.map.tasks.maximum";
/** The maximum number of reduce tasks to run in parallel in LocalJobRunner */
public static final String LOCAL_MAX_REDUCES =
"mapreduce.local.reduce.tasks.maximum";
private FileSystem fs;
private HashMap<JobID, Job> jobs = new HashMap<JobID, Job>();
private JobConf conf;
private AtomicInteger map_tasks = new AtomicInteger(0);
private int reduce_tasks = 0;
private AtomicInteger reduce_tasks = new AtomicInteger(0);
final Random rand = new Random();
private LocalJobRunnerMetrics myMetrics = null;
@ -115,9 +119,11 @@ public class LocalJobRunner implements ClientProtocol {
private JobConf job;
private int numMapTasks;
private int numReduceTasks;
private float [] partialMapProgress;
private float [] partialReduceProgress;
private Counters [] mapCounters;
private Counters reduceCounters;
private Counters [] reduceCounters;
private JobStatus status;
private List<TaskAttemptID> mapIds = Collections.synchronizedList(
@ -184,10 +190,14 @@ public class LocalJobRunner implements ClientProtocol {
this.start();
}
protected abstract class RunnableWithThrowable implements Runnable {
public volatile Throwable storedException;
}
/**
* A Runnable instance that handles a map task to be run by an executor.
*/
protected class MapTaskRunnable implements Runnable {
protected class MapTaskRunnable extends RunnableWithThrowable {
private final int taskId;
private final TaskSplitMetaInfo info;
private final JobID jobId;
@ -198,8 +208,6 @@ public class LocalJobRunner implements ClientProtocol {
// where to fetch mapper outputs.
private final Map<TaskAttemptID, MapOutputFile> mapOutputFiles;
public volatile Throwable storedException;
public MapTaskRunnable(TaskSplitMetaInfo info, int taskId, JobID jobId,
Map<TaskAttemptID, MapOutputFile> mapOutputFiles) {
this.info = info;
@ -253,12 +261,13 @@ public class LocalJobRunner implements ClientProtocol {
* @param mapOutputFiles a mapping from task attempts to output files
* @return a List of Runnables, one per map task.
*/
protected List<MapTaskRunnable> getMapTaskRunnables(
protected List<RunnableWithThrowable> getMapTaskRunnables(
TaskSplitMetaInfo [] taskInfo, JobID jobId,
Map<TaskAttemptID, MapOutputFile> mapOutputFiles) {
int numTasks = 0;
ArrayList<MapTaskRunnable> list = new ArrayList<MapTaskRunnable>();
ArrayList<RunnableWithThrowable> list =
new ArrayList<RunnableWithThrowable>();
for (TaskSplitMetaInfo task : taskInfo) {
list.add(new MapTaskRunnable(task, numTasks++, jobId,
mapOutputFiles));
@ -267,12 +276,89 @@ public class LocalJobRunner implements ClientProtocol {
return list;
}
protected class ReduceTaskRunnable extends RunnableWithThrowable {
private final int taskId;
private final JobID jobId;
private final JobConf localConf;
// This is a reference to a shared object passed in by the
// external context; this delivers state to the reducers regarding
// where to fetch mapper outputs.
private final Map<TaskAttemptID, MapOutputFile> mapOutputFiles;
public ReduceTaskRunnable(int taskId, JobID jobId,
Map<TaskAttemptID, MapOutputFile> mapOutputFiles) {
this.taskId = taskId;
this.jobId = jobId;
this.mapOutputFiles = mapOutputFiles;
this.localConf = new JobConf(job);
this.localConf.set("mapreduce.jobtracker.address", "local");
}
public void run() {
try {
TaskAttemptID reduceId = new TaskAttemptID(new TaskID(
jobId, TaskType.REDUCE, taskId), 0);
LOG.info("Starting task: " + reduceId);
ReduceTask reduce = new ReduceTask(systemJobFile.toString(),
reduceId, taskId, mapIds.size(), 1);
reduce.setUser(UserGroupInformation.getCurrentUser().
getShortUserName());
setupChildMapredLocalDirs(localJobDir, reduce, localConf);
reduce.setLocalMapFiles(mapOutputFiles);
if (!Job.this.isInterrupted()) {
reduce.setJobFile(localJobFile.toString());
localConf.setUser(reduce.getUser());
reduce.localizeConfiguration(localConf);
reduce.setConf(localConf);
try {
reduce_tasks.getAndIncrement();
myMetrics.launchReduce(reduce.getTaskID());
reduce.run(localConf, Job.this);
myMetrics.completeReduce(reduce.getTaskID());
} finally {
reduce_tasks.getAndDecrement();
}
LOG.info("Finishing task: " + reduceId);
} else {
throw new InterruptedException();
}
} catch (Throwable t) {
// store this to be rethrown in the initial thread context.
this.storedException = t;
}
}
}
/**
* Create Runnables to encapsulate reduce tasks for use by the executor
* service.
* @param jobId the job id
* @param mapOutputFiles a mapping from task attempts to output files
* @return a List of Runnables, one per reduce task.
*/
protected List<RunnableWithThrowable> getReduceTaskRunnables(
JobID jobId, Map<TaskAttemptID, MapOutputFile> mapOutputFiles) {
int taskId = 0;
ArrayList<RunnableWithThrowable> list =
new ArrayList<RunnableWithThrowable>();
for (int i = 0; i < this.numReduceTasks; i++) {
list.add(new ReduceTaskRunnable(taskId++, jobId, mapOutputFiles));
}
return list;
}
/**
* Initialize the counters that will hold partial-progress from
* the various task attempts.
* @param numMaps the number of map tasks in this job.
*/
private synchronized void initCounters(int numMaps) {
private synchronized void initCounters(int numMaps, int numReduces) {
// Initialize state trackers for all map tasks.
this.partialMapProgress = new float[numMaps];
this.mapCounters = new Counters[numMaps];
@ -280,16 +366,22 @@ public class LocalJobRunner implements ClientProtocol {
this.mapCounters[i] = new Counters();
}
this.reduceCounters = new Counters();
this.partialReduceProgress = new float[numReduces];
this.reduceCounters = new Counters[numReduces];
for (int i = 0; i < numReduces; i++) {
this.reduceCounters[i] = new Counters();
}
this.numMapTasks = numMaps;
this.numReduceTasks = numReduces;
}
/**
* Creates the executor service used to run map tasks.
*
* @param numMapTasks the total number of map tasks to be run
* @return an ExecutorService instance that handles map tasks
*/
protected ExecutorService createMapExecutor(int numMapTasks) {
protected synchronized ExecutorService createMapExecutor() {
// Determine the size of the thread pool to use
int maxMapThreads = job.getInt(LOCAL_MAX_MAPS, 1);
@ -297,13 +389,10 @@ public class LocalJobRunner implements ClientProtocol {
throw new IllegalArgumentException(
"Configured " + LOCAL_MAX_MAPS + " must be >= 1");
}
this.numMapTasks = numMapTasks;
maxMapThreads = Math.min(maxMapThreads, this.numMapTasks);
maxMapThreads = Math.max(maxMapThreads, 1); // In case of no tasks.
initCounters(this.numMapTasks);
LOG.debug("Starting thread pool executor.");
LOG.debug("Starting mapper thread pool executor.");
LOG.debug("Max local threads: " + maxMapThreads);
LOG.debug("Map tasks to process: " + this.numMapTasks);
@ -316,6 +405,65 @@ public class LocalJobRunner implements ClientProtocol {
return executor;
}
/**
* Creates the executor service used to run reduce tasks.
*
* @return an ExecutorService instance that handles reduce tasks
*/
protected synchronized ExecutorService createReduceExecutor() {
// Determine the size of the thread pool to use
int maxReduceThreads = job.getInt(LOCAL_MAX_REDUCES, 1);
if (maxReduceThreads < 1) {
throw new IllegalArgumentException(
"Configured " + LOCAL_MAX_REDUCES + " must be >= 1");
}
maxReduceThreads = Math.min(maxReduceThreads, this.numReduceTasks);
maxReduceThreads = Math.max(maxReduceThreads, 1); // In case of no tasks.
LOG.debug("Starting reduce thread pool executor.");
LOG.debug("Max local threads: " + maxReduceThreads);
LOG.debug("Reduce tasks to process: " + this.numReduceTasks);
// Create a new executor service to drain the work queue.
ExecutorService executor = Executors.newFixedThreadPool(maxReduceThreads);
return executor;
}
/** Run a set of tasks and waits for them to complete. */
private void runTasks(List<RunnableWithThrowable> runnables,
ExecutorService service, String taskType) throws Exception {
// Start populating the executor with work units.
// They may begin running immediately (in other threads).
for (Runnable r : runnables) {
service.submit(r);
}
try {
service.shutdown(); // Instructs queue to drain.
// Wait for tasks to finish; do not use a time-based timeout.
// (See http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6179024)
LOG.info("Waiting for " + taskType + " tasks");
service.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
} catch (InterruptedException ie) {
// Cancel all threads.
service.shutdownNow();
throw ie;
}
LOG.info(taskType + " task executor complete.");
// After waiting for the tasks to complete, if any of these
// have thrown an exception, rethrow it now in the main thread context.
for (RunnableWithThrowable r : runnables) {
if (r.storedException != null) {
throw new Exception(r.storedException);
}
}
}
private org.apache.hadoop.mapreduce.OutputCommitter
createOutputCommitter(boolean newApiCommitter, JobID jobId, Configuration conf) throws Exception {
org.apache.hadoop.mapreduce.OutputCommitter committer = null;
@ -360,94 +508,25 @@ public class LocalJobRunner implements ClientProtocol {
SplitMetaInfoReader.readSplitMetaInfo(jobId, localFs, conf, systemJobDir);
int numReduceTasks = job.getNumReduceTasks();
if (numReduceTasks > 1 || numReduceTasks < 0) {
// we only allow 0 or 1 reducer in local mode
numReduceTasks = 1;
job.setNumReduceTasks(1);
}
outputCommitter.setupJob(jContext);
status.setSetupProgress(1.0f);
Map<TaskAttemptID, MapOutputFile> mapOutputFiles =
Collections.synchronizedMap(new HashMap<TaskAttemptID, MapOutputFile>());
List<MapTaskRunnable> taskRunnables = getMapTaskRunnables(taskSplitMetaInfos,
jobId, mapOutputFiles);
ExecutorService mapService = createMapExecutor(taskRunnables.size());
List<RunnableWithThrowable> mapRunnables = getMapTaskRunnables(
taskSplitMetaInfos, jobId, mapOutputFiles);
// Start populating the executor with work units.
// They may begin running immediately (in other threads).
for (Runnable r : taskRunnables) {
mapService.submit(r);
}
initCounters(mapRunnables.size(), numReduceTasks);
ExecutorService mapService = createMapExecutor();
runTasks(mapRunnables, mapService, "map");
try {
mapService.shutdown(); // Instructs queue to drain.
// Wait for tasks to finish; do not use a time-based timeout.
// (See http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6179024)
LOG.info("Waiting for map tasks");
mapService.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
} catch (InterruptedException ie) {
// Cancel all threads.
mapService.shutdownNow();
throw ie;
}
LOG.info("Map task executor complete.");
// After waiting for the map tasks to complete, if any of these
// have thrown an exception, rethrow it now in the main thread context.
for (MapTaskRunnable r : taskRunnables) {
if (r.storedException != null) {
throw new Exception(r.storedException);
}
}
TaskAttemptID reduceId =
new TaskAttemptID(new TaskID(jobId, TaskType.REDUCE, 0), 0);
try {
if (numReduceTasks > 0) {
ReduceTask reduce = new ReduceTask(systemJobFile.toString(),
reduceId, 0, mapIds.size(), 1);
reduce.setUser(UserGroupInformation.getCurrentUser().
getShortUserName());
JobConf localConf = new JobConf(job);
localConf.set("mapreduce.jobtracker.address", "local");
setupChildMapredLocalDirs(localJobDir, reduce, localConf);
// move map output to reduce input
for (int i = 0; i < mapIds.size(); i++) {
if (!this.isInterrupted()) {
TaskAttemptID mapId = mapIds.get(i);
Path mapOut = mapOutputFiles.get(mapId).getOutputFile();
MapOutputFile localOutputFile = new MROutputFiles();
localOutputFile.setConf(localConf);
Path reduceIn =
localOutputFile.getInputFileForWrite(mapId.getTaskID(),
localFs.getFileStatus(mapOut).getLen());
if (!localFs.mkdirs(reduceIn.getParent())) {
throw new IOException("Mkdirs failed to create "
+ reduceIn.getParent().toString());
}
if (!localFs.rename(mapOut, reduceIn))
throw new IOException("Couldn't rename " + mapOut);
} else {
throw new InterruptedException();
}
}
if (!this.isInterrupted()) {
reduce.setJobFile(localJobFile.toString());
localConf.setUser(reduce.getUser());
reduce.localizeConfiguration(localConf);
reduce.setConf(localConf);
reduce_tasks += 1;
myMetrics.launchReduce(reduce.getTaskID());
reduce.run(localConf, this);
myMetrics.completeReduce(reduce.getTaskID());
reduce_tasks -= 1;
} else {
throw new InterruptedException();
}
List<RunnableWithThrowable> reduceRunnables = getReduceTaskRunnables(
jobId, mapOutputFiles);
ExecutorService reduceService = createReduceExecutor();
runTasks(reduceRunnables, reduceService, "reduce");
}
} finally {
for (MapOutputFile output : mapOutputFiles.values()) {
@ -465,7 +544,6 @@ public class LocalJobRunner implements ClientProtocol {
}
JobEndNotifier.localRunnerNotification(job, status);
} catch (Throwable t) {
try {
outputCommitter.abortJob(jContext,
@ -511,12 +589,13 @@ public class LocalJobRunner implements ClientProtocol {
new ByteArrayInputStream(baos.toByteArray())));
LOG.info(taskStatus.getStateString());
int taskIndex = mapIds.indexOf(taskId);
if (taskIndex >= 0) { // mapping
int mapTaskIndex = mapIds.indexOf(taskId);
if (mapTaskIndex >= 0) {
// mapping
float numTasks = (float) this.numMapTasks;
partialMapProgress[taskIndex] = taskStatus.getProgress();
mapCounters[taskIndex] = taskStatus.getCounters();
partialMapProgress[mapTaskIndex] = taskStatus.getProgress();
mapCounters[mapTaskIndex] = taskStatus.getCounters();
float partialProgress = 0.0f;
for (float f : partialMapProgress) {
@ -524,8 +603,18 @@ public class LocalJobRunner implements ClientProtocol {
}
status.setMapProgress(partialProgress / numTasks);
} else {
reduceCounters = taskStatus.getCounters();
status.setReduceProgress(taskStatus.getProgress());
// reducing
int reduceTaskIndex = taskId.getTaskID().getId();
float numTasks = (float) this.numReduceTasks;
partialReduceProgress[reduceTaskIndex] = taskStatus.getProgress();
reduceCounters[reduceTaskIndex] = taskStatus.getCounters();
float partialProgress = 0.0f;
for (float f : partialReduceProgress) {
partialProgress += f;
}
status.setReduceProgress(partialProgress / numTasks);
}
// ignore phase
@ -545,7 +634,13 @@ public class LocalJobRunner implements ClientProtocol {
for (Counters c : mapCounters) {
current = Counters.sum(current, c);
}
current = Counters.sum(current, reduceCounters);
if (null != reduceCounters && reduceCounters.length > 0) {
for (Counters c : reduceCounters) {
current = Counters.sum(current, c);
}
}
return current;
}
@ -684,8 +779,9 @@ public class LocalJobRunner implements ClientProtocol {
public ClusterMetrics getClusterMetrics() {
int numMapTasks = map_tasks.get();
return new ClusterMetrics(numMapTasks, reduce_tasks, numMapTasks,
reduce_tasks, 0, 0, 1, 1, jobs.size(), 1, 0, 0);
int numReduceTasks = reduce_tasks.get();
return new ClusterMetrics(numMapTasks, numReduceTasks, numMapTasks,
numReduceTasks, 0, 0, 1, 1, jobs.size(), 1, 0, 0);
}
public JobTrackerStatus getJobTrackerStatus() {
@ -816,6 +912,27 @@ public class LocalJobRunner implements ClientProtocol {
return job.getConfiguration().getInt(LOCAL_MAX_MAPS, 1);
}
/**
* Set the max number of reduce tasks to run concurrently in the LocalJobRunner.
* @param job the job to configure
* @param maxReduces the maximum number of reduce tasks to allow.
*/
public static void setLocalMaxRunningReduces(
org.apache.hadoop.mapreduce.JobContext job,
int maxReduces) {
job.getConfiguration().setInt(LOCAL_MAX_REDUCES, maxReduces);
}
/**
* @return the max number of reduce tasks to run concurrently in the
* LocalJobRunner.
*/
public static int getLocalMaxRunningReduces(
org.apache.hadoop.mapreduce.JobContext job) {
return job.getConfiguration().getInt(LOCAL_MAX_REDUCES, 1);
}
@Override
public void cancelDelegationToken(Token<DelegationTokenIdentifier> token
) throws IOException,

View File

@ -60,6 +60,7 @@ import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskCounter;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormatCounter;
import org.apache.hadoop.mapreduce.lib.map.WrappedMapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormatCounter;
@ -1860,7 +1861,6 @@ public class MapTask extends Task {
}
{
sortPhase.addPhases(partitions); // Divide sort phase into sub-phases
Merger.considerFinalMergeForProgress();
IndexRecord rec = new IndexRecord();
final SpillRecord spillRec = new SpillRecord(partitions);
@ -1893,7 +1893,8 @@ public class MapTask extends Task {
segmentList, mergeFactor,
new Path(mapId.toString()),
job.getOutputKeyComparator(), reporter, sortSegments,
null, spilledRecordsCounter, sortPhase.phase());
null, spilledRecordsCounter, sortPhase.phase(),
TaskType.MAP);
//write merged output to disk
long segmentStart = finalOut.getPos();

View File

@ -39,6 +39,7 @@ import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.mapred.IFile.Reader;
import org.apache.hadoop.mapred.IFile.Writer;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.util.PriorityQueue;
import org.apache.hadoop.util.Progress;
import org.apache.hadoop.util.Progressable;
@ -69,7 +70,8 @@ public class Merger {
throws IOException {
return
new MergeQueue<K, V>(conf, fs, inputs, deleteInputs, codec, comparator,
reporter, null).merge(keyClass, valueClass,
reporter, null,
TaskType.REDUCE).merge(keyClass, valueClass,
mergeFactor, tmpDir,
readsCounter, writesCounter,
mergePhase);
@ -90,7 +92,8 @@ public class Merger {
throws IOException {
return
new MergeQueue<K, V>(conf, fs, inputs, deleteInputs, codec, comparator,
reporter, mergedMapOutputsCounter).merge(
reporter, mergedMapOutputsCounter,
TaskType.REDUCE).merge(
keyClass, valueClass,
mergeFactor, tmpDir,
readsCounter, writesCounter,
@ -124,7 +127,8 @@ public class Merger {
Progress mergePhase)
throws IOException {
return new MergeQueue<K, V>(conf, fs, segments, comparator, reporter,
sortSegments).merge(keyClass, valueClass,
sortSegments,
TaskType.REDUCE).merge(keyClass, valueClass,
mergeFactor, tmpDir,
readsCounter, writesCounter,
mergePhase);
@ -140,10 +144,12 @@ public class Merger {
boolean sortSegments,
Counters.Counter readsCounter,
Counters.Counter writesCounter,
Progress mergePhase)
Progress mergePhase,
TaskType taskType)
throws IOException {
return new MergeQueue<K, V>(conf, fs, segments, comparator, reporter,
sortSegments, codec).merge(keyClass, valueClass,
sortSegments, codec,
taskType).merge(keyClass, valueClass,
mergeFactor, tmpDir,
readsCounter, writesCounter,
mergePhase);
@ -161,7 +167,8 @@ public class Merger {
Progress mergePhase)
throws IOException {
return new MergeQueue<K, V>(conf, fs, segments, comparator, reporter,
sortSegments).merge(keyClass, valueClass,
sortSegments,
TaskType.REDUCE).merge(keyClass, valueClass,
mergeFactor, inMemSegments,
tmpDir,
readsCounter, writesCounter,
@ -182,7 +189,8 @@ public class Merger {
Progress mergePhase)
throws IOException {
return new MergeQueue<K, V>(conf, fs, segments, comparator, reporter,
sortSegments, codec).merge(keyClass, valueClass,
sortSegments, codec,
TaskType.REDUCE).merge(keyClass, valueClass,
mergeFactor, inMemSegments,
tmpDir,
readsCounter, writesCounter,
@ -367,19 +375,6 @@ public class Merger {
}
}
// Boolean variable for including/considering final merge as part of sort
// phase or not. This is true in map task, false in reduce task. It is
// used in calculating mergeProgress.
static boolean includeFinalMerge = false;
/**
* Sets the boolean variable includeFinalMerge to true. Called from
* map task before calling merge() so that final merge of map task
* is also considered as part of sort phase.
*/
static void considerFinalMergeForProgress() {
includeFinalMerge = true;
}
private static class MergeQueue<K extends Object, V extends Object>
extends PriorityQueue<Segment<K, V>> implements RawKeyValueIterator {
@ -401,6 +396,21 @@ public class Merger {
final DataInputBuffer value = new DataInputBuffer();
final DataInputBuffer diskIFileValue = new DataInputBuffer();
// Boolean variable for including/considering final merge as part of sort
// phase or not. This is true in map task, false in reduce task. It is
// used in calculating mergeProgress.
private boolean includeFinalMerge = false;
/**
* Sets the boolean variable includeFinalMerge to true. Called from
* map task before calling merge() so that final merge of map task
* is also considered as part of sort phase.
*/
private void considerFinalMergeForProgress() {
includeFinalMerge = true;
}
Segment<K, V> minSegment;
Comparator<Segment<K, V>> segmentComparator =
new Comparator<Segment<K, V>>() {
@ -419,14 +429,16 @@ public class Merger {
CompressionCodec codec, RawComparator<K> comparator,
Progressable reporter)
throws IOException {
this(conf, fs, inputs, deleteInputs, codec, comparator, reporter, null);
this(conf, fs, inputs, deleteInputs, codec, comparator, reporter, null,
TaskType.REDUCE);
}
public MergeQueue(Configuration conf, FileSystem fs,
Path[] inputs, boolean deleteInputs,
CompressionCodec codec, RawComparator<K> comparator,
Progressable reporter,
Counters.Counter mergedMapOutputsCounter)
Counters.Counter mergedMapOutputsCounter,
TaskType taskType)
throws IOException {
this.conf = conf;
this.fs = fs;
@ -434,6 +446,10 @@ public class Merger {
this.comparator = comparator;
this.reporter = reporter;
if (taskType == TaskType.MAP) {
considerFinalMergeForProgress();
}
for (Path file : inputs) {
LOG.debug("MergeQ: adding: " + file);
segments.add(new Segment<K, V>(conf, fs, file, codec, !deleteInputs,
@ -449,17 +465,20 @@ public class Merger {
public MergeQueue(Configuration conf, FileSystem fs,
List<Segment<K, V>> segments, RawComparator<K> comparator,
Progressable reporter) {
this(conf, fs, segments, comparator, reporter, false);
this(conf, fs, segments, comparator, reporter, false, TaskType.REDUCE);
}
public MergeQueue(Configuration conf, FileSystem fs,
List<Segment<K, V>> segments, RawComparator<K> comparator,
Progressable reporter, boolean sortSegments) {
Progressable reporter, boolean sortSegments, TaskType taskType) {
this.conf = conf;
this.fs = fs;
this.comparator = comparator;
this.segments = segments;
this.reporter = reporter;
if (taskType == TaskType.MAP) {
considerFinalMergeForProgress();
}
if (sortSegments) {
Collections.sort(segments, segmentComparator);
}
@ -467,8 +486,10 @@ public class Merger {
public MergeQueue(Configuration conf, FileSystem fs,
List<Segment<K, V>> segments, RawComparator<K> comparator,
Progressable reporter, boolean sortSegments, CompressionCodec codec) {
this(conf, fs, segments, comparator, reporter, sortSegments);
Progressable reporter, boolean sortSegments, CompressionCodec codec,
TaskType taskType) {
this(conf, fs, segments, comparator, reporter, sortSegments,
taskType);
this.codec = codec;
}

View File

@ -24,6 +24,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;
@ -74,6 +75,10 @@ public class ReduceTask extends Task {
private CompressionCodec codec;
// If this is a LocalJobRunner-based job, this will
// be a mapping from map task attempts to their output files.
// This will be null in other cases.
private Map<TaskAttemptID, MapOutputFile> localMapFiles;
{
getProgress().setStatus("reduce");
@ -105,23 +110,23 @@ public class ReduceTask extends Task {
// file paths, the first parameter is considered smaller than the second one.
// In case of files with same size and path are considered equal.
private Comparator<FileStatus> mapOutputFileComparator =
new Comparator<FileStatus>() {
public int compare(FileStatus a, FileStatus b) {
if (a.getLen() < b.getLen())
return -1;
else if (a.getLen() == b.getLen())
if (a.getPath().toString().equals(b.getPath().toString()))
return 0;
else
return -1;
new Comparator<FileStatus>() {
public int compare(FileStatus a, FileStatus b) {
if (a.getLen() < b.getLen())
return -1;
else if (a.getLen() == b.getLen())
if (a.getPath().toString().equals(b.getPath().toString()))
return 0;
else
return 1;
}
return -1;
else
return 1;
}
};
// A sorted set for keeping a set of map output files on disk
private final SortedSet<FileStatus> mapOutputFilesOnDisk =
new TreeSet<FileStatus>(mapOutputFileComparator);
new TreeSet<FileStatus>(mapOutputFileComparator);
public ReduceTask() {
super();
@ -133,6 +138,17 @@ public class ReduceTask extends Task {
this.numMaps = numMaps;
}
/**
* Register the set of mapper outputs created by a LocalJobRunner-based
* job with this ReduceTask so it knows where to fetch from.
*
* This should not be called in normal (networked) execution.
*/
public void setLocalMapFiles(Map<TaskAttemptID, MapOutputFile> mapFiles) {
this.localMapFiles = mapFiles;
}
private CompressionCodec initCodec() {
// check if map-outputs are to be compressed
if (conf.getCompressMapOutput()) {
@ -174,20 +190,11 @@ public class ReduceTask extends Task {
numMaps = in.readInt();
}
// Get the input files for the reducer.
private Path[] getMapFiles(FileSystem fs, boolean isLocal)
throws IOException {
// Get the input files for the reducer (for local jobs).
private Path[] getMapFiles(FileSystem fs) throws IOException {
List<Path> fileList = new ArrayList<Path>();
if (isLocal) {
// for local jobs
for(int i = 0; i < numMaps; ++i) {
fileList.add(mapOutputFile.getInputFile(i));
}
} else {
// for non local jobs
for (FileStatus filestatus : mapOutputFilesOnDisk) {
fileList.add(filestatus.getPath());
}
for(int i = 0; i < numMaps; ++i) {
fileList.add(mapOutputFile.getInputFile(i));
}
return fileList.toArray(new Path[0]);
}
@ -343,54 +350,31 @@ public class ReduceTask extends Task {
RawKeyValueIterator rIter = null;
ShuffleConsumerPlugin shuffleConsumerPlugin = null;
boolean isLocal = false;
// local if
// 1) framework == local or
// 2) framework == null and job tracker address == local
String framework = job.get(MRConfig.FRAMEWORK_NAME);
String masterAddr = job.get(MRConfig.MASTER_ADDRESS, "local");
if ((framework == null && masterAddr.equals("local"))
|| (framework != null && framework.equals(MRConfig.LOCAL_FRAMEWORK_NAME))) {
isLocal = true;
}
Class combinerClass = conf.getCombinerClass();
CombineOutputCollector combineCollector =
(null != combinerClass) ?
new CombineOutputCollector(reduceCombineOutputCounter, reporter, conf) : null;
if (!isLocal) {
Class combinerClass = conf.getCombinerClass();
CombineOutputCollector combineCollector =
(null != combinerClass) ?
new CombineOutputCollector(reduceCombineOutputCounter, reporter, conf) : null;
Class<? extends ShuffleConsumerPlugin> clazz =
job.getClass(MRConfig.SHUFFLE_CONSUMER_PLUGIN, Shuffle.class, ShuffleConsumerPlugin.class);
Class<? extends ShuffleConsumerPlugin> clazz =
job.getClass(MRConfig.SHUFFLE_CONSUMER_PLUGIN, Shuffle.class, ShuffleConsumerPlugin.class);
shuffleConsumerPlugin = ReflectionUtils.newInstance(clazz, job);
LOG.info("Using ShuffleConsumerPlugin: " + shuffleConsumerPlugin);
shuffleConsumerPlugin = ReflectionUtils.newInstance(clazz, job);
LOG.info("Using ShuffleConsumerPlugin: " + shuffleConsumerPlugin);
ShuffleConsumerPlugin.Context shuffleContext =
new ShuffleConsumerPlugin.Context(getTaskID(), job, FileSystem.getLocal(job), umbilical,
super.lDirAlloc, reporter, codec,
combinerClass, combineCollector,
spilledRecordsCounter, reduceCombineInputCounter,
shuffledMapsCounter,
reduceShuffleBytes, failedShuffleCounter,
mergedMapOutputsCounter,
taskStatus, copyPhase, sortPhase, this,
mapOutputFile, localMapFiles);
shuffleConsumerPlugin.init(shuffleContext);
rIter = shuffleConsumerPlugin.run();
ShuffleConsumerPlugin.Context shuffleContext =
new ShuffleConsumerPlugin.Context(getTaskID(), job, FileSystem.getLocal(job), umbilical,
super.lDirAlloc, reporter, codec,
combinerClass, combineCollector,
spilledRecordsCounter, reduceCombineInputCounter,
shuffledMapsCounter,
reduceShuffleBytes, failedShuffleCounter,
mergedMapOutputsCounter,
taskStatus, copyPhase, sortPhase, this,
mapOutputFile);
shuffleConsumerPlugin.init(shuffleContext);
rIter = shuffleConsumerPlugin.run();
} else {
// local job runner doesn't have a copy phase
copyPhase.complete();
final FileSystem rfs = FileSystem.getLocal(job).getRaw();
rIter = Merger.merge(job, rfs, job.getMapOutputKeyClass(),
job.getMapOutputValueClass(), codec,
getMapFiles(rfs, true),
!conf.getKeepFailedTaskFiles(),
job.getInt(JobContext.IO_SORT_FACTOR, 100),
new Path(getTaskID().toString()),
job.getOutputKeyComparator(),
reporter, spilledRecordsCounter, null, null);
}
// free up the data structures
mapOutputFilesOnDisk.clear();
@ -409,9 +393,7 @@ public class ReduceTask extends Task {
keyClass, valueClass);
}
if (shuffleConsumerPlugin != null) {
shuffleConsumerPlugin.close();
}
shuffleConsumerPlugin.close();
done(umbilical, reporter);
}

View File

@ -19,6 +19,8 @@
package org.apache.hadoop.mapred;
import java.io.IOException;
import java.util.Map;
import org.apache.hadoop.mapred.Task.CombineOutputCollector;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalDirAllocator;
@ -65,6 +67,7 @@ public interface ShuffleConsumerPlugin<K, V> {
private final Progress mergePhase;
private final Task reduceTask;
private final MapOutputFile mapOutputFile;
private final Map<TaskAttemptID, MapOutputFile> localMapFiles;
public Context(org.apache.hadoop.mapreduce.TaskAttemptID reduceId,
JobConf jobConf, FileSystem localFS,
@ -80,7 +83,8 @@ public interface ShuffleConsumerPlugin<K, V> {
Counters.Counter failedShuffleCounter,
Counters.Counter mergedMapOutputsCounter,
TaskStatus status, Progress copyPhase, Progress mergePhase,
Task reduceTask, MapOutputFile mapOutputFile) {
Task reduceTask, MapOutputFile mapOutputFile,
Map<TaskAttemptID, MapOutputFile> localMapFiles) {
this.reduceId = reduceId;
this.jobConf = jobConf;
this.localFS = localFS;
@ -101,6 +105,7 @@ public interface ShuffleConsumerPlugin<K, V> {
this.mergePhase = mergePhase;
this.reduceTask = reduceTask;
this.mapOutputFile = mapOutputFile;
this.localMapFiles = localMapFiles;
}
public org.apache.hadoop.mapreduce.TaskAttemptID getReduceId() {
@ -163,6 +168,9 @@ public interface ShuffleConsumerPlugin<K, V> {
public MapOutputFile getMapOutputFile() {
return mapOutputFile;
}
public Map<TaskAttemptID, MapOutputFile> getLocalMapFiles() {
return localMapFiles;
}
} // end of public static class Context<K,V>
}

View File

@ -60,7 +60,7 @@ class Fetcher<K,V> extends Thread {
/* Default read timeout (in milliseconds) */
private final static int DEFAULT_READ_TIMEOUT = 3 * 60 * 1000;
private final Reporter reporter;
protected final Reporter reporter;
private static enum ShuffleErrors{IO_ERROR, WRONG_LENGTH, BAD_ID, WRONG_MAP,
CONNECTION, WRONG_REDUCE}
@ -71,13 +71,13 @@ class Fetcher<K,V> extends Thread {
private final Counters.Counter badIdErrs;
private final Counters.Counter wrongMapErrs;
private final Counters.Counter wrongReduceErrs;
private final MergeManager<K,V> merger;
private final ShuffleSchedulerImpl<K,V> scheduler;
private final ShuffleClientMetrics metrics;
private final ExceptionReporter exceptionReporter;
private final int id;
protected final MergeManager<K,V> merger;
protected final ShuffleSchedulerImpl<K,V> scheduler;
protected final ShuffleClientMetrics metrics;
protected final ExceptionReporter exceptionReporter;
protected final int id;
private static int nextId = 0;
private final int reduce;
protected final int reduce;
private final int connectionTimeout;
private final int readTimeout;

View File

@ -0,0 +1,166 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.task.reduce;
import java.io.IOException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import javax.crypto.SecretKey;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.IndexRecord;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapOutputFile;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.SpillRecord;
import org.apache.hadoop.mapreduce.TaskAttemptID;
/**
* LocalFetcher is used by LocalJobRunner to perform a local filesystem
* fetch.
*/
class LocalFetcher<K,V> extends Fetcher<K, V> {
private static final Log LOG = LogFactory.getLog(LocalFetcher.class);
private static final MapHost LOCALHOST = new MapHost("local", "local");
private JobConf job;
private Map<TaskAttemptID, MapOutputFile> localMapFiles;
public LocalFetcher(JobConf job, TaskAttemptID reduceId,
ShuffleSchedulerImpl<K, V> scheduler,
MergeManager<K,V> merger,
Reporter reporter, ShuffleClientMetrics metrics,
ExceptionReporter exceptionReporter,
SecretKey shuffleKey,
Map<TaskAttemptID, MapOutputFile> localMapFiles) {
super(job, reduceId, scheduler, merger, reporter, metrics,
exceptionReporter, shuffleKey);
this.job = job;
this.localMapFiles = localMapFiles;
setName("localfetcher#" + id);
setDaemon(true);
}
public void run() {
// Create a worklist of task attempts to work over.
Set<TaskAttemptID> maps = new HashSet<TaskAttemptID>();
for (TaskAttemptID map : localMapFiles.keySet()) {
maps.add(map);
}
while (maps.size() > 0) {
try {
// If merge is on, block
merger.waitForResource();
metrics.threadBusy();
// Copy as much as is possible.
doCopy(maps);
metrics.threadFree();
} catch (InterruptedException ie) {
} catch (Throwable t) {
exceptionReporter.reportException(t);
}
}
}
/**
* The crux of the matter...
*/
private void doCopy(Set<TaskAttemptID> maps) throws IOException {
Iterator<TaskAttemptID> iter = maps.iterator();
while (iter.hasNext()) {
TaskAttemptID map = iter.next();
LOG.debug("LocalFetcher " + id + " going to fetch: " + map);
if (copyMapOutput(map)) {
// Successful copy. Remove this from our worklist.
iter.remove();
} else {
// We got back a WAIT command; go back to the outer loop
// and block for InMemoryMerge.
break;
}
}
}
/**
* Retrieve the map output of a single map task
* and send it to the merger.
*/
private boolean copyMapOutput(TaskAttemptID mapTaskId) throws IOException {
// Figure out where the map task stored its output.
Path mapOutputFileName = localMapFiles.get(mapTaskId).getOutputFile();
Path indexFileName = mapOutputFileName.suffix(".index");
// Read its index to determine the location of our split
// and its size.
SpillRecord sr = new SpillRecord(indexFileName, job);
IndexRecord ir = sr.getIndex(reduce);
long compressedLength = ir.partLength;
long decompressedLength = ir.rawLength;
// Get the location for the map output - either in-memory or on-disk
MapOutput<K, V> mapOutput = merger.reserve(mapTaskId, decompressedLength,
id);
// Check if we can shuffle *now* ...
if (mapOutput == null) {
LOG.info("fetcher#" + id + " - MergeManager returned Status.WAIT ...");
return false;
}
// Go!
LOG.info("localfetcher#" + id + " about to shuffle output of map " +
mapOutput.getMapId() + " decomp: " +
decompressedLength + " len: " + compressedLength + " to " +
mapOutput.getDescription());
// now read the file, seek to the appropriate section, and send it.
FileSystem localFs = FileSystem.getLocal(job).getRaw();
FSDataInputStream inStream = localFs.open(mapOutputFileName);
try {
inStream.seek(ir.startOffset);
mapOutput.shuffle(LOCALHOST, inStream, compressedLength, decompressedLength, metrics, reporter);
} finally {
try {
inStream.close();
} catch (IOException ioe) {
LOG.warn("IOException closing inputstream from map output: "
+ ioe.toString());
}
}
scheduler.copySucceeded(mapTaskId, LOCALHOST, compressedLength, 0,
mapOutput);
return true; // successful fetch.
}
}

View File

@ -18,10 +18,12 @@
package org.apache.hadoop.mapreduce.task.reduce;
import java.io.IOException;
import java.util.Map;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapOutputFile;
import org.apache.hadoop.mapred.RawKeyValueIterator;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.Task;
@ -56,6 +58,7 @@ public class Shuffle<K, V> implements ShuffleConsumerPlugin<K, V>, ExceptionRepo
private Progress copyPhase;
private TaskStatus taskStatus;
private Task reduceTask; //Used for status updates
private Map<TaskAttemptID, MapOutputFile> localMapFiles;
@Override
public void init(ShuffleConsumerPlugin.Context context) {
@ -69,6 +72,7 @@ public class Shuffle<K, V> implements ShuffleConsumerPlugin<K, V>, ExceptionRepo
this.copyPhase = context.getCopyPhase();
this.taskStatus = context.getStatus();
this.reduceTask = context.getReduceTask();
this.localMapFiles = context.getLocalMapFiles();
scheduler = new ShuffleSchedulerImpl<K, V>(jobConf, taskStatus, reduceId,
this, copyPhase, context.getShuffledMapsCounter(),
@ -103,13 +107,22 @@ public class Shuffle<K, V> implements ShuffleConsumerPlugin<K, V>, ExceptionRepo
eventFetcher.start();
// Start the map-output fetcher threads
final int numFetchers = jobConf.getInt(MRJobConfig.SHUFFLE_PARALLEL_COPIES, 5);
boolean isLocal = localMapFiles != null;
final int numFetchers = isLocal ? 1 :
jobConf.getInt(MRJobConfig.SHUFFLE_PARALLEL_COPIES, 5);
Fetcher<K,V>[] fetchers = new Fetcher[numFetchers];
for (int i=0; i < numFetchers; ++i) {
fetchers[i] = new Fetcher<K,V>(jobConf, reduceId, scheduler, merger,
reporter, metrics, this,
reduceTask.getShuffleSecret());
fetchers[i].start();
if (isLocal) {
fetchers[0] = new LocalFetcher<K, V>(jobConf, reduceId, scheduler,
merger, reporter, metrics, this, reduceTask.getShuffleSecret(),
localMapFiles);
fetchers[0].start();
} else {
for (int i=0; i < numFetchers; ++i) {
fetchers[i] = new Fetcher<K,V>(jobConf, reduceId, scheduler, merger,
reporter, metrics, this,
reduceTask.getShuffleSecret());
fetchers[i].start();
}
}
// Wait for shuffle to complete successfully

View File

@ -155,7 +155,7 @@ public class TestShufflePlugin<K, V> {
mockCounter, mockCounter, mockCounter,
mockCounter, mockCounter, mockCounter,
mockTaskStatus, mockProgress, mockProgress,
mockTask, mockMapOutputFile);
mockTask, mockMapOutputFile, null);
shuffleConsumerPlugin.init(context);
shuffleConsumerPlugin.run();
shuffleConsumerPlugin.close();

View File

@ -276,18 +276,16 @@ public class TestJobCounters {
// there are too few spills to combine (2 < 3)
// Each map spills 2^14 records, so maps spill 49152 records, combined.
// The reduce spill count is composed of the read from one segment and
// the intermediate merge of the other two. The intermediate merge
// The combiner has emitted 24576 records to the reducer; these are all
// fetched straight to memory from the map side. The intermediate merge
// adds 8192 records per segment read; again, there are too few spills to
// combine, so all 16834 are written to disk (total 32768 spilled records
// for the intermediate merge). The merge into the reduce includes only
// the unmerged segment, size 8192. Total spilled records in the reduce
// is 32768 from the merge + 8192 unmerged segment = 40960 records
// combine, so all Total spilled records in the reduce
// is 8192 records / map * 3 maps = 24576.
// Total: map + reduce = 49152 + 40960 = 90112
// Total: map + reduce = 49152 + 24576 = 73728
// 3 files, 5120 = 5 * 1024 rec/file = 15360 input records
// 4 records/line = 61440 output records
validateCounters(c1, 90112, 15360, 61440);
validateCounters(c1, 73728, 15360, 61440);
validateFileCounters(c1, inputSize, 0, 0, 0);
validateOldFileCounters(c1, inputSize, 61928, 0, 0);
}
@ -316,12 +314,12 @@ public class TestJobCounters {
// 1st merge: read + write = 8192 * 4
// 2nd merge: read + write = 8192 * 4
// final merge: 0
// Total reduce: 65536
// Total reduce: 32768
// Total: map + reduce = 2^16 + 2^16 = 131072
// Total: map + reduce = 2^16 + 2^15 = 98304
// 4 files, 5120 = 5 * 1024 rec/file = 15360 input records
// 4 records/line = 81920 output records
validateCounters(c1, 131072, 20480, 81920);
validateCounters(c1, 98304, 20480, 81920);
validateFileCounters(c1, inputSize, 0, 0, 0);
}
@ -349,7 +347,7 @@ public class TestJobCounters {
// Total reduce: 45056
// 5 files, 5120 = 5 * 1024 rec/file = 15360 input records
// 4 records/line = 102400 output records
validateCounters(c1, 147456, 25600, 102400);
validateCounters(c1, 122880, 25600, 102400);
validateFileCounters(c1, inputSize, 0, 0, 0);
}
@ -394,7 +392,7 @@ public class TestJobCounters {
job, new Path(OUT_DIR, "outputN0"));
assertTrue(job.waitForCompletion(true));
final Counters c1 = Counters.downgrade(job.getCounters());
validateCounters(c1, 90112, 15360, 61440);
validateCounters(c1, 73728, 15360, 61440);
validateFileCounters(c1, inputSize, 0, 0, 0);
}
@ -416,7 +414,7 @@ public class TestJobCounters {
job, new Path(OUT_DIR, "outputN1"));
assertTrue(job.waitForCompletion(true));
final Counters c1 = Counters.downgrade(job.getCounters());
validateCounters(c1, 131072, 20480, 81920);
validateCounters(c1, 98304, 20480, 81920);
validateFileCounters(c1, inputSize, 0, 0, 0);
}
@ -439,7 +437,7 @@ public class TestJobCounters {
job, new Path(OUT_DIR, "outputN2"));
assertTrue(job.waitForCompletion(true));
final Counters c1 = Counters.downgrade(job.getCounters());
validateCounters(c1, 147456, 25600, 102400);
validateCounters(c1, 122880, 25600, 102400);
validateFileCounters(c1, inputSize, 0, 0, 0);
}

View File

@ -63,7 +63,7 @@ public class TestKeyFieldBasedComparator extends HadoopTestCase {
conf.setOutputValueClass(LongWritable.class);
conf.setNumMapTasks(1);
conf.setNumReduceTasks(2);
conf.setNumReduceTasks(1);
conf.setOutputFormat(TextOutputFormat.class);
conf.setOutputKeyComparatorClass(KeyFieldBasedComparator.class);
@ -101,9 +101,7 @@ public class TestKeyFieldBasedComparator extends HadoopTestCase {
BufferedReader reader = new BufferedReader(new InputStreamReader(is));
String line = reader.readLine();
//make sure we get what we expect as the first line, and also
//that we have two lines (both the lines must end up in the same
//reducer since the partitioner takes the same key spec for all
//lines
//that we have two lines
if (expect == 1) {
assertTrue(line.startsWith(line1));
} else if (expect == 2) {

View File

@ -31,9 +31,9 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.mapred.LocalJobRunner;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
@ -410,6 +410,7 @@ public class TestLocalRunner extends TestCase {
}
/** Test case for zero mappers */
@Test
public void testEmptyMaps() throws Exception {
Job job = Job.getInstance();
Path outputPath = getOutputPath();
@ -428,5 +429,145 @@ public class TestLocalRunner extends TestCase {
boolean success = job.waitForCompletion(true);
assertTrue("Empty job should work", success);
}
/** @return the directory where numberfiles are written (mapper inputs) */
private Path getNumberDirPath() {
return new Path(getInputPath(), "numberfiles");
}
/**
* Write out an input file containing an integer.
*
* @param fileNum the file number to write to.
* @param value the value to write to the file
* @return the path of the written file.
*/
private Path makeNumberFile(int fileNum, int value) throws IOException {
Path workDir = getNumberDirPath();
Path filePath = new Path(workDir, "file" + fileNum);
Configuration conf = new Configuration();
FileSystem fs = FileSystem.getLocal(conf);
OutputStream os = fs.create(filePath);
BufferedWriter w = new BufferedWriter(new OutputStreamWriter(os));
w.write("" + value);
w.close();
return filePath;
}
/**
* Each record received by this mapper is a number 'n'.
* Emit the values [0..n-1]
*/
public static class SequenceMapper
extends Mapper<LongWritable, Text, Text, NullWritable> {
public void map(LongWritable k, Text v, Context c)
throws IOException, InterruptedException {
int max = Integer.valueOf(v.toString());
for (int i = 0; i < max; i++) {
c.write(new Text("" + i), NullWritable.get());
}
}
}
private final static int NUMBER_FILE_VAL = 100;
/**
* Tally up the values and ensure that we got as much data
* out as we put in.
* Each mapper generated 'NUMBER_FILE_VAL' values (0..NUMBER_FILE_VAL-1).
* Verify that across all our reducers we got exactly this much
* data back.
*/
private void verifyNumberJob(int numMaps) throws Exception {
Path outputDir = getOutputPath();
Configuration conf = new Configuration();
FileSystem fs = FileSystem.getLocal(conf);
FileStatus [] stats = fs.listStatus(outputDir);
int valueSum = 0;
for (FileStatus f : stats) {
FSDataInputStream istream = fs.open(f.getPath());
BufferedReader r = new BufferedReader(new InputStreamReader(istream));
String line = null;
while ((line = r.readLine()) != null) {
valueSum += Integer.valueOf(line.trim());
}
r.close();
}
int maxVal = NUMBER_FILE_VAL - 1;
int expectedPerMapper = maxVal * (maxVal + 1) / 2;
int expectedSum = expectedPerMapper * numMaps;
LOG.info("expected sum: " + expectedSum + ", got " + valueSum);
assertEquals("Didn't get all our results back", expectedSum, valueSum);
}
/**
* Run a test which creates a SequenceMapper / IdentityReducer
* job over a set of generated number files.
*/
private void doMultiReducerTest(int numMaps, int numReduces,
int parallelMaps, int parallelReduces) throws Exception {
Path in = getNumberDirPath();
Path out = getOutputPath();
// Clear data from any previous tests.
Configuration conf = new Configuration();
FileSystem fs = FileSystem.getLocal(conf);
if (fs.exists(out)) {
fs.delete(out, true);
}
if (fs.exists(in)) {
fs.delete(in, true);
}
for (int i = 0; i < numMaps; i++) {
makeNumberFile(i, 100);
}
Job job = Job.getInstance();
job.setNumReduceTasks(numReduces);
job.setMapperClass(SequenceMapper.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
FileInputFormat.addInputPath(job, in);
FileOutputFormat.setOutputPath(job, out);
LocalJobRunner.setLocalMaxRunningMaps(job, parallelMaps);
LocalJobRunner.setLocalMaxRunningReduces(job, parallelReduces);
boolean result = job.waitForCompletion(true);
assertTrue("Job failed!!", result);
verifyNumberJob(numMaps);
}
@Test
public void testOneMapMultiReduce() throws Exception {
doMultiReducerTest(1, 2, 1, 1);
}
@Test
public void testOneMapMultiParallelReduce() throws Exception {
doMultiReducerTest(1, 2, 1, 2);
}
@Test
public void testMultiMapOneReduce() throws Exception {
doMultiReducerTest(4, 1, 2, 1);
}
@Test
public void testMultiMapMultiReduce() throws Exception {
doMultiReducerTest(4, 4, 2, 2);
}
}

View File

@ -56,7 +56,7 @@ public class TestMRKeyFieldBasedComparator extends HadoopTestCase {
conf.set("mapreduce.partition.keypartitioner.options", "-k1.1,1.1");
conf.set(MRJobConfig.MAP_OUTPUT_KEY_FIELD_SEPERATOR, " ");
Job job = MapReduceTestUtil.createJob(conf, inDir, outDir, 1, 2,
Job job = MapReduceTestUtil.createJob(conf, inDir, outDir, 1, 1,
line1 +"\n" + line2 + "\n");
job.setMapperClass(InverseMapper.class);
job.setReducerClass(Reducer.class);