MAPREDUCE-434. LocalJobRunner limited to single reducer (Sandy Ryza and Aaron Kimball via Sandy Ryza)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1510866 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
a00a729729
commit
0cb2fdc3b4
|
@ -157,6 +157,9 @@ Release 2.3.0 - UNRELEASED
|
||||||
|
|
||||||
IMPROVEMENTS
|
IMPROVEMENTS
|
||||||
|
|
||||||
|
MAPREDUCE-434. LocalJobRunner limited to single reducer (Sandy Ryza and
|
||||||
|
Aaron Kimball via Sandy Ryza)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
MAPREDUCE-1981. Improve getSplits performance by using listLocatedStatus
|
MAPREDUCE-1981. Improve getSplits performance by using listLocatedStatus
|
||||||
|
|
|
@ -79,11 +79,15 @@ public class LocalJobRunner implements ClientProtocol {
|
||||||
public static final String LOCAL_MAX_MAPS =
|
public static final String LOCAL_MAX_MAPS =
|
||||||
"mapreduce.local.map.tasks.maximum";
|
"mapreduce.local.map.tasks.maximum";
|
||||||
|
|
||||||
|
/** The maximum number of reduce tasks to run in parallel in LocalJobRunner */
|
||||||
|
public static final String LOCAL_MAX_REDUCES =
|
||||||
|
"mapreduce.local.reduce.tasks.maximum";
|
||||||
|
|
||||||
private FileSystem fs;
|
private FileSystem fs;
|
||||||
private HashMap<JobID, Job> jobs = new HashMap<JobID, Job>();
|
private HashMap<JobID, Job> jobs = new HashMap<JobID, Job>();
|
||||||
private JobConf conf;
|
private JobConf conf;
|
||||||
private AtomicInteger map_tasks = new AtomicInteger(0);
|
private AtomicInteger map_tasks = new AtomicInteger(0);
|
||||||
private int reduce_tasks = 0;
|
private AtomicInteger reduce_tasks = new AtomicInteger(0);
|
||||||
final Random rand = new Random();
|
final Random rand = new Random();
|
||||||
|
|
||||||
private LocalJobRunnerMetrics myMetrics = null;
|
private LocalJobRunnerMetrics myMetrics = null;
|
||||||
|
@ -115,9 +119,11 @@ private class Job extends Thread implements TaskUmbilicalProtocol {
|
||||||
private JobConf job;
|
private JobConf job;
|
||||||
|
|
||||||
private int numMapTasks;
|
private int numMapTasks;
|
||||||
|
private int numReduceTasks;
|
||||||
private float [] partialMapProgress;
|
private float [] partialMapProgress;
|
||||||
|
private float [] partialReduceProgress;
|
||||||
private Counters [] mapCounters;
|
private Counters [] mapCounters;
|
||||||
private Counters reduceCounters;
|
private Counters [] reduceCounters;
|
||||||
|
|
||||||
private JobStatus status;
|
private JobStatus status;
|
||||||
private List<TaskAttemptID> mapIds = Collections.synchronizedList(
|
private List<TaskAttemptID> mapIds = Collections.synchronizedList(
|
||||||
|
@ -184,10 +190,14 @@ public Job(JobID jobid, String jobSubmitDir) throws IOException {
|
||||||
this.start();
|
this.start();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected abstract class RunnableWithThrowable implements Runnable {
|
||||||
|
public volatile Throwable storedException;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A Runnable instance that handles a map task to be run by an executor.
|
* A Runnable instance that handles a map task to be run by an executor.
|
||||||
*/
|
*/
|
||||||
protected class MapTaskRunnable implements Runnable {
|
protected class MapTaskRunnable extends RunnableWithThrowable {
|
||||||
private final int taskId;
|
private final int taskId;
|
||||||
private final TaskSplitMetaInfo info;
|
private final TaskSplitMetaInfo info;
|
||||||
private final JobID jobId;
|
private final JobID jobId;
|
||||||
|
@ -198,8 +208,6 @@ protected class MapTaskRunnable implements Runnable {
|
||||||
// where to fetch mapper outputs.
|
// where to fetch mapper outputs.
|
||||||
private final Map<TaskAttemptID, MapOutputFile> mapOutputFiles;
|
private final Map<TaskAttemptID, MapOutputFile> mapOutputFiles;
|
||||||
|
|
||||||
public volatile Throwable storedException;
|
|
||||||
|
|
||||||
public MapTaskRunnable(TaskSplitMetaInfo info, int taskId, JobID jobId,
|
public MapTaskRunnable(TaskSplitMetaInfo info, int taskId, JobID jobId,
|
||||||
Map<TaskAttemptID, MapOutputFile> mapOutputFiles) {
|
Map<TaskAttemptID, MapOutputFile> mapOutputFiles) {
|
||||||
this.info = info;
|
this.info = info;
|
||||||
|
@ -253,12 +261,13 @@ public void run() {
|
||||||
* @param mapOutputFiles a mapping from task attempts to output files
|
* @param mapOutputFiles a mapping from task attempts to output files
|
||||||
* @return a List of Runnables, one per map task.
|
* @return a List of Runnables, one per map task.
|
||||||
*/
|
*/
|
||||||
protected List<MapTaskRunnable> getMapTaskRunnables(
|
protected List<RunnableWithThrowable> getMapTaskRunnables(
|
||||||
TaskSplitMetaInfo [] taskInfo, JobID jobId,
|
TaskSplitMetaInfo [] taskInfo, JobID jobId,
|
||||||
Map<TaskAttemptID, MapOutputFile> mapOutputFiles) {
|
Map<TaskAttemptID, MapOutputFile> mapOutputFiles) {
|
||||||
|
|
||||||
int numTasks = 0;
|
int numTasks = 0;
|
||||||
ArrayList<MapTaskRunnable> list = new ArrayList<MapTaskRunnable>();
|
ArrayList<RunnableWithThrowable> list =
|
||||||
|
new ArrayList<RunnableWithThrowable>();
|
||||||
for (TaskSplitMetaInfo task : taskInfo) {
|
for (TaskSplitMetaInfo task : taskInfo) {
|
||||||
list.add(new MapTaskRunnable(task, numTasks++, jobId,
|
list.add(new MapTaskRunnable(task, numTasks++, jobId,
|
||||||
mapOutputFiles));
|
mapOutputFiles));
|
||||||
|
@ -267,12 +276,89 @@ protected List<MapTaskRunnable> getMapTaskRunnables(
|
||||||
return list;
|
return list;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected class ReduceTaskRunnable extends RunnableWithThrowable {
|
||||||
|
private final int taskId;
|
||||||
|
private final JobID jobId;
|
||||||
|
private final JobConf localConf;
|
||||||
|
|
||||||
|
// This is a reference to a shared object passed in by the
|
||||||
|
// external context; this delivers state to the reducers regarding
|
||||||
|
// where to fetch mapper outputs.
|
||||||
|
private final Map<TaskAttemptID, MapOutputFile> mapOutputFiles;
|
||||||
|
|
||||||
|
public ReduceTaskRunnable(int taskId, JobID jobId,
|
||||||
|
Map<TaskAttemptID, MapOutputFile> mapOutputFiles) {
|
||||||
|
this.taskId = taskId;
|
||||||
|
this.jobId = jobId;
|
||||||
|
this.mapOutputFiles = mapOutputFiles;
|
||||||
|
this.localConf = new JobConf(job);
|
||||||
|
this.localConf.set("mapreduce.jobtracker.address", "local");
|
||||||
|
}
|
||||||
|
|
||||||
|
public void run() {
|
||||||
|
try {
|
||||||
|
TaskAttemptID reduceId = new TaskAttemptID(new TaskID(
|
||||||
|
jobId, TaskType.REDUCE, taskId), 0);
|
||||||
|
LOG.info("Starting task: " + reduceId);
|
||||||
|
|
||||||
|
ReduceTask reduce = new ReduceTask(systemJobFile.toString(),
|
||||||
|
reduceId, taskId, mapIds.size(), 1);
|
||||||
|
reduce.setUser(UserGroupInformation.getCurrentUser().
|
||||||
|
getShortUserName());
|
||||||
|
setupChildMapredLocalDirs(localJobDir, reduce, localConf);
|
||||||
|
reduce.setLocalMapFiles(mapOutputFiles);
|
||||||
|
|
||||||
|
if (!Job.this.isInterrupted()) {
|
||||||
|
reduce.setJobFile(localJobFile.toString());
|
||||||
|
localConf.setUser(reduce.getUser());
|
||||||
|
reduce.localizeConfiguration(localConf);
|
||||||
|
reduce.setConf(localConf);
|
||||||
|
try {
|
||||||
|
reduce_tasks.getAndIncrement();
|
||||||
|
myMetrics.launchReduce(reduce.getTaskID());
|
||||||
|
reduce.run(localConf, Job.this);
|
||||||
|
myMetrics.completeReduce(reduce.getTaskID());
|
||||||
|
} finally {
|
||||||
|
reduce_tasks.getAndDecrement();
|
||||||
|
}
|
||||||
|
|
||||||
|
LOG.info("Finishing task: " + reduceId);
|
||||||
|
} else {
|
||||||
|
throw new InterruptedException();
|
||||||
|
}
|
||||||
|
} catch (Throwable t) {
|
||||||
|
// store this to be rethrown in the initial thread context.
|
||||||
|
this.storedException = t;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create Runnables to encapsulate reduce tasks for use by the executor
|
||||||
|
* service.
|
||||||
|
* @param jobId the job id
|
||||||
|
* @param mapOutputFiles a mapping from task attempts to output files
|
||||||
|
* @return a List of Runnables, one per reduce task.
|
||||||
|
*/
|
||||||
|
protected List<RunnableWithThrowable> getReduceTaskRunnables(
|
||||||
|
JobID jobId, Map<TaskAttemptID, MapOutputFile> mapOutputFiles) {
|
||||||
|
|
||||||
|
int taskId = 0;
|
||||||
|
ArrayList<RunnableWithThrowable> list =
|
||||||
|
new ArrayList<RunnableWithThrowable>();
|
||||||
|
for (int i = 0; i < this.numReduceTasks; i++) {
|
||||||
|
list.add(new ReduceTaskRunnable(taskId++, jobId, mapOutputFiles));
|
||||||
|
}
|
||||||
|
|
||||||
|
return list;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Initialize the counters that will hold partial-progress from
|
* Initialize the counters that will hold partial-progress from
|
||||||
* the various task attempts.
|
* the various task attempts.
|
||||||
* @param numMaps the number of map tasks in this job.
|
* @param numMaps the number of map tasks in this job.
|
||||||
*/
|
*/
|
||||||
private synchronized void initCounters(int numMaps) {
|
private synchronized void initCounters(int numMaps, int numReduces) {
|
||||||
// Initialize state trackers for all map tasks.
|
// Initialize state trackers for all map tasks.
|
||||||
this.partialMapProgress = new float[numMaps];
|
this.partialMapProgress = new float[numMaps];
|
||||||
this.mapCounters = new Counters[numMaps];
|
this.mapCounters = new Counters[numMaps];
|
||||||
|
@ -280,16 +366,22 @@ private synchronized void initCounters(int numMaps) {
|
||||||
this.mapCounters[i] = new Counters();
|
this.mapCounters[i] = new Counters();
|
||||||
}
|
}
|
||||||
|
|
||||||
this.reduceCounters = new Counters();
|
this.partialReduceProgress = new float[numReduces];
|
||||||
|
this.reduceCounters = new Counters[numReduces];
|
||||||
|
for (int i = 0; i < numReduces; i++) {
|
||||||
|
this.reduceCounters[i] = new Counters();
|
||||||
|
}
|
||||||
|
|
||||||
|
this.numMapTasks = numMaps;
|
||||||
|
this.numReduceTasks = numReduces;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates the executor service used to run map tasks.
|
* Creates the executor service used to run map tasks.
|
||||||
*
|
*
|
||||||
* @param numMapTasks the total number of map tasks to be run
|
|
||||||
* @return an ExecutorService instance that handles map tasks
|
* @return an ExecutorService instance that handles map tasks
|
||||||
*/
|
*/
|
||||||
protected ExecutorService createMapExecutor(int numMapTasks) {
|
protected synchronized ExecutorService createMapExecutor() {
|
||||||
|
|
||||||
// Determine the size of the thread pool to use
|
// Determine the size of the thread pool to use
|
||||||
int maxMapThreads = job.getInt(LOCAL_MAX_MAPS, 1);
|
int maxMapThreads = job.getInt(LOCAL_MAX_MAPS, 1);
|
||||||
|
@ -297,13 +389,10 @@ protected ExecutorService createMapExecutor(int numMapTasks) {
|
||||||
throw new IllegalArgumentException(
|
throw new IllegalArgumentException(
|
||||||
"Configured " + LOCAL_MAX_MAPS + " must be >= 1");
|
"Configured " + LOCAL_MAX_MAPS + " must be >= 1");
|
||||||
}
|
}
|
||||||
this.numMapTasks = numMapTasks;
|
|
||||||
maxMapThreads = Math.min(maxMapThreads, this.numMapTasks);
|
maxMapThreads = Math.min(maxMapThreads, this.numMapTasks);
|
||||||
maxMapThreads = Math.max(maxMapThreads, 1); // In case of no tasks.
|
maxMapThreads = Math.max(maxMapThreads, 1); // In case of no tasks.
|
||||||
|
|
||||||
initCounters(this.numMapTasks);
|
LOG.debug("Starting mapper thread pool executor.");
|
||||||
|
|
||||||
LOG.debug("Starting thread pool executor.");
|
|
||||||
LOG.debug("Max local threads: " + maxMapThreads);
|
LOG.debug("Max local threads: " + maxMapThreads);
|
||||||
LOG.debug("Map tasks to process: " + this.numMapTasks);
|
LOG.debug("Map tasks to process: " + this.numMapTasks);
|
||||||
|
|
||||||
|
@ -316,6 +405,65 @@ protected ExecutorService createMapExecutor(int numMapTasks) {
|
||||||
return executor;
|
return executor;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates the executor service used to run reduce tasks.
|
||||||
|
*
|
||||||
|
* @return an ExecutorService instance that handles reduce tasks
|
||||||
|
*/
|
||||||
|
protected synchronized ExecutorService createReduceExecutor() {
|
||||||
|
|
||||||
|
// Determine the size of the thread pool to use
|
||||||
|
int maxReduceThreads = job.getInt(LOCAL_MAX_REDUCES, 1);
|
||||||
|
if (maxReduceThreads < 1) {
|
||||||
|
throw new IllegalArgumentException(
|
||||||
|
"Configured " + LOCAL_MAX_REDUCES + " must be >= 1");
|
||||||
|
}
|
||||||
|
maxReduceThreads = Math.min(maxReduceThreads, this.numReduceTasks);
|
||||||
|
maxReduceThreads = Math.max(maxReduceThreads, 1); // In case of no tasks.
|
||||||
|
|
||||||
|
LOG.debug("Starting reduce thread pool executor.");
|
||||||
|
LOG.debug("Max local threads: " + maxReduceThreads);
|
||||||
|
LOG.debug("Reduce tasks to process: " + this.numReduceTasks);
|
||||||
|
|
||||||
|
// Create a new executor service to drain the work queue.
|
||||||
|
ExecutorService executor = Executors.newFixedThreadPool(maxReduceThreads);
|
||||||
|
|
||||||
|
return executor;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Run a set of tasks and waits for them to complete. */
|
||||||
|
private void runTasks(List<RunnableWithThrowable> runnables,
|
||||||
|
ExecutorService service, String taskType) throws Exception {
|
||||||
|
// Start populating the executor with work units.
|
||||||
|
// They may begin running immediately (in other threads).
|
||||||
|
for (Runnable r : runnables) {
|
||||||
|
service.submit(r);
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
service.shutdown(); // Instructs queue to drain.
|
||||||
|
|
||||||
|
// Wait for tasks to finish; do not use a time-based timeout.
|
||||||
|
// (See http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6179024)
|
||||||
|
LOG.info("Waiting for " + taskType + " tasks");
|
||||||
|
service.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
|
||||||
|
} catch (InterruptedException ie) {
|
||||||
|
// Cancel all threads.
|
||||||
|
service.shutdownNow();
|
||||||
|
throw ie;
|
||||||
|
}
|
||||||
|
|
||||||
|
LOG.info(taskType + " task executor complete.");
|
||||||
|
|
||||||
|
// After waiting for the tasks to complete, if any of these
|
||||||
|
// have thrown an exception, rethrow it now in the main thread context.
|
||||||
|
for (RunnableWithThrowable r : runnables) {
|
||||||
|
if (r.storedException != null) {
|
||||||
|
throw new Exception(r.storedException);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private org.apache.hadoop.mapreduce.OutputCommitter
|
private org.apache.hadoop.mapreduce.OutputCommitter
|
||||||
createOutputCommitter(boolean newApiCommitter, JobID jobId, Configuration conf) throws Exception {
|
createOutputCommitter(boolean newApiCommitter, JobID jobId, Configuration conf) throws Exception {
|
||||||
org.apache.hadoop.mapreduce.OutputCommitter committer = null;
|
org.apache.hadoop.mapreduce.OutputCommitter committer = null;
|
||||||
|
@ -360,94 +508,25 @@ public void run() {
|
||||||
SplitMetaInfoReader.readSplitMetaInfo(jobId, localFs, conf, systemJobDir);
|
SplitMetaInfoReader.readSplitMetaInfo(jobId, localFs, conf, systemJobDir);
|
||||||
|
|
||||||
int numReduceTasks = job.getNumReduceTasks();
|
int numReduceTasks = job.getNumReduceTasks();
|
||||||
if (numReduceTasks > 1 || numReduceTasks < 0) {
|
|
||||||
// we only allow 0 or 1 reducer in local mode
|
|
||||||
numReduceTasks = 1;
|
|
||||||
job.setNumReduceTasks(1);
|
|
||||||
}
|
|
||||||
outputCommitter.setupJob(jContext);
|
outputCommitter.setupJob(jContext);
|
||||||
status.setSetupProgress(1.0f);
|
status.setSetupProgress(1.0f);
|
||||||
|
|
||||||
Map<TaskAttemptID, MapOutputFile> mapOutputFiles =
|
Map<TaskAttemptID, MapOutputFile> mapOutputFiles =
|
||||||
Collections.synchronizedMap(new HashMap<TaskAttemptID, MapOutputFile>());
|
Collections.synchronizedMap(new HashMap<TaskAttemptID, MapOutputFile>());
|
||||||
|
|
||||||
List<MapTaskRunnable> taskRunnables = getMapTaskRunnables(taskSplitMetaInfos,
|
List<RunnableWithThrowable> mapRunnables = getMapTaskRunnables(
|
||||||
jobId, mapOutputFiles);
|
taskSplitMetaInfos, jobId, mapOutputFiles);
|
||||||
ExecutorService mapService = createMapExecutor(taskRunnables.size());
|
|
||||||
|
|
||||||
// Start populating the executor with work units.
|
initCounters(mapRunnables.size(), numReduceTasks);
|
||||||
// They may begin running immediately (in other threads).
|
ExecutorService mapService = createMapExecutor();
|
||||||
for (Runnable r : taskRunnables) {
|
runTasks(mapRunnables, mapService, "map");
|
||||||
mapService.submit(r);
|
|
||||||
}
|
|
||||||
|
|
||||||
try {
|
|
||||||
mapService.shutdown(); // Instructs queue to drain.
|
|
||||||
|
|
||||||
// Wait for tasks to finish; do not use a time-based timeout.
|
|
||||||
// (See http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6179024)
|
|
||||||
LOG.info("Waiting for map tasks");
|
|
||||||
mapService.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
|
|
||||||
} catch (InterruptedException ie) {
|
|
||||||
// Cancel all threads.
|
|
||||||
mapService.shutdownNow();
|
|
||||||
throw ie;
|
|
||||||
}
|
|
||||||
|
|
||||||
LOG.info("Map task executor complete.");
|
|
||||||
|
|
||||||
// After waiting for the map tasks to complete, if any of these
|
|
||||||
// have thrown an exception, rethrow it now in the main thread context.
|
|
||||||
for (MapTaskRunnable r : taskRunnables) {
|
|
||||||
if (r.storedException != null) {
|
|
||||||
throw new Exception(r.storedException);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
TaskAttemptID reduceId =
|
|
||||||
new TaskAttemptID(new TaskID(jobId, TaskType.REDUCE, 0), 0);
|
|
||||||
try {
|
try {
|
||||||
if (numReduceTasks > 0) {
|
if (numReduceTasks > 0) {
|
||||||
ReduceTask reduce = new ReduceTask(systemJobFile.toString(),
|
List<RunnableWithThrowable> reduceRunnables = getReduceTaskRunnables(
|
||||||
reduceId, 0, mapIds.size(), 1);
|
jobId, mapOutputFiles);
|
||||||
reduce.setUser(UserGroupInformation.getCurrentUser().
|
ExecutorService reduceService = createReduceExecutor();
|
||||||
getShortUserName());
|
runTasks(reduceRunnables, reduceService, "reduce");
|
||||||
JobConf localConf = new JobConf(job);
|
|
||||||
localConf.set("mapreduce.jobtracker.address", "local");
|
|
||||||
setupChildMapredLocalDirs(localJobDir, reduce, localConf);
|
|
||||||
// move map output to reduce input
|
|
||||||
for (int i = 0; i < mapIds.size(); i++) {
|
|
||||||
if (!this.isInterrupted()) {
|
|
||||||
TaskAttemptID mapId = mapIds.get(i);
|
|
||||||
Path mapOut = mapOutputFiles.get(mapId).getOutputFile();
|
|
||||||
MapOutputFile localOutputFile = new MROutputFiles();
|
|
||||||
localOutputFile.setConf(localConf);
|
|
||||||
Path reduceIn =
|
|
||||||
localOutputFile.getInputFileForWrite(mapId.getTaskID(),
|
|
||||||
localFs.getFileStatus(mapOut).getLen());
|
|
||||||
if (!localFs.mkdirs(reduceIn.getParent())) {
|
|
||||||
throw new IOException("Mkdirs failed to create "
|
|
||||||
+ reduceIn.getParent().toString());
|
|
||||||
}
|
|
||||||
if (!localFs.rename(mapOut, reduceIn))
|
|
||||||
throw new IOException("Couldn't rename " + mapOut);
|
|
||||||
} else {
|
|
||||||
throw new InterruptedException();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (!this.isInterrupted()) {
|
|
||||||
reduce.setJobFile(localJobFile.toString());
|
|
||||||
localConf.setUser(reduce.getUser());
|
|
||||||
reduce.localizeConfiguration(localConf);
|
|
||||||
reduce.setConf(localConf);
|
|
||||||
reduce_tasks += 1;
|
|
||||||
myMetrics.launchReduce(reduce.getTaskID());
|
|
||||||
reduce.run(localConf, this);
|
|
||||||
myMetrics.completeReduce(reduce.getTaskID());
|
|
||||||
reduce_tasks -= 1;
|
|
||||||
} else {
|
|
||||||
throw new InterruptedException();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
for (MapOutputFile output : mapOutputFiles.values()) {
|
for (MapOutputFile output : mapOutputFiles.values()) {
|
||||||
|
@ -465,7 +544,6 @@ public void run() {
|
||||||
}
|
}
|
||||||
|
|
||||||
JobEndNotifier.localRunnerNotification(job, status);
|
JobEndNotifier.localRunnerNotification(job, status);
|
||||||
|
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
try {
|
try {
|
||||||
outputCommitter.abortJob(jContext,
|
outputCommitter.abortJob(jContext,
|
||||||
|
@ -511,12 +589,13 @@ public synchronized boolean statusUpdate(TaskAttemptID taskId,
|
||||||
new ByteArrayInputStream(baos.toByteArray())));
|
new ByteArrayInputStream(baos.toByteArray())));
|
||||||
|
|
||||||
LOG.info(taskStatus.getStateString());
|
LOG.info(taskStatus.getStateString());
|
||||||
int taskIndex = mapIds.indexOf(taskId);
|
int mapTaskIndex = mapIds.indexOf(taskId);
|
||||||
if (taskIndex >= 0) { // mapping
|
if (mapTaskIndex >= 0) {
|
||||||
|
// mapping
|
||||||
float numTasks = (float) this.numMapTasks;
|
float numTasks = (float) this.numMapTasks;
|
||||||
|
|
||||||
partialMapProgress[taskIndex] = taskStatus.getProgress();
|
partialMapProgress[mapTaskIndex] = taskStatus.getProgress();
|
||||||
mapCounters[taskIndex] = taskStatus.getCounters();
|
mapCounters[mapTaskIndex] = taskStatus.getCounters();
|
||||||
|
|
||||||
float partialProgress = 0.0f;
|
float partialProgress = 0.0f;
|
||||||
for (float f : partialMapProgress) {
|
for (float f : partialMapProgress) {
|
||||||
|
@ -524,8 +603,18 @@ public synchronized boolean statusUpdate(TaskAttemptID taskId,
|
||||||
}
|
}
|
||||||
status.setMapProgress(partialProgress / numTasks);
|
status.setMapProgress(partialProgress / numTasks);
|
||||||
} else {
|
} else {
|
||||||
reduceCounters = taskStatus.getCounters();
|
// reducing
|
||||||
status.setReduceProgress(taskStatus.getProgress());
|
int reduceTaskIndex = taskId.getTaskID().getId();
|
||||||
|
float numTasks = (float) this.numReduceTasks;
|
||||||
|
|
||||||
|
partialReduceProgress[reduceTaskIndex] = taskStatus.getProgress();
|
||||||
|
reduceCounters[reduceTaskIndex] = taskStatus.getCounters();
|
||||||
|
|
||||||
|
float partialProgress = 0.0f;
|
||||||
|
for (float f : partialReduceProgress) {
|
||||||
|
partialProgress += f;
|
||||||
|
}
|
||||||
|
status.setReduceProgress(partialProgress / numTasks);
|
||||||
}
|
}
|
||||||
|
|
||||||
// ignore phase
|
// ignore phase
|
||||||
|
@ -545,7 +634,13 @@ public synchronized Counters getCurrentCounters() {
|
||||||
for (Counters c : mapCounters) {
|
for (Counters c : mapCounters) {
|
||||||
current = Counters.sum(current, c);
|
current = Counters.sum(current, c);
|
||||||
}
|
}
|
||||||
current = Counters.sum(current, reduceCounters);
|
|
||||||
|
if (null != reduceCounters && reduceCounters.length > 0) {
|
||||||
|
for (Counters c : reduceCounters) {
|
||||||
|
current = Counters.sum(current, c);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return current;
|
return current;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -684,8 +779,9 @@ public String getFilesystemName() throws IOException {
|
||||||
|
|
||||||
public ClusterMetrics getClusterMetrics() {
|
public ClusterMetrics getClusterMetrics() {
|
||||||
int numMapTasks = map_tasks.get();
|
int numMapTasks = map_tasks.get();
|
||||||
return new ClusterMetrics(numMapTasks, reduce_tasks, numMapTasks,
|
int numReduceTasks = reduce_tasks.get();
|
||||||
reduce_tasks, 0, 0, 1, 1, jobs.size(), 1, 0, 0);
|
return new ClusterMetrics(numMapTasks, numReduceTasks, numMapTasks,
|
||||||
|
numReduceTasks, 0, 0, 1, 1, jobs.size(), 1, 0, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
public JobTrackerStatus getJobTrackerStatus() {
|
public JobTrackerStatus getJobTrackerStatus() {
|
||||||
|
@ -816,6 +912,27 @@ public static int getLocalMaxRunningMaps(
|
||||||
return job.getConfiguration().getInt(LOCAL_MAX_MAPS, 1);
|
return job.getConfiguration().getInt(LOCAL_MAX_MAPS, 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set the max number of reduce tasks to run concurrently in the LocalJobRunner.
|
||||||
|
* @param job the job to configure
|
||||||
|
* @param maxReduces the maximum number of reduce tasks to allow.
|
||||||
|
*/
|
||||||
|
public static void setLocalMaxRunningReduces(
|
||||||
|
org.apache.hadoop.mapreduce.JobContext job,
|
||||||
|
int maxReduces) {
|
||||||
|
job.getConfiguration().setInt(LOCAL_MAX_REDUCES, maxReduces);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return the max number of reduce tasks to run concurrently in the
|
||||||
|
* LocalJobRunner.
|
||||||
|
*/
|
||||||
|
public static int getLocalMaxRunningReduces(
|
||||||
|
org.apache.hadoop.mapreduce.JobContext job) {
|
||||||
|
return job.getConfiguration().getInt(LOCAL_MAX_REDUCES, 1);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void cancelDelegationToken(Token<DelegationTokenIdentifier> token
|
public void cancelDelegationToken(Token<DelegationTokenIdentifier> token
|
||||||
) throws IOException,
|
) throws IOException,
|
||||||
|
|
|
@ -60,6 +60,7 @@
|
||||||
import org.apache.hadoop.mapreduce.MRJobConfig;
|
import org.apache.hadoop.mapreduce.MRJobConfig;
|
||||||
import org.apache.hadoop.mapreduce.TaskAttemptContext;
|
import org.apache.hadoop.mapreduce.TaskAttemptContext;
|
||||||
import org.apache.hadoop.mapreduce.TaskCounter;
|
import org.apache.hadoop.mapreduce.TaskCounter;
|
||||||
|
import org.apache.hadoop.mapreduce.TaskType;
|
||||||
import org.apache.hadoop.mapreduce.lib.input.FileInputFormatCounter;
|
import org.apache.hadoop.mapreduce.lib.input.FileInputFormatCounter;
|
||||||
import org.apache.hadoop.mapreduce.lib.map.WrappedMapper;
|
import org.apache.hadoop.mapreduce.lib.map.WrappedMapper;
|
||||||
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormatCounter;
|
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormatCounter;
|
||||||
|
@ -1860,7 +1861,6 @@ private void mergeParts() throws IOException, InterruptedException,
|
||||||
}
|
}
|
||||||
{
|
{
|
||||||
sortPhase.addPhases(partitions); // Divide sort phase into sub-phases
|
sortPhase.addPhases(partitions); // Divide sort phase into sub-phases
|
||||||
Merger.considerFinalMergeForProgress();
|
|
||||||
|
|
||||||
IndexRecord rec = new IndexRecord();
|
IndexRecord rec = new IndexRecord();
|
||||||
final SpillRecord spillRec = new SpillRecord(partitions);
|
final SpillRecord spillRec = new SpillRecord(partitions);
|
||||||
|
@ -1893,7 +1893,8 @@ private void mergeParts() throws IOException, InterruptedException,
|
||||||
segmentList, mergeFactor,
|
segmentList, mergeFactor,
|
||||||
new Path(mapId.toString()),
|
new Path(mapId.toString()),
|
||||||
job.getOutputKeyComparator(), reporter, sortSegments,
|
job.getOutputKeyComparator(), reporter, sortSegments,
|
||||||
null, spilledRecordsCounter, sortPhase.phase());
|
null, spilledRecordsCounter, sortPhase.phase(),
|
||||||
|
TaskType.MAP);
|
||||||
|
|
||||||
//write merged output to disk
|
//write merged output to disk
|
||||||
long segmentStart = finalOut.getPos();
|
long segmentStart = finalOut.getPos();
|
||||||
|
|
|
@ -39,6 +39,7 @@
|
||||||
import org.apache.hadoop.mapred.IFile.Reader;
|
import org.apache.hadoop.mapred.IFile.Reader;
|
||||||
import org.apache.hadoop.mapred.IFile.Writer;
|
import org.apache.hadoop.mapred.IFile.Writer;
|
||||||
import org.apache.hadoop.mapreduce.MRConfig;
|
import org.apache.hadoop.mapreduce.MRConfig;
|
||||||
|
import org.apache.hadoop.mapreduce.TaskType;
|
||||||
import org.apache.hadoop.util.PriorityQueue;
|
import org.apache.hadoop.util.PriorityQueue;
|
||||||
import org.apache.hadoop.util.Progress;
|
import org.apache.hadoop.util.Progress;
|
||||||
import org.apache.hadoop.util.Progressable;
|
import org.apache.hadoop.util.Progressable;
|
||||||
|
@ -69,7 +70,8 @@ RawKeyValueIterator merge(Configuration conf, FileSystem fs,
|
||||||
throws IOException {
|
throws IOException {
|
||||||
return
|
return
|
||||||
new MergeQueue<K, V>(conf, fs, inputs, deleteInputs, codec, comparator,
|
new MergeQueue<K, V>(conf, fs, inputs, deleteInputs, codec, comparator,
|
||||||
reporter, null).merge(keyClass, valueClass,
|
reporter, null,
|
||||||
|
TaskType.REDUCE).merge(keyClass, valueClass,
|
||||||
mergeFactor, tmpDir,
|
mergeFactor, tmpDir,
|
||||||
readsCounter, writesCounter,
|
readsCounter, writesCounter,
|
||||||
mergePhase);
|
mergePhase);
|
||||||
|
@ -90,7 +92,8 @@ RawKeyValueIterator merge(Configuration conf, FileSystem fs,
|
||||||
throws IOException {
|
throws IOException {
|
||||||
return
|
return
|
||||||
new MergeQueue<K, V>(conf, fs, inputs, deleteInputs, codec, comparator,
|
new MergeQueue<K, V>(conf, fs, inputs, deleteInputs, codec, comparator,
|
||||||
reporter, mergedMapOutputsCounter).merge(
|
reporter, mergedMapOutputsCounter,
|
||||||
|
TaskType.REDUCE).merge(
|
||||||
keyClass, valueClass,
|
keyClass, valueClass,
|
||||||
mergeFactor, tmpDir,
|
mergeFactor, tmpDir,
|
||||||
readsCounter, writesCounter,
|
readsCounter, writesCounter,
|
||||||
|
@ -124,7 +127,8 @@ RawKeyValueIterator merge(Configuration conf, FileSystem fs,
|
||||||
Progress mergePhase)
|
Progress mergePhase)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
return new MergeQueue<K, V>(conf, fs, segments, comparator, reporter,
|
return new MergeQueue<K, V>(conf, fs, segments, comparator, reporter,
|
||||||
sortSegments).merge(keyClass, valueClass,
|
sortSegments,
|
||||||
|
TaskType.REDUCE).merge(keyClass, valueClass,
|
||||||
mergeFactor, tmpDir,
|
mergeFactor, tmpDir,
|
||||||
readsCounter, writesCounter,
|
readsCounter, writesCounter,
|
||||||
mergePhase);
|
mergePhase);
|
||||||
|
@ -140,10 +144,12 @@ RawKeyValueIterator merge(Configuration conf, FileSystem fs,
|
||||||
boolean sortSegments,
|
boolean sortSegments,
|
||||||
Counters.Counter readsCounter,
|
Counters.Counter readsCounter,
|
||||||
Counters.Counter writesCounter,
|
Counters.Counter writesCounter,
|
||||||
Progress mergePhase)
|
Progress mergePhase,
|
||||||
|
TaskType taskType)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
return new MergeQueue<K, V>(conf, fs, segments, comparator, reporter,
|
return new MergeQueue<K, V>(conf, fs, segments, comparator, reporter,
|
||||||
sortSegments, codec).merge(keyClass, valueClass,
|
sortSegments, codec,
|
||||||
|
taskType).merge(keyClass, valueClass,
|
||||||
mergeFactor, tmpDir,
|
mergeFactor, tmpDir,
|
||||||
readsCounter, writesCounter,
|
readsCounter, writesCounter,
|
||||||
mergePhase);
|
mergePhase);
|
||||||
|
@ -161,7 +167,8 @@ RawKeyValueIterator merge(Configuration conf, FileSystem fs,
|
||||||
Progress mergePhase)
|
Progress mergePhase)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
return new MergeQueue<K, V>(conf, fs, segments, comparator, reporter,
|
return new MergeQueue<K, V>(conf, fs, segments, comparator, reporter,
|
||||||
sortSegments).merge(keyClass, valueClass,
|
sortSegments,
|
||||||
|
TaskType.REDUCE).merge(keyClass, valueClass,
|
||||||
mergeFactor, inMemSegments,
|
mergeFactor, inMemSegments,
|
||||||
tmpDir,
|
tmpDir,
|
||||||
readsCounter, writesCounter,
|
readsCounter, writesCounter,
|
||||||
|
@ -182,7 +189,8 @@ RawKeyValueIterator merge(Configuration conf, FileSystem fs,
|
||||||
Progress mergePhase)
|
Progress mergePhase)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
return new MergeQueue<K, V>(conf, fs, segments, comparator, reporter,
|
return new MergeQueue<K, V>(conf, fs, segments, comparator, reporter,
|
||||||
sortSegments, codec).merge(keyClass, valueClass,
|
sortSegments, codec,
|
||||||
|
TaskType.REDUCE).merge(keyClass, valueClass,
|
||||||
mergeFactor, inMemSegments,
|
mergeFactor, inMemSegments,
|
||||||
tmpDir,
|
tmpDir,
|
||||||
readsCounter, writesCounter,
|
readsCounter, writesCounter,
|
||||||
|
@ -367,19 +375,6 @@ void reinitReader(int offset) throws IOException {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Boolean variable for including/considering final merge as part of sort
|
|
||||||
// phase or not. This is true in map task, false in reduce task. It is
|
|
||||||
// used in calculating mergeProgress.
|
|
||||||
static boolean includeFinalMerge = false;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Sets the boolean variable includeFinalMerge to true. Called from
|
|
||||||
* map task before calling merge() so that final merge of map task
|
|
||||||
* is also considered as part of sort phase.
|
|
||||||
*/
|
|
||||||
static void considerFinalMergeForProgress() {
|
|
||||||
includeFinalMerge = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
private static class MergeQueue<K extends Object, V extends Object>
|
private static class MergeQueue<K extends Object, V extends Object>
|
||||||
extends PriorityQueue<Segment<K, V>> implements RawKeyValueIterator {
|
extends PriorityQueue<Segment<K, V>> implements RawKeyValueIterator {
|
||||||
|
@ -401,6 +396,21 @@ private static class MergeQueue<K extends Object, V extends Object>
|
||||||
final DataInputBuffer value = new DataInputBuffer();
|
final DataInputBuffer value = new DataInputBuffer();
|
||||||
final DataInputBuffer diskIFileValue = new DataInputBuffer();
|
final DataInputBuffer diskIFileValue = new DataInputBuffer();
|
||||||
|
|
||||||
|
|
||||||
|
// Boolean variable for including/considering final merge as part of sort
|
||||||
|
// phase or not. This is true in map task, false in reduce task. It is
|
||||||
|
// used in calculating mergeProgress.
|
||||||
|
private boolean includeFinalMerge = false;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sets the boolean variable includeFinalMerge to true. Called from
|
||||||
|
* map task before calling merge() so that final merge of map task
|
||||||
|
* is also considered as part of sort phase.
|
||||||
|
*/
|
||||||
|
private void considerFinalMergeForProgress() {
|
||||||
|
includeFinalMerge = true;
|
||||||
|
}
|
||||||
|
|
||||||
Segment<K, V> minSegment;
|
Segment<K, V> minSegment;
|
||||||
Comparator<Segment<K, V>> segmentComparator =
|
Comparator<Segment<K, V>> segmentComparator =
|
||||||
new Comparator<Segment<K, V>>() {
|
new Comparator<Segment<K, V>>() {
|
||||||
|
@ -419,14 +429,16 @@ public MergeQueue(Configuration conf, FileSystem fs,
|
||||||
CompressionCodec codec, RawComparator<K> comparator,
|
CompressionCodec codec, RawComparator<K> comparator,
|
||||||
Progressable reporter)
|
Progressable reporter)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
this(conf, fs, inputs, deleteInputs, codec, comparator, reporter, null);
|
this(conf, fs, inputs, deleteInputs, codec, comparator, reporter, null,
|
||||||
|
TaskType.REDUCE);
|
||||||
}
|
}
|
||||||
|
|
||||||
public MergeQueue(Configuration conf, FileSystem fs,
|
public MergeQueue(Configuration conf, FileSystem fs,
|
||||||
Path[] inputs, boolean deleteInputs,
|
Path[] inputs, boolean deleteInputs,
|
||||||
CompressionCodec codec, RawComparator<K> comparator,
|
CompressionCodec codec, RawComparator<K> comparator,
|
||||||
Progressable reporter,
|
Progressable reporter,
|
||||||
Counters.Counter mergedMapOutputsCounter)
|
Counters.Counter mergedMapOutputsCounter,
|
||||||
|
TaskType taskType)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
this.conf = conf;
|
this.conf = conf;
|
||||||
this.fs = fs;
|
this.fs = fs;
|
||||||
|
@ -434,6 +446,10 @@ public MergeQueue(Configuration conf, FileSystem fs,
|
||||||
this.comparator = comparator;
|
this.comparator = comparator;
|
||||||
this.reporter = reporter;
|
this.reporter = reporter;
|
||||||
|
|
||||||
|
if (taskType == TaskType.MAP) {
|
||||||
|
considerFinalMergeForProgress();
|
||||||
|
}
|
||||||
|
|
||||||
for (Path file : inputs) {
|
for (Path file : inputs) {
|
||||||
LOG.debug("MergeQ: adding: " + file);
|
LOG.debug("MergeQ: adding: " + file);
|
||||||
segments.add(new Segment<K, V>(conf, fs, file, codec, !deleteInputs,
|
segments.add(new Segment<K, V>(conf, fs, file, codec, !deleteInputs,
|
||||||
|
@ -449,17 +465,20 @@ public MergeQueue(Configuration conf, FileSystem fs,
|
||||||
public MergeQueue(Configuration conf, FileSystem fs,
|
public MergeQueue(Configuration conf, FileSystem fs,
|
||||||
List<Segment<K, V>> segments, RawComparator<K> comparator,
|
List<Segment<K, V>> segments, RawComparator<K> comparator,
|
||||||
Progressable reporter) {
|
Progressable reporter) {
|
||||||
this(conf, fs, segments, comparator, reporter, false);
|
this(conf, fs, segments, comparator, reporter, false, TaskType.REDUCE);
|
||||||
}
|
}
|
||||||
|
|
||||||
public MergeQueue(Configuration conf, FileSystem fs,
|
public MergeQueue(Configuration conf, FileSystem fs,
|
||||||
List<Segment<K, V>> segments, RawComparator<K> comparator,
|
List<Segment<K, V>> segments, RawComparator<K> comparator,
|
||||||
Progressable reporter, boolean sortSegments) {
|
Progressable reporter, boolean sortSegments, TaskType taskType) {
|
||||||
this.conf = conf;
|
this.conf = conf;
|
||||||
this.fs = fs;
|
this.fs = fs;
|
||||||
this.comparator = comparator;
|
this.comparator = comparator;
|
||||||
this.segments = segments;
|
this.segments = segments;
|
||||||
this.reporter = reporter;
|
this.reporter = reporter;
|
||||||
|
if (taskType == TaskType.MAP) {
|
||||||
|
considerFinalMergeForProgress();
|
||||||
|
}
|
||||||
if (sortSegments) {
|
if (sortSegments) {
|
||||||
Collections.sort(segments, segmentComparator);
|
Collections.sort(segments, segmentComparator);
|
||||||
}
|
}
|
||||||
|
@ -467,8 +486,10 @@ public MergeQueue(Configuration conf, FileSystem fs,
|
||||||
|
|
||||||
public MergeQueue(Configuration conf, FileSystem fs,
|
public MergeQueue(Configuration conf, FileSystem fs,
|
||||||
List<Segment<K, V>> segments, RawComparator<K> comparator,
|
List<Segment<K, V>> segments, RawComparator<K> comparator,
|
||||||
Progressable reporter, boolean sortSegments, CompressionCodec codec) {
|
Progressable reporter, boolean sortSegments, CompressionCodec codec,
|
||||||
this(conf, fs, segments, comparator, reporter, sortSegments);
|
TaskType taskType) {
|
||||||
|
this(conf, fs, segments, comparator, reporter, sortSegments,
|
||||||
|
taskType);
|
||||||
this.codec = codec;
|
this.codec = codec;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -24,6 +24,7 @@
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Comparator;
|
import java.util.Comparator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
import java.util.SortedSet;
|
import java.util.SortedSet;
|
||||||
import java.util.TreeSet;
|
import java.util.TreeSet;
|
||||||
|
|
||||||
|
@ -74,6 +75,10 @@ public class ReduceTask extends Task {
|
||||||
|
|
||||||
private CompressionCodec codec;
|
private CompressionCodec codec;
|
||||||
|
|
||||||
|
// If this is a LocalJobRunner-based job, this will
|
||||||
|
// be a mapping from map task attempts to their output files.
|
||||||
|
// This will be null in other cases.
|
||||||
|
private Map<TaskAttemptID, MapOutputFile> localMapFiles;
|
||||||
|
|
||||||
{
|
{
|
||||||
getProgress().setStatus("reduce");
|
getProgress().setStatus("reduce");
|
||||||
|
@ -105,23 +110,23 @@ public class ReduceTask extends Task {
|
||||||
// file paths, the first parameter is considered smaller than the second one.
|
// file paths, the first parameter is considered smaller than the second one.
|
||||||
// Files with the same size and the same path are considered equal.
|
// Files with the same size and the same path are considered equal.
|
||||||
private Comparator<FileStatus> mapOutputFileComparator =
|
private Comparator<FileStatus> mapOutputFileComparator =
|
||||||
new Comparator<FileStatus>() {
|
new Comparator<FileStatus>() {
|
||||||
public int compare(FileStatus a, FileStatus b) {
|
public int compare(FileStatus a, FileStatus b) {
|
||||||
if (a.getLen() < b.getLen())
|
if (a.getLen() < b.getLen())
|
||||||
return -1;
|
return -1;
|
||||||
else if (a.getLen() == b.getLen())
|
else if (a.getLen() == b.getLen())
|
||||||
if (a.getPath().toString().equals(b.getPath().toString()))
|
if (a.getPath().toString().equals(b.getPath().toString()))
|
||||||
return 0;
|
return 0;
|
||||||
else
|
|
||||||
return -1;
|
|
||||||
else
|
else
|
||||||
return 1;
|
return -1;
|
||||||
}
|
else
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
// A sorted set for keeping a set of map output files on disk
|
// A sorted set for keeping a set of map output files on disk
|
||||||
private final SortedSet<FileStatus> mapOutputFilesOnDisk =
|
private final SortedSet<FileStatus> mapOutputFilesOnDisk =
|
||||||
new TreeSet<FileStatus>(mapOutputFileComparator);
|
new TreeSet<FileStatus>(mapOutputFileComparator);
|
||||||
|
|
||||||
public ReduceTask() {
|
public ReduceTask() {
|
||||||
super();
|
super();
|
||||||
|
@ -133,6 +138,17 @@ public ReduceTask(String jobFile, TaskAttemptID taskId,
|
||||||
this.numMaps = numMaps;
|
this.numMaps = numMaps;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Register the set of mapper outputs created by a LocalJobRunner-based
|
||||||
|
* job with this ReduceTask so it knows where to fetch from.
|
||||||
|
*
|
||||||
|
* This should not be called in normal (networked) execution.
|
||||||
|
*/
|
||||||
|
public void setLocalMapFiles(Map<TaskAttemptID, MapOutputFile> mapFiles) {
|
||||||
|
this.localMapFiles = mapFiles;
|
||||||
|
}
|
||||||
|
|
||||||
private CompressionCodec initCodec() {
|
private CompressionCodec initCodec() {
|
||||||
// check if map-outputs are to be compressed
|
// check if map-outputs are to be compressed
|
||||||
if (conf.getCompressMapOutput()) {
|
if (conf.getCompressMapOutput()) {
|
||||||
|
@ -174,20 +190,11 @@ public void readFields(DataInput in) throws IOException {
|
||||||
numMaps = in.readInt();
|
numMaps = in.readInt();
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get the input files for the reducer.
|
// Get the input files for the reducer (for local jobs).
|
||||||
private Path[] getMapFiles(FileSystem fs, boolean isLocal)
|
private Path[] getMapFiles(FileSystem fs) throws IOException {
|
||||||
throws IOException {
|
|
||||||
List<Path> fileList = new ArrayList<Path>();
|
List<Path> fileList = new ArrayList<Path>();
|
||||||
if (isLocal) {
|
for(int i = 0; i < numMaps; ++i) {
|
||||||
// for local jobs
|
fileList.add(mapOutputFile.getInputFile(i));
|
||||||
for(int i = 0; i < numMaps; ++i) {
|
|
||||||
fileList.add(mapOutputFile.getInputFile(i));
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
// for non local jobs
|
|
||||||
for (FileStatus filestatus : mapOutputFilesOnDisk) {
|
|
||||||
fileList.add(filestatus.getPath());
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
return fileList.toArray(new Path[0]);
|
return fileList.toArray(new Path[0]);
|
||||||
}
|
}
|
||||||
|
@ -343,54 +350,31 @@ public void run(JobConf job, final TaskUmbilicalProtocol umbilical)
|
||||||
RawKeyValueIterator rIter = null;
|
RawKeyValueIterator rIter = null;
|
||||||
ShuffleConsumerPlugin shuffleConsumerPlugin = null;
|
ShuffleConsumerPlugin shuffleConsumerPlugin = null;
|
||||||
|
|
||||||
boolean isLocal = false;
|
Class combinerClass = conf.getCombinerClass();
|
||||||
// local if
|
CombineOutputCollector combineCollector =
|
||||||
// 1) framework == local or
|
(null != combinerClass) ?
|
||||||
// 2) framework == null and job tracker address == local
|
new CombineOutputCollector(reduceCombineOutputCounter, reporter, conf) : null;
|
||||||
String framework = job.get(MRConfig.FRAMEWORK_NAME);
|
|
||||||
String masterAddr = job.get(MRConfig.MASTER_ADDRESS, "local");
|
|
||||||
if ((framework == null && masterAddr.equals("local"))
|
|
||||||
|| (framework != null && framework.equals(MRConfig.LOCAL_FRAMEWORK_NAME))) {
|
|
||||||
isLocal = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!isLocal) {
|
Class<? extends ShuffleConsumerPlugin> clazz =
|
||||||
Class combinerClass = conf.getCombinerClass();
|
job.getClass(MRConfig.SHUFFLE_CONSUMER_PLUGIN, Shuffle.class, ShuffleConsumerPlugin.class);
|
||||||
CombineOutputCollector combineCollector =
|
|
||||||
(null != combinerClass) ?
|
|
||||||
new CombineOutputCollector(reduceCombineOutputCounter, reporter, conf) : null;
|
|
||||||
|
|
||||||
Class<? extends ShuffleConsumerPlugin> clazz =
|
shuffleConsumerPlugin = ReflectionUtils.newInstance(clazz, job);
|
||||||
job.getClass(MRConfig.SHUFFLE_CONSUMER_PLUGIN, Shuffle.class, ShuffleConsumerPlugin.class);
|
LOG.info("Using ShuffleConsumerPlugin: " + shuffleConsumerPlugin);
|
||||||
|
|
||||||
shuffleConsumerPlugin = ReflectionUtils.newInstance(clazz, job);
|
ShuffleConsumerPlugin.Context shuffleContext =
|
||||||
LOG.info("Using ShuffleConsumerPlugin: " + shuffleConsumerPlugin);
|
new ShuffleConsumerPlugin.Context(getTaskID(), job, FileSystem.getLocal(job), umbilical,
|
||||||
|
super.lDirAlloc, reporter, codec,
|
||||||
|
combinerClass, combineCollector,
|
||||||
|
spilledRecordsCounter, reduceCombineInputCounter,
|
||||||
|
shuffledMapsCounter,
|
||||||
|
reduceShuffleBytes, failedShuffleCounter,
|
||||||
|
mergedMapOutputsCounter,
|
||||||
|
taskStatus, copyPhase, sortPhase, this,
|
||||||
|
mapOutputFile, localMapFiles);
|
||||||
|
shuffleConsumerPlugin.init(shuffleContext);
|
||||||
|
|
||||||
|
rIter = shuffleConsumerPlugin.run();
|
||||||
|
|
||||||
ShuffleConsumerPlugin.Context shuffleContext =
|
|
||||||
new ShuffleConsumerPlugin.Context(getTaskID(), job, FileSystem.getLocal(job), umbilical,
|
|
||||||
super.lDirAlloc, reporter, codec,
|
|
||||||
combinerClass, combineCollector,
|
|
||||||
spilledRecordsCounter, reduceCombineInputCounter,
|
|
||||||
shuffledMapsCounter,
|
|
||||||
reduceShuffleBytes, failedShuffleCounter,
|
|
||||||
mergedMapOutputsCounter,
|
|
||||||
taskStatus, copyPhase, sortPhase, this,
|
|
||||||
mapOutputFile);
|
|
||||||
shuffleConsumerPlugin.init(shuffleContext);
|
|
||||||
rIter = shuffleConsumerPlugin.run();
|
|
||||||
} else {
|
|
||||||
// local job runner doesn't have a copy phase
|
|
||||||
copyPhase.complete();
|
|
||||||
final FileSystem rfs = FileSystem.getLocal(job).getRaw();
|
|
||||||
rIter = Merger.merge(job, rfs, job.getMapOutputKeyClass(),
|
|
||||||
job.getMapOutputValueClass(), codec,
|
|
||||||
getMapFiles(rfs, true),
|
|
||||||
!conf.getKeepFailedTaskFiles(),
|
|
||||||
job.getInt(JobContext.IO_SORT_FACTOR, 100),
|
|
||||||
new Path(getTaskID().toString()),
|
|
||||||
job.getOutputKeyComparator(),
|
|
||||||
reporter, spilledRecordsCounter, null, null);
|
|
||||||
}
|
|
||||||
// free up the data structures
|
// free up the data structures
|
||||||
mapOutputFilesOnDisk.clear();
|
mapOutputFilesOnDisk.clear();
|
||||||
|
|
||||||
|
@ -409,9 +393,7 @@ public void run(JobConf job, final TaskUmbilicalProtocol umbilical)
|
||||||
keyClass, valueClass);
|
keyClass, valueClass);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (shuffleConsumerPlugin != null) {
|
shuffleConsumerPlugin.close();
|
||||||
shuffleConsumerPlugin.close();
|
|
||||||
}
|
|
||||||
done(umbilical, reporter);
|
done(umbilical, reporter);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -19,6 +19,8 @@
|
||||||
package org.apache.hadoop.mapred;
|
package org.apache.hadoop.mapred;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
import org.apache.hadoop.mapred.Task.CombineOutputCollector;
|
import org.apache.hadoop.mapred.Task.CombineOutputCollector;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.LocalDirAllocator;
|
import org.apache.hadoop.fs.LocalDirAllocator;
|
||||||
|
@ -65,6 +67,7 @@ public static class Context<K,V> {
|
||||||
private final Progress mergePhase;
|
private final Progress mergePhase;
|
||||||
private final Task reduceTask;
|
private final Task reduceTask;
|
||||||
private final MapOutputFile mapOutputFile;
|
private final MapOutputFile mapOutputFile;
|
||||||
|
private final Map<TaskAttemptID, MapOutputFile> localMapFiles;
|
||||||
|
|
||||||
public Context(org.apache.hadoop.mapreduce.TaskAttemptID reduceId,
|
public Context(org.apache.hadoop.mapreduce.TaskAttemptID reduceId,
|
||||||
JobConf jobConf, FileSystem localFS,
|
JobConf jobConf, FileSystem localFS,
|
||||||
|
@ -80,7 +83,8 @@ public Context(org.apache.hadoop.mapreduce.TaskAttemptID reduceId,
|
||||||
Counters.Counter failedShuffleCounter,
|
Counters.Counter failedShuffleCounter,
|
||||||
Counters.Counter mergedMapOutputsCounter,
|
Counters.Counter mergedMapOutputsCounter,
|
||||||
TaskStatus status, Progress copyPhase, Progress mergePhase,
|
TaskStatus status, Progress copyPhase, Progress mergePhase,
|
||||||
Task reduceTask, MapOutputFile mapOutputFile) {
|
Task reduceTask, MapOutputFile mapOutputFile,
|
||||||
|
Map<TaskAttemptID, MapOutputFile> localMapFiles) {
|
||||||
this.reduceId = reduceId;
|
this.reduceId = reduceId;
|
||||||
this.jobConf = jobConf;
|
this.jobConf = jobConf;
|
||||||
this.localFS = localFS;
|
this.localFS = localFS;
|
||||||
|
@ -101,6 +105,7 @@ public Context(org.apache.hadoop.mapreduce.TaskAttemptID reduceId,
|
||||||
this.mergePhase = mergePhase;
|
this.mergePhase = mergePhase;
|
||||||
this.reduceTask = reduceTask;
|
this.reduceTask = reduceTask;
|
||||||
this.mapOutputFile = mapOutputFile;
|
this.mapOutputFile = mapOutputFile;
|
||||||
|
this.localMapFiles = localMapFiles;
|
||||||
}
|
}
|
||||||
|
|
||||||
public org.apache.hadoop.mapreduce.TaskAttemptID getReduceId() {
|
public org.apache.hadoop.mapreduce.TaskAttemptID getReduceId() {
|
||||||
|
@ -163,6 +168,9 @@ public Task getReduceTask() {
|
||||||
public MapOutputFile getMapOutputFile() {
|
public MapOutputFile getMapOutputFile() {
|
||||||
return mapOutputFile;
|
return mapOutputFile;
|
||||||
}
|
}
|
||||||
|
public Map<TaskAttemptID, MapOutputFile> getLocalMapFiles() {
|
||||||
|
return localMapFiles;
|
||||||
|
}
|
||||||
} // end of public static class Context<K,V>
|
} // end of public static class Context<K,V>
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -60,7 +60,7 @@ class Fetcher<K,V> extends Thread {
|
||||||
/* Default read timeout (in milliseconds) */
|
/* Default read timeout (in milliseconds) */
|
||||||
private final static int DEFAULT_READ_TIMEOUT = 3 * 60 * 1000;
|
private final static int DEFAULT_READ_TIMEOUT = 3 * 60 * 1000;
|
||||||
|
|
||||||
private final Reporter reporter;
|
protected final Reporter reporter;
|
||||||
private static enum ShuffleErrors{IO_ERROR, WRONG_LENGTH, BAD_ID, WRONG_MAP,
|
private static enum ShuffleErrors{IO_ERROR, WRONG_LENGTH, BAD_ID, WRONG_MAP,
|
||||||
CONNECTION, WRONG_REDUCE}
|
CONNECTION, WRONG_REDUCE}
|
||||||
|
|
||||||
|
@ -71,13 +71,13 @@ private static enum ShuffleErrors{IO_ERROR, WRONG_LENGTH, BAD_ID, WRONG_MAP,
|
||||||
private final Counters.Counter badIdErrs;
|
private final Counters.Counter badIdErrs;
|
||||||
private final Counters.Counter wrongMapErrs;
|
private final Counters.Counter wrongMapErrs;
|
||||||
private final Counters.Counter wrongReduceErrs;
|
private final Counters.Counter wrongReduceErrs;
|
||||||
private final MergeManager<K,V> merger;
|
protected final MergeManager<K,V> merger;
|
||||||
private final ShuffleSchedulerImpl<K,V> scheduler;
|
protected final ShuffleSchedulerImpl<K,V> scheduler;
|
||||||
private final ShuffleClientMetrics metrics;
|
protected final ShuffleClientMetrics metrics;
|
||||||
private final ExceptionReporter exceptionReporter;
|
protected final ExceptionReporter exceptionReporter;
|
||||||
private final int id;
|
protected final int id;
|
||||||
private static int nextId = 0;
|
private static int nextId = 0;
|
||||||
private final int reduce;
|
protected final int reduce;
|
||||||
|
|
||||||
private final int connectionTimeout;
|
private final int connectionTimeout;
|
||||||
private final int readTimeout;
|
private final int readTimeout;
|
||||||
|
|
|
@ -0,0 +1,166 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.mapreduce.task.reduce;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.Iterator;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
|
import javax.crypto.SecretKey;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.fs.FSDataInputStream;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.mapred.IndexRecord;
|
||||||
|
import org.apache.hadoop.mapred.JobConf;
|
||||||
|
import org.apache.hadoop.mapred.MapOutputFile;
|
||||||
|
import org.apache.hadoop.mapred.Reporter;
|
||||||
|
import org.apache.hadoop.mapred.SpillRecord;
|
||||||
|
import org.apache.hadoop.mapreduce.TaskAttemptID;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* LocalFetcher is used by LocalJobRunner to perform a local filesystem
|
||||||
|
* fetch.
|
||||||
|
*/
|
||||||
|
class LocalFetcher<K,V> extends Fetcher<K, V> {
|
||||||
|
|
||||||
|
private static final Log LOG = LogFactory.getLog(LocalFetcher.class);
|
||||||
|
|
||||||
|
private static final MapHost LOCALHOST = new MapHost("local", "local");
|
||||||
|
|
||||||
|
private JobConf job;
|
||||||
|
private Map<TaskAttemptID, MapOutputFile> localMapFiles;
|
||||||
|
|
||||||
|
public LocalFetcher(JobConf job, TaskAttemptID reduceId,
|
||||||
|
ShuffleSchedulerImpl<K, V> scheduler,
|
||||||
|
MergeManager<K,V> merger,
|
||||||
|
Reporter reporter, ShuffleClientMetrics metrics,
|
||||||
|
ExceptionReporter exceptionReporter,
|
||||||
|
SecretKey shuffleKey,
|
||||||
|
Map<TaskAttemptID, MapOutputFile> localMapFiles) {
|
||||||
|
super(job, reduceId, scheduler, merger, reporter, metrics,
|
||||||
|
exceptionReporter, shuffleKey);
|
||||||
|
|
||||||
|
this.job = job;
|
||||||
|
this.localMapFiles = localMapFiles;
|
||||||
|
|
||||||
|
setName("localfetcher#" + id);
|
||||||
|
setDaemon(true);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void run() {
|
||||||
|
// Create a worklist of task attempts to work over.
|
||||||
|
Set<TaskAttemptID> maps = new HashSet<TaskAttemptID>();
|
||||||
|
for (TaskAttemptID map : localMapFiles.keySet()) {
|
||||||
|
maps.add(map);
|
||||||
|
}
|
||||||
|
|
||||||
|
while (maps.size() > 0) {
|
||||||
|
try {
|
||||||
|
// If merge is on, block
|
||||||
|
merger.waitForResource();
|
||||||
|
metrics.threadBusy();
|
||||||
|
|
||||||
|
// Copy as much as is possible.
|
||||||
|
doCopy(maps);
|
||||||
|
metrics.threadFree();
|
||||||
|
} catch (InterruptedException ie) {
|
||||||
|
} catch (Throwable t) {
|
||||||
|
exceptionReporter.reportException(t);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Copies each map output remaining in the worklist, removing it on success and stopping early if the merger asks us to wait.
|
||||||
|
*/
|
||||||
|
private void doCopy(Set<TaskAttemptID> maps) throws IOException {
|
||||||
|
Iterator<TaskAttemptID> iter = maps.iterator();
|
||||||
|
while (iter.hasNext()) {
|
||||||
|
TaskAttemptID map = iter.next();
|
||||||
|
LOG.debug("LocalFetcher " + id + " going to fetch: " + map);
|
||||||
|
if (copyMapOutput(map)) {
|
||||||
|
// Successful copy. Remove this from our worklist.
|
||||||
|
iter.remove();
|
||||||
|
} else {
|
||||||
|
// We got back a WAIT command; go back to the outer loop
|
||||||
|
// and block for InMemoryMerge.
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Retrieve the map output of a single map task
|
||||||
|
* and send it to the merger.
|
||||||
|
*/
|
||||||
|
private boolean copyMapOutput(TaskAttemptID mapTaskId) throws IOException {
|
||||||
|
// Figure out where the map task stored its output.
|
||||||
|
Path mapOutputFileName = localMapFiles.get(mapTaskId).getOutputFile();
|
||||||
|
Path indexFileName = mapOutputFileName.suffix(".index");
|
||||||
|
|
||||||
|
// Read its index to determine the location of our split
|
||||||
|
// and its size.
|
||||||
|
SpillRecord sr = new SpillRecord(indexFileName, job);
|
||||||
|
IndexRecord ir = sr.getIndex(reduce);
|
||||||
|
|
||||||
|
long compressedLength = ir.partLength;
|
||||||
|
long decompressedLength = ir.rawLength;
|
||||||
|
|
||||||
|
// Get the location for the map output - either in-memory or on-disk
|
||||||
|
MapOutput<K, V> mapOutput = merger.reserve(mapTaskId, decompressedLength,
|
||||||
|
id);
|
||||||
|
|
||||||
|
// Check if we can shuffle *now* ...
|
||||||
|
if (mapOutput == null) {
|
||||||
|
LOG.info("fetcher#" + id + " - MergeManager returned Status.WAIT ...");
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Go!
|
||||||
|
LOG.info("localfetcher#" + id + " about to shuffle output of map " +
|
||||||
|
mapOutput.getMapId() + " decomp: " +
|
||||||
|
decompressedLength + " len: " + compressedLength + " to " +
|
||||||
|
mapOutput.getDescription());
|
||||||
|
|
||||||
|
// now read the file, seek to the appropriate section, and send it.
|
||||||
|
FileSystem localFs = FileSystem.getLocal(job).getRaw();
|
||||||
|
FSDataInputStream inStream = localFs.open(mapOutputFileName);
|
||||||
|
try {
|
||||||
|
inStream.seek(ir.startOffset);
|
||||||
|
|
||||||
|
mapOutput.shuffle(LOCALHOST, inStream, compressedLength, decompressedLength, metrics, reporter);
|
||||||
|
} finally {
|
||||||
|
try {
|
||||||
|
inStream.close();
|
||||||
|
} catch (IOException ioe) {
|
||||||
|
LOG.warn("IOException closing inputstream from map output: "
|
||||||
|
+ ioe.toString());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
scheduler.copySucceeded(mapTaskId, LOCALHOST, compressedLength, 0,
|
||||||
|
mapOutput);
|
||||||
|
return true; // successful fetch.
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -18,10 +18,12 @@
|
||||||
package org.apache.hadoop.mapreduce.task.reduce;
|
package org.apache.hadoop.mapreduce.task.reduce;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.classification.InterfaceStability;
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
import org.apache.hadoop.mapred.JobConf;
|
import org.apache.hadoop.mapred.JobConf;
|
||||||
|
import org.apache.hadoop.mapred.MapOutputFile;
|
||||||
import org.apache.hadoop.mapred.RawKeyValueIterator;
|
import org.apache.hadoop.mapred.RawKeyValueIterator;
|
||||||
import org.apache.hadoop.mapred.Reporter;
|
import org.apache.hadoop.mapred.Reporter;
|
||||||
import org.apache.hadoop.mapred.Task;
|
import org.apache.hadoop.mapred.Task;
|
||||||
|
@ -56,6 +58,7 @@ public class Shuffle<K, V> implements ShuffleConsumerPlugin<K, V>, ExceptionRepo
|
||||||
private Progress copyPhase;
|
private Progress copyPhase;
|
||||||
private TaskStatus taskStatus;
|
private TaskStatus taskStatus;
|
||||||
private Task reduceTask; //Used for status updates
|
private Task reduceTask; //Used for status updates
|
||||||
|
private Map<TaskAttemptID, MapOutputFile> localMapFiles;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void init(ShuffleConsumerPlugin.Context context) {
|
public void init(ShuffleConsumerPlugin.Context context) {
|
||||||
|
@ -69,6 +72,7 @@ public void init(ShuffleConsumerPlugin.Context context) {
|
||||||
this.copyPhase = context.getCopyPhase();
|
this.copyPhase = context.getCopyPhase();
|
||||||
this.taskStatus = context.getStatus();
|
this.taskStatus = context.getStatus();
|
||||||
this.reduceTask = context.getReduceTask();
|
this.reduceTask = context.getReduceTask();
|
||||||
|
this.localMapFiles = context.getLocalMapFiles();
|
||||||
|
|
||||||
scheduler = new ShuffleSchedulerImpl<K, V>(jobConf, taskStatus, reduceId,
|
scheduler = new ShuffleSchedulerImpl<K, V>(jobConf, taskStatus, reduceId,
|
||||||
this, copyPhase, context.getShuffledMapsCounter(),
|
this, copyPhase, context.getShuffledMapsCounter(),
|
||||||
|
@ -103,13 +107,22 @@ public RawKeyValueIterator run() throws IOException, InterruptedException {
|
||||||
eventFetcher.start();
|
eventFetcher.start();
|
||||||
|
|
||||||
// Start the map-output fetcher threads
|
// Start the map-output fetcher threads
|
||||||
final int numFetchers = jobConf.getInt(MRJobConfig.SHUFFLE_PARALLEL_COPIES, 5);
|
boolean isLocal = localMapFiles != null;
|
||||||
|
final int numFetchers = isLocal ? 1 :
|
||||||
|
jobConf.getInt(MRJobConfig.SHUFFLE_PARALLEL_COPIES, 5);
|
||||||
Fetcher<K,V>[] fetchers = new Fetcher[numFetchers];
|
Fetcher<K,V>[] fetchers = new Fetcher[numFetchers];
|
||||||
for (int i=0; i < numFetchers; ++i) {
|
if (isLocal) {
|
||||||
fetchers[i] = new Fetcher<K,V>(jobConf, reduceId, scheduler, merger,
|
fetchers[0] = new LocalFetcher<K, V>(jobConf, reduceId, scheduler,
|
||||||
reporter, metrics, this,
|
merger, reporter, metrics, this, reduceTask.getShuffleSecret(),
|
||||||
reduceTask.getShuffleSecret());
|
localMapFiles);
|
||||||
fetchers[i].start();
|
fetchers[0].start();
|
||||||
|
} else {
|
||||||
|
for (int i=0; i < numFetchers; ++i) {
|
||||||
|
fetchers[i] = new Fetcher<K,V>(jobConf, reduceId, scheduler, merger,
|
||||||
|
reporter, metrics, this,
|
||||||
|
reduceTask.getShuffleSecret());
|
||||||
|
fetchers[i].start();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Wait for shuffle to complete successfully
|
// Wait for shuffle to complete successfully
|
||||||
|
|
|
@ -155,7 +155,7 @@ public void testConsumerApi() {
|
||||||
mockCounter, mockCounter, mockCounter,
|
mockCounter, mockCounter, mockCounter,
|
||||||
mockCounter, mockCounter, mockCounter,
|
mockCounter, mockCounter, mockCounter,
|
||||||
mockTaskStatus, mockProgress, mockProgress,
|
mockTaskStatus, mockProgress, mockProgress,
|
||||||
mockTask, mockMapOutputFile);
|
mockTask, mockMapOutputFile, null);
|
||||||
shuffleConsumerPlugin.init(context);
|
shuffleConsumerPlugin.init(context);
|
||||||
shuffleConsumerPlugin.run();
|
shuffleConsumerPlugin.run();
|
||||||
shuffleConsumerPlugin.close();
|
shuffleConsumerPlugin.close();
|
||||||
|
|
|
@ -276,18 +276,16 @@ public void testOldCounterA() throws Exception {
|
||||||
// there are too few spills to combine (2 < 3)
|
// there are too few spills to combine (2 < 3)
|
||||||
// Each map spills 2^14 records, so maps spill 49152 records, combined.
|
// Each map spills 2^14 records, so maps spill 49152 records, combined.
|
||||||
|
|
||||||
// The reduce spill count is composed of the read from one segment and
|
// The combiner has emitted 24576 records to the reducer; these are all
|
||||||
// the intermediate merge of the other two. The intermediate merge
|
// fetched straight to memory from the map side. The intermediate merge
|
||||||
// adds 8192 records per segment read; again, there are too few spills to
|
// adds 8192 records per segment read; again, there are too few spills to
|
||||||
// combine, so all 16834 are written to disk (total 32768 spilled records
|
// combine. Total spilled records in the reduce
|
||||||
// for the intermediate merge). The merge into the reduce includes only
|
// is 8192 records / map * 3 maps = 24576.
|
||||||
// the unmerged segment, size 8192. Total spilled records in the reduce
|
|
||||||
// is 32768 from the merge + 8192 unmerged segment = 40960 records
|
|
||||||
|
|
||||||
// Total: map + reduce = 49152 + 40960 = 90112
|
// Total: map + reduce = 49152 + 24576 = 73728
|
||||||
// 3 files, 5120 = 5 * 1024 rec/file = 15360 input records
|
// 3 files, 5120 = 5 * 1024 rec/file = 15360 input records
|
||||||
// 4 records/line = 61440 output records
|
// 4 records/line = 61440 output records
|
||||||
validateCounters(c1, 90112, 15360, 61440);
|
validateCounters(c1, 73728, 15360, 61440);
|
||||||
validateFileCounters(c1, inputSize, 0, 0, 0);
|
validateFileCounters(c1, inputSize, 0, 0, 0);
|
||||||
validateOldFileCounters(c1, inputSize, 61928, 0, 0);
|
validateOldFileCounters(c1, inputSize, 61928, 0, 0);
|
||||||
}
|
}
|
||||||
|
@ -316,12 +314,12 @@ public void testOldCounterB() throws Exception {
|
||||||
// 1st merge: read + write = 8192 * 4
|
// 1st merge: read + write = 8192 * 4
|
||||||
// 2nd merge: read + write = 8192 * 4
|
// 2nd merge: read + write = 8192 * 4
|
||||||
// final merge: 0
|
// final merge: 0
|
||||||
// Total reduce: 65536
|
// Total reduce: 32768
|
||||||
|
|
||||||
// Total: map + reduce = 2^16 + 2^16 = 131072
|
// Total: map + reduce = 2^16 + 2^15 = 98304
|
||||||
// 4 files, 5120 = 5 * 1024 rec/file = 20480 input records
|
// 4 files, 5120 = 5 * 1024 rec/file = 20480 input records
|
||||||
// 4 records/line = 81920 output records
|
// 4 records/line = 81920 output records
|
||||||
validateCounters(c1, 131072, 20480, 81920);
|
validateCounters(c1, 98304, 20480, 81920);
|
||||||
validateFileCounters(c1, inputSize, 0, 0, 0);
|
validateFileCounters(c1, inputSize, 0, 0, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -349,7 +347,7 @@ public void testOldCounterC() throws Exception {
|
||||||
// Total reduce: 45056
|
// Total reduce: 45056
|
||||||
// 5 files, 5120 = 5 * 1024 rec/file = 25600 input records
|
// 5 files, 5120 = 5 * 1024 rec/file = 25600 input records
|
||||||
// 4 records/line = 102400 output records
|
// 4 records/line = 102400 output records
|
||||||
validateCounters(c1, 147456, 25600, 102400);
|
validateCounters(c1, 122880, 25600, 102400);
|
||||||
validateFileCounters(c1, inputSize, 0, 0, 0);
|
validateFileCounters(c1, inputSize, 0, 0, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -394,7 +392,7 @@ public void testNewCounterA() throws Exception {
|
||||||
job, new Path(OUT_DIR, "outputN0"));
|
job, new Path(OUT_DIR, "outputN0"));
|
||||||
assertTrue(job.waitForCompletion(true));
|
assertTrue(job.waitForCompletion(true));
|
||||||
final Counters c1 = Counters.downgrade(job.getCounters());
|
final Counters c1 = Counters.downgrade(job.getCounters());
|
||||||
validateCounters(c1, 90112, 15360, 61440);
|
validateCounters(c1, 73728, 15360, 61440);
|
||||||
validateFileCounters(c1, inputSize, 0, 0, 0);
|
validateFileCounters(c1, inputSize, 0, 0, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -416,7 +414,7 @@ public void testNewCounterB() throws Exception {
|
||||||
job, new Path(OUT_DIR, "outputN1"));
|
job, new Path(OUT_DIR, "outputN1"));
|
||||||
assertTrue(job.waitForCompletion(true));
|
assertTrue(job.waitForCompletion(true));
|
||||||
final Counters c1 = Counters.downgrade(job.getCounters());
|
final Counters c1 = Counters.downgrade(job.getCounters());
|
||||||
validateCounters(c1, 131072, 20480, 81920);
|
validateCounters(c1, 98304, 20480, 81920);
|
||||||
validateFileCounters(c1, inputSize, 0, 0, 0);
|
validateFileCounters(c1, inputSize, 0, 0, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -439,7 +437,7 @@ public void testNewCounterC() throws Exception {
|
||||||
job, new Path(OUT_DIR, "outputN2"));
|
job, new Path(OUT_DIR, "outputN2"));
|
||||||
assertTrue(job.waitForCompletion(true));
|
assertTrue(job.waitForCompletion(true));
|
||||||
final Counters c1 = Counters.downgrade(job.getCounters());
|
final Counters c1 = Counters.downgrade(job.getCounters());
|
||||||
validateCounters(c1, 147456, 25600, 102400);
|
validateCounters(c1, 122880, 25600, 102400);
|
||||||
validateFileCounters(c1, inputSize, 0, 0, 0);
|
validateFileCounters(c1, inputSize, 0, 0, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -63,7 +63,7 @@ public void configure(String keySpec, int expect) throws Exception {
|
||||||
conf.setOutputValueClass(LongWritable.class);
|
conf.setOutputValueClass(LongWritable.class);
|
||||||
|
|
||||||
conf.setNumMapTasks(1);
|
conf.setNumMapTasks(1);
|
||||||
conf.setNumReduceTasks(2);
|
conf.setNumReduceTasks(1);
|
||||||
|
|
||||||
conf.setOutputFormat(TextOutputFormat.class);
|
conf.setOutputFormat(TextOutputFormat.class);
|
||||||
conf.setOutputKeyComparatorClass(KeyFieldBasedComparator.class);
|
conf.setOutputKeyComparatorClass(KeyFieldBasedComparator.class);
|
||||||
|
@ -101,9 +101,7 @@ public void configure(String keySpec, int expect) throws Exception {
|
||||||
BufferedReader reader = new BufferedReader(new InputStreamReader(is));
|
BufferedReader reader = new BufferedReader(new InputStreamReader(is));
|
||||||
String line = reader.readLine();
|
String line = reader.readLine();
|
||||||
//make sure we get what we expect as the first line, and also
|
//make sure we get what we expect as the first line, and also
|
||||||
//that we have two lines (both the lines must end up in the same
|
//that we have two lines
|
||||||
//reducer since the partitioner takes the same key spec for all
|
|
||||||
//lines
|
|
||||||
if (expect == 1) {
|
if (expect == 1) {
|
||||||
assertTrue(line.startsWith(line1));
|
assertTrue(line.startsWith(line1));
|
||||||
} else if (expect == 2) {
|
} else if (expect == 2) {
|
||||||
|
|
|
@ -31,9 +31,9 @@
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.io.LongWritable;
|
import org.apache.hadoop.io.LongWritable;
|
||||||
|
import org.apache.hadoop.io.NullWritable;
|
||||||
import org.apache.hadoop.io.Text;
|
import org.apache.hadoop.io.Text;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.*;
|
||||||
import org.apache.hadoop.fs.Path;
|
|
||||||
import org.apache.hadoop.mapred.LocalJobRunner;
|
import org.apache.hadoop.mapred.LocalJobRunner;
|
||||||
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
|
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
|
||||||
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
|
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
|
||||||
|
@ -410,6 +410,7 @@ public boolean nextKeyValue() {
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Test case for zero mappers */
|
/** Test case for zero mappers */
|
||||||
|
@Test
|
||||||
public void testEmptyMaps() throws Exception {
|
public void testEmptyMaps() throws Exception {
|
||||||
Job job = Job.getInstance();
|
Job job = Job.getInstance();
|
||||||
Path outputPath = getOutputPath();
|
Path outputPath = getOutputPath();
|
||||||
|
@ -428,5 +429,145 @@ public void testEmptyMaps() throws Exception {
|
||||||
boolean success = job.waitForCompletion(true);
|
boolean success = job.waitForCompletion(true);
|
||||||
assertTrue("Empty job should work", success);
|
assertTrue("Empty job should work", success);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** @return the directory where numberfiles are written (mapper inputs) */
|
||||||
|
private Path getNumberDirPath() {
|
||||||
|
return new Path(getInputPath(), "numberfiles");
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Write out an input file containing an integer.
|
||||||
|
*
|
||||||
|
* @param fileNum the file number to write to.
|
||||||
|
* @param value the value to write to the file
|
||||||
|
* @return the path of the written file.
|
||||||
|
*/
|
||||||
|
private Path makeNumberFile(int fileNum, int value) throws IOException {
|
||||||
|
Path workDir = getNumberDirPath();
|
||||||
|
Path filePath = new Path(workDir, "file" + fileNum);
|
||||||
|
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
FileSystem fs = FileSystem.getLocal(conf);
|
||||||
|
|
||||||
|
OutputStream os = fs.create(filePath);
|
||||||
|
BufferedWriter w = new BufferedWriter(new OutputStreamWriter(os));
|
||||||
|
w.write("" + value);
|
||||||
|
w.close();
|
||||||
|
|
||||||
|
return filePath;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Each record received by this mapper is a number 'n'.
|
||||||
|
* Emit the values [0..n-1]
|
||||||
|
*/
|
||||||
|
public static class SequenceMapper
|
||||||
|
extends Mapper<LongWritable, Text, Text, NullWritable> {
|
||||||
|
|
||||||
|
public void map(LongWritable k, Text v, Context c)
|
||||||
|
throws IOException, InterruptedException {
|
||||||
|
int max = Integer.valueOf(v.toString());
|
||||||
|
for (int i = 0; i < max; i++) {
|
||||||
|
c.write(new Text("" + i), NullWritable.get());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private final static int NUMBER_FILE_VAL = 100;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tally up the values and ensure that we got as much data
|
||||||
|
* out as we put in.
|
||||||
|
* Each mapper generated 'NUMBER_FILE_VAL' values (0..NUMBER_FILE_VAL-1).
|
||||||
|
* Verify that across all our reducers we got exactly this much
|
||||||
|
* data back.
|
||||||
|
*/
|
||||||
|
private void verifyNumberJob(int numMaps) throws Exception {
|
||||||
|
Path outputDir = getOutputPath();
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
FileSystem fs = FileSystem.getLocal(conf);
|
||||||
|
|
||||||
|
FileStatus [] stats = fs.listStatus(outputDir);
|
||||||
|
int valueSum = 0;
|
||||||
|
for (FileStatus f : stats) {
|
||||||
|
FSDataInputStream istream = fs.open(f.getPath());
|
||||||
|
BufferedReader r = new BufferedReader(new InputStreamReader(istream));
|
||||||
|
String line = null;
|
||||||
|
while ((line = r.readLine()) != null) {
|
||||||
|
valueSum += Integer.valueOf(line.trim());
|
||||||
|
}
|
||||||
|
r.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
int maxVal = NUMBER_FILE_VAL - 1;
|
||||||
|
int expectedPerMapper = maxVal * (maxVal + 1) / 2;
|
||||||
|
int expectedSum = expectedPerMapper * numMaps;
|
||||||
|
LOG.info("expected sum: " + expectedSum + ", got " + valueSum);
|
||||||
|
assertEquals("Didn't get all our results back", expectedSum, valueSum);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Run a test which creates a SequenceMapper / IdentityReducer
|
||||||
|
* job over a set of generated number files.
|
||||||
|
*/
|
||||||
|
private void doMultiReducerTest(int numMaps, int numReduces,
|
||||||
|
int parallelMaps, int parallelReduces) throws Exception {
|
||||||
|
|
||||||
|
Path in = getNumberDirPath();
|
||||||
|
Path out = getOutputPath();
|
||||||
|
|
||||||
|
// Clear data from any previous tests.
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
FileSystem fs = FileSystem.getLocal(conf);
|
||||||
|
if (fs.exists(out)) {
|
||||||
|
fs.delete(out, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (fs.exists(in)) {
|
||||||
|
fs.delete(in, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int i = 0; i < numMaps; i++) {
|
||||||
|
makeNumberFile(i, 100);
|
||||||
|
}
|
||||||
|
|
||||||
|
Job job = Job.getInstance();
|
||||||
|
job.setNumReduceTasks(numReduces);
|
||||||
|
|
||||||
|
job.setMapperClass(SequenceMapper.class);
|
||||||
|
job.setOutputKeyClass(Text.class);
|
||||||
|
job.setOutputValueClass(NullWritable.class);
|
||||||
|
FileInputFormat.addInputPath(job, in);
|
||||||
|
FileOutputFormat.setOutputPath(job, out);
|
||||||
|
|
||||||
|
LocalJobRunner.setLocalMaxRunningMaps(job, parallelMaps);
|
||||||
|
LocalJobRunner.setLocalMaxRunningReduces(job, parallelReduces);
|
||||||
|
|
||||||
|
boolean result = job.waitForCompletion(true);
|
||||||
|
assertTrue("Job failed!!", result);
|
||||||
|
|
||||||
|
verifyNumberJob(numMaps);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testOneMapMultiReduce() throws Exception {
|
||||||
|
doMultiReducerTest(1, 2, 1, 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testOneMapMultiParallelReduce() throws Exception {
|
||||||
|
doMultiReducerTest(1, 2, 1, 2);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMultiMapOneReduce() throws Exception {
|
||||||
|
doMultiReducerTest(4, 1, 2, 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMultiMapMultiReduce() throws Exception {
|
||||||
|
doMultiReducerTest(4, 4, 2, 2);
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -56,7 +56,7 @@ private void testComparator(String keySpec, int expect)
|
||||||
conf.set("mapreduce.partition.keypartitioner.options", "-k1.1,1.1");
|
conf.set("mapreduce.partition.keypartitioner.options", "-k1.1,1.1");
|
||||||
conf.set(MRJobConfig.MAP_OUTPUT_KEY_FIELD_SEPERATOR, " ");
|
conf.set(MRJobConfig.MAP_OUTPUT_KEY_FIELD_SEPERATOR, " ");
|
||||||
|
|
||||||
Job job = MapReduceTestUtil.createJob(conf, inDir, outDir, 1, 2,
|
Job job = MapReduceTestUtil.createJob(conf, inDir, outDir, 1, 1,
|
||||||
line1 +"\n" + line2 + "\n");
|
line1 +"\n" + line2 + "\n");
|
||||||
job.setMapperClass(InverseMapper.class);
|
job.setMapperClass(InverseMapper.class);
|
||||||
job.setReducerClass(Reducer.class);
|
job.setReducerClass(Reducer.class);
|
||||||
|
|
Loading…
Reference in New Issue