diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 11ddd455261..3a7ff9274e5 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -157,6 +157,9 @@ Release 2.3.0 - UNRELEASED IMPROVEMENTS + MAPREDUCE-434. LocalJobRunner limited to single reducer (Sandy Ryza and + Aaron Kimball via Sandy Ryza) + OPTIMIZATIONS MAPREDUCE-1981. Improve getSplits performance by using listLocatedStatus diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java index c5e5323089d..95c272d3d25 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java @@ -79,11 +79,15 @@ public class LocalJobRunner implements ClientProtocol { public static final String LOCAL_MAX_MAPS = "mapreduce.local.map.tasks.maximum"; + /** The maximum number of reduce tasks to run in parallel in LocalJobRunner */ + public static final String LOCAL_MAX_REDUCES = + "mapreduce.local.reduce.tasks.maximum"; + private FileSystem fs; private HashMap jobs = new HashMap(); private JobConf conf; private AtomicInteger map_tasks = new AtomicInteger(0); - private int reduce_tasks = 0; + private AtomicInteger reduce_tasks = new AtomicInteger(0); final Random rand = new Random(); private LocalJobRunnerMetrics myMetrics = null; @@ -115,9 +119,11 @@ private class Job extends Thread implements TaskUmbilicalProtocol { private JobConf job; private int numMapTasks; + private int numReduceTasks; private float [] partialMapProgress; + private float [] partialReduceProgress; private Counters [] mapCounters; - private Counters reduceCounters; + private Counters [] reduceCounters; private JobStatus status; private List mapIds = Collections.synchronizedList( @@ -184,10 +190,14 @@ public Job(JobID jobid, String jobSubmitDir) throws IOException { this.start(); } + protected abstract class RunnableWithThrowable implements Runnable { + public volatile Throwable storedException; + } + /** * A Runnable instance that handles a map task to be run by an executor. */ - protected class MapTaskRunnable implements Runnable { + protected class MapTaskRunnable extends RunnableWithThrowable { private final int taskId; private final TaskSplitMetaInfo info; private final JobID jobId; @@ -198,8 +208,6 @@ protected class MapTaskRunnable implements Runnable { // where to fetch mapper outputs. private final Map mapOutputFiles; - public volatile Throwable storedException; - public MapTaskRunnable(TaskSplitMetaInfo info, int taskId, JobID jobId, Map mapOutputFiles) { this.info = info; @@ -253,12 +261,13 @@ public void run() { * @param mapOutputFiles a mapping from task attempts to output files * @return a List of Runnables, one per map task. 
*/ - protected List getMapTaskRunnables( + protected List getMapTaskRunnables( TaskSplitMetaInfo [] taskInfo, JobID jobId, Map mapOutputFiles) { int numTasks = 0; - ArrayList list = new ArrayList(); + ArrayList list = + new ArrayList(); for (TaskSplitMetaInfo task : taskInfo) { list.add(new MapTaskRunnable(task, numTasks++, jobId, mapOutputFiles)); @@ -267,12 +276,89 @@ protected List getMapTaskRunnables( return list; } + protected class ReduceTaskRunnable extends RunnableWithThrowable { + private final int taskId; + private final JobID jobId; + private final JobConf localConf; + + // This is a reference to a shared object passed in by the + // external context; this delivers state to the reducers regarding + // where to fetch mapper outputs. + private final Map mapOutputFiles; + + public ReduceTaskRunnable(int taskId, JobID jobId, + Map mapOutputFiles) { + this.taskId = taskId; + this.jobId = jobId; + this.mapOutputFiles = mapOutputFiles; + this.localConf = new JobConf(job); + this.localConf.set("mapreduce.jobtracker.address", "local"); + } + + public void run() { + try { + TaskAttemptID reduceId = new TaskAttemptID(new TaskID( + jobId, TaskType.REDUCE, taskId), 0); + LOG.info("Starting task: " + reduceId); + + ReduceTask reduce = new ReduceTask(systemJobFile.toString(), + reduceId, taskId, mapIds.size(), 1); + reduce.setUser(UserGroupInformation.getCurrentUser(). + getShortUserName()); + setupChildMapredLocalDirs(localJobDir, reduce, localConf); + reduce.setLocalMapFiles(mapOutputFiles); + + if (!Job.this.isInterrupted()) { + reduce.setJobFile(localJobFile.toString()); + localConf.setUser(reduce.getUser()); + reduce.localizeConfiguration(localConf); + reduce.setConf(localConf); + try { + reduce_tasks.getAndIncrement(); + myMetrics.launchReduce(reduce.getTaskID()); + reduce.run(localConf, Job.this); + myMetrics.completeReduce(reduce.getTaskID()); + } finally { + reduce_tasks.getAndDecrement(); + } + + LOG.info("Finishing task: " + reduceId); + } else { + throw new InterruptedException(); + } + } catch (Throwable t) { + // store this to be rethrown in the initial thread context. + this.storedException = t; + } + } + } + + /** + * Create Runnables to encapsulate reduce tasks for use by the executor + * service. + * @param jobId the job id + * @param mapOutputFiles a mapping from task attempts to output files + * @return a List of Runnables, one per reduce task. + */ + protected List getReduceTaskRunnables( + JobID jobId, Map mapOutputFiles) { + + int taskId = 0; + ArrayList list = + new ArrayList(); + for (int i = 0; i < this.numReduceTasks; i++) { + list.add(new ReduceTaskRunnable(taskId++, jobId, mapOutputFiles)); + } + + return list; + } + /** * Initialize the counters that will hold partial-progress from * the various task attempts. * @param numMaps the number of map tasks in this job. */ - private synchronized void initCounters(int numMaps) { + private synchronized void initCounters(int numMaps, int numReduces) { // Initialize state trackers for all map tasks. 
this.partialMapProgress = new float[numMaps]; this.mapCounters = new Counters[numMaps]; @@ -280,16 +366,22 @@ private synchronized void initCounters(int numMaps) { this.mapCounters[i] = new Counters(); } - this.reduceCounters = new Counters(); + this.partialReduceProgress = new float[numReduces]; + this.reduceCounters = new Counters[numReduces]; + for (int i = 0; i < numReduces; i++) { + this.reduceCounters[i] = new Counters(); + } + + this.numMapTasks = numMaps; + this.numReduceTasks = numReduces; } /** * Creates the executor service used to run map tasks. * - * @param numMapTasks the total number of map tasks to be run * @return an ExecutorService instance that handles map tasks */ - protected ExecutorService createMapExecutor(int numMapTasks) { + protected synchronized ExecutorService createMapExecutor() { // Determine the size of the thread pool to use int maxMapThreads = job.getInt(LOCAL_MAX_MAPS, 1); @@ -297,13 +389,10 @@ protected ExecutorService createMapExecutor(int numMapTasks) { throw new IllegalArgumentException( "Configured " + LOCAL_MAX_MAPS + " must be >= 1"); } - this.numMapTasks = numMapTasks; maxMapThreads = Math.min(maxMapThreads, this.numMapTasks); maxMapThreads = Math.max(maxMapThreads, 1); // In case of no tasks. - initCounters(this.numMapTasks); - - LOG.debug("Starting thread pool executor."); + LOG.debug("Starting mapper thread pool executor."); LOG.debug("Max local threads: " + maxMapThreads); LOG.debug("Map tasks to process: " + this.numMapTasks); @@ -315,6 +404,65 @@ protected ExecutorService createMapExecutor(int numMapTasks) { return executor; } + + /** + * Creates the executor service used to run reduce tasks. + * + * @return an ExecutorService instance that handles reduce tasks + */ + protected synchronized ExecutorService createReduceExecutor() { + + // Determine the size of the thread pool to use + int maxReduceThreads = job.getInt(LOCAL_MAX_REDUCES, 1); + if (maxReduceThreads < 1) { + throw new IllegalArgumentException( + "Configured " + LOCAL_MAX_REDUCES + " must be >= 1"); + } + maxReduceThreads = Math.min(maxReduceThreads, this.numReduceTasks); + maxReduceThreads = Math.max(maxReduceThreads, 1); // In case of no tasks. + + LOG.debug("Starting reduce thread pool executor."); + LOG.debug("Max local threads: " + maxReduceThreads); + LOG.debug("Reduce tasks to process: " + this.numReduceTasks); + + // Create a new executor service to drain the work queue. + ExecutorService executor = Executors.newFixedThreadPool(maxReduceThreads); + + return executor; + } + + /** Run a set of tasks and waits for them to complete. */ + private void runTasks(List runnables, + ExecutorService service, String taskType) throws Exception { + // Start populating the executor with work units. + // They may begin running immediately (in other threads). + for (Runnable r : runnables) { + service.submit(r); + } + + try { + service.shutdown(); // Instructs queue to drain. + + // Wait for tasks to finish; do not use a time-based timeout. + // (See http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6179024) + LOG.info("Waiting for " + taskType + " tasks"); + service.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS); + } catch (InterruptedException ie) { + // Cancel all threads. + service.shutdownNow(); + throw ie; + } + + LOG.info(taskType + " task executor complete."); + + // After waiting for the tasks to complete, if any of these + // have thrown an exception, rethrow it now in the main thread context. 
+ for (RunnableWithThrowable r : runnables) { + if (r.storedException != null) { + throw new Exception(r.storedException); + } + } + } private org.apache.hadoop.mapreduce.OutputCommitter createOutputCommitter(boolean newApiCommitter, JobID jobId, Configuration conf) throws Exception { @@ -360,94 +508,25 @@ public void run() { SplitMetaInfoReader.readSplitMetaInfo(jobId, localFs, conf, systemJobDir); int numReduceTasks = job.getNumReduceTasks(); - if (numReduceTasks > 1 || numReduceTasks < 0) { - // we only allow 0 or 1 reducer in local mode - numReduceTasks = 1; - job.setNumReduceTasks(1); - } outputCommitter.setupJob(jContext); status.setSetupProgress(1.0f); Map mapOutputFiles = Collections.synchronizedMap(new HashMap()); + + List mapRunnables = getMapTaskRunnables( + taskSplitMetaInfos, jobId, mapOutputFiles); + + initCounters(mapRunnables.size(), numReduceTasks); + ExecutorService mapService = createMapExecutor(); + runTasks(mapRunnables, mapService, "map"); - List taskRunnables = getMapTaskRunnables(taskSplitMetaInfos, - jobId, mapOutputFiles); - ExecutorService mapService = createMapExecutor(taskRunnables.size()); - - // Start populating the executor with work units. - // They may begin running immediately (in other threads). - for (Runnable r : taskRunnables) { - mapService.submit(r); - } - - try { - mapService.shutdown(); // Instructs queue to drain. - - // Wait for tasks to finish; do not use a time-based timeout. - // (See http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6179024) - LOG.info("Waiting for map tasks"); - mapService.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS); - } catch (InterruptedException ie) { - // Cancel all threads. - mapService.shutdownNow(); - throw ie; - } - - LOG.info("Map task executor complete."); - - // After waiting for the map tasks to complete, if any of these - // have thrown an exception, rethrow it now in the main thread context. - for (MapTaskRunnable r : taskRunnables) { - if (r.storedException != null) { - throw new Exception(r.storedException); - } - } - - TaskAttemptID reduceId = - new TaskAttemptID(new TaskID(jobId, TaskType.REDUCE, 0), 0); try { if (numReduceTasks > 0) { - ReduceTask reduce = new ReduceTask(systemJobFile.toString(), - reduceId, 0, mapIds.size(), 1); - reduce.setUser(UserGroupInformation.getCurrentUser(). 
- getShortUserName()); - JobConf localConf = new JobConf(job); - localConf.set("mapreduce.jobtracker.address", "local"); - setupChildMapredLocalDirs(localJobDir, reduce, localConf); - // move map output to reduce input - for (int i = 0; i < mapIds.size(); i++) { - if (!this.isInterrupted()) { - TaskAttemptID mapId = mapIds.get(i); - Path mapOut = mapOutputFiles.get(mapId).getOutputFile(); - MapOutputFile localOutputFile = new MROutputFiles(); - localOutputFile.setConf(localConf); - Path reduceIn = - localOutputFile.getInputFileForWrite(mapId.getTaskID(), - localFs.getFileStatus(mapOut).getLen()); - if (!localFs.mkdirs(reduceIn.getParent())) { - throw new IOException("Mkdirs failed to create " - + reduceIn.getParent().toString()); - } - if (!localFs.rename(mapOut, reduceIn)) - throw new IOException("Couldn't rename " + mapOut); - } else { - throw new InterruptedException(); - } - } - if (!this.isInterrupted()) { - reduce.setJobFile(localJobFile.toString()); - localConf.setUser(reduce.getUser()); - reduce.localizeConfiguration(localConf); - reduce.setConf(localConf); - reduce_tasks += 1; - myMetrics.launchReduce(reduce.getTaskID()); - reduce.run(localConf, this); - myMetrics.completeReduce(reduce.getTaskID()); - reduce_tasks -= 1; - } else { - throw new InterruptedException(); - } + List reduceRunnables = getReduceTaskRunnables( + jobId, mapOutputFiles); + ExecutorService reduceService = createReduceExecutor(); + runTasks(reduceRunnables, reduceService, "reduce"); } } finally { for (MapOutputFile output : mapOutputFiles.values()) { @@ -465,7 +544,6 @@ public void run() { } JobEndNotifier.localRunnerNotification(job, status); - } catch (Throwable t) { try { outputCommitter.abortJob(jContext, @@ -511,12 +589,13 @@ public synchronized boolean statusUpdate(TaskAttemptID taskId, new ByteArrayInputStream(baos.toByteArray()))); LOG.info(taskStatus.getStateString()); - int taskIndex = mapIds.indexOf(taskId); - if (taskIndex >= 0) { // mapping + int mapTaskIndex = mapIds.indexOf(taskId); + if (mapTaskIndex >= 0) { + // mapping float numTasks = (float) this.numMapTasks; - partialMapProgress[taskIndex] = taskStatus.getProgress(); - mapCounters[taskIndex] = taskStatus.getCounters(); + partialMapProgress[mapTaskIndex] = taskStatus.getProgress(); + mapCounters[mapTaskIndex] = taskStatus.getCounters(); float partialProgress = 0.0f; for (float f : partialMapProgress) { @@ -524,8 +603,18 @@ public synchronized boolean statusUpdate(TaskAttemptID taskId, } status.setMapProgress(partialProgress / numTasks); } else { - reduceCounters = taskStatus.getCounters(); - status.setReduceProgress(taskStatus.getProgress()); + // reducing + int reduceTaskIndex = taskId.getTaskID().getId(); + float numTasks = (float) this.numReduceTasks; + + partialReduceProgress[reduceTaskIndex] = taskStatus.getProgress(); + reduceCounters[reduceTaskIndex] = taskStatus.getCounters(); + + float partialProgress = 0.0f; + for (float f : partialReduceProgress) { + partialProgress += f; + } + status.setReduceProgress(partialProgress / numTasks); } // ignore phase @@ -545,7 +634,13 @@ public synchronized Counters getCurrentCounters() { for (Counters c : mapCounters) { current = Counters.sum(current, c); } - current = Counters.sum(current, reduceCounters); + + if (null != reduceCounters && reduceCounters.length > 0) { + for (Counters c : reduceCounters) { + current = Counters.sum(current, c); + } + } + return current; } @@ -684,8 +779,9 @@ public String getFilesystemName() throws IOException { public ClusterMetrics getClusterMetrics() { int 
numMapTasks = map_tasks.get(); - return new ClusterMetrics(numMapTasks, reduce_tasks, numMapTasks, - reduce_tasks, 0, 0, 1, 1, jobs.size(), 1, 0, 0); + int numReduceTasks = reduce_tasks.get(); + return new ClusterMetrics(numMapTasks, numReduceTasks, numMapTasks, + numReduceTasks, 0, 0, 1, 1, jobs.size(), 1, 0, 0); } public JobTrackerStatus getJobTrackerStatus() { @@ -816,6 +912,27 @@ public static int getLocalMaxRunningMaps( return job.getConfiguration().getInt(LOCAL_MAX_MAPS, 1); } + + /** + * Set the max number of reduce tasks to run concurrently in the LocalJobRunner. + * @param job the job to configure + * @param maxReduces the maximum number of reduce tasks to allow. + */ + public static void setLocalMaxRunningReduces( + org.apache.hadoop.mapreduce.JobContext job, + int maxReduces) { + job.getConfiguration().setInt(LOCAL_MAX_REDUCES, maxReduces); + } + + /** + * @return the max number of reduce tasks to run concurrently in the + * LocalJobRunner. + */ + public static int getLocalMaxRunningReduces( + org.apache.hadoop.mapreduce.JobContext job) { + return job.getConfiguration().getInt(LOCAL_MAX_REDUCES, 1); + } + @Override public void cancelDelegationToken(Token token ) throws IOException, diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java index 84404d1ef13..e9d3ed78863 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java @@ -60,6 +60,7 @@ import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.TaskCounter; +import org.apache.hadoop.mapreduce.TaskType; import org.apache.hadoop.mapreduce.lib.input.FileInputFormatCounter; import org.apache.hadoop.mapreduce.lib.map.WrappedMapper; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormatCounter; @@ -1860,7 +1861,6 @@ private void mergeParts() throws IOException, InterruptedException, } { sortPhase.addPhases(partitions); // Divide sort phase into sub-phases - Merger.considerFinalMergeForProgress(); IndexRecord rec = new IndexRecord(); final SpillRecord spillRec = new SpillRecord(partitions); @@ -1893,7 +1893,8 @@ private void mergeParts() throws IOException, InterruptedException, segmentList, mergeFactor, new Path(mapId.toString()), job.getOutputKeyComparator(), reporter, sortSegments, - null, spilledRecordsCounter, sortPhase.phase()); + null, spilledRecordsCounter, sortPhase.phase(), + TaskType.MAP); //write merged output to disk long segmentStart = finalOut.getPos(); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java index ced9040f413..b4362ac5096 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java @@ -39,6 +39,7 @@ import org.apache.hadoop.mapred.IFile.Reader; import 
org.apache.hadoop.mapred.IFile.Writer; import org.apache.hadoop.mapreduce.MRConfig; +import org.apache.hadoop.mapreduce.TaskType; import org.apache.hadoop.util.PriorityQueue; import org.apache.hadoop.util.Progress; import org.apache.hadoop.util.Progressable; @@ -69,7 +70,8 @@ RawKeyValueIterator merge(Configuration conf, FileSystem fs, throws IOException { return new MergeQueue(conf, fs, inputs, deleteInputs, codec, comparator, - reporter, null).merge(keyClass, valueClass, + reporter, null, + TaskType.REDUCE).merge(keyClass, valueClass, mergeFactor, tmpDir, readsCounter, writesCounter, mergePhase); @@ -90,7 +92,8 @@ RawKeyValueIterator merge(Configuration conf, FileSystem fs, throws IOException { return new MergeQueue(conf, fs, inputs, deleteInputs, codec, comparator, - reporter, mergedMapOutputsCounter).merge( + reporter, mergedMapOutputsCounter, + TaskType.REDUCE).merge( keyClass, valueClass, mergeFactor, tmpDir, readsCounter, writesCounter, @@ -124,7 +127,8 @@ RawKeyValueIterator merge(Configuration conf, FileSystem fs, Progress mergePhase) throws IOException { return new MergeQueue(conf, fs, segments, comparator, reporter, - sortSegments).merge(keyClass, valueClass, + sortSegments, + TaskType.REDUCE).merge(keyClass, valueClass, mergeFactor, tmpDir, readsCounter, writesCounter, mergePhase); @@ -140,10 +144,12 @@ RawKeyValueIterator merge(Configuration conf, FileSystem fs, boolean sortSegments, Counters.Counter readsCounter, Counters.Counter writesCounter, - Progress mergePhase) + Progress mergePhase, + TaskType taskType) throws IOException { return new MergeQueue(conf, fs, segments, comparator, reporter, - sortSegments, codec).merge(keyClass, valueClass, + sortSegments, codec, + taskType).merge(keyClass, valueClass, mergeFactor, tmpDir, readsCounter, writesCounter, mergePhase); @@ -161,7 +167,8 @@ RawKeyValueIterator merge(Configuration conf, FileSystem fs, Progress mergePhase) throws IOException { return new MergeQueue(conf, fs, segments, comparator, reporter, - sortSegments).merge(keyClass, valueClass, + sortSegments, + TaskType.REDUCE).merge(keyClass, valueClass, mergeFactor, inMemSegments, tmpDir, readsCounter, writesCounter, @@ -182,7 +189,8 @@ RawKeyValueIterator merge(Configuration conf, FileSystem fs, Progress mergePhase) throws IOException { return new MergeQueue(conf, fs, segments, comparator, reporter, - sortSegments, codec).merge(keyClass, valueClass, + sortSegments, codec, + TaskType.REDUCE).merge(keyClass, valueClass, mergeFactor, inMemSegments, tmpDir, readsCounter, writesCounter, @@ -366,20 +374,7 @@ void reinitReader(int offset) throws IOException { } } } - - // Boolean variable for including/considering final merge as part of sort - // phase or not. This is true in map task, false in reduce task. It is - // used in calculating mergeProgress. - static boolean includeFinalMerge = false; - - /** - * Sets the boolean variable includeFinalMerge to true. Called from - * map task before calling merge() so that final merge of map task - * is also considered as part of sort phase. - */ - static void considerFinalMergeForProgress() { - includeFinalMerge = true; - } + private static class MergeQueue extends PriorityQueue> implements RawKeyValueIterator { @@ -401,6 +396,21 @@ private static class MergeQueue final DataInputBuffer value = new DataInputBuffer(); final DataInputBuffer diskIFileValue = new DataInputBuffer(); + + // Boolean variable for including/considering final merge as part of sort + // phase or not. This is true in map task, false in reduce task. 
It is + // used in calculating mergeProgress. + private boolean includeFinalMerge = false; + + /** + * Sets the boolean variable includeFinalMerge to true. Called from + * map task before calling merge() so that final merge of map task + * is also considered as part of sort phase. + */ + private void considerFinalMergeForProgress() { + includeFinalMerge = true; + } + Segment minSegment; Comparator> segmentComparator = new Comparator>() { @@ -419,14 +429,16 @@ public MergeQueue(Configuration conf, FileSystem fs, CompressionCodec codec, RawComparator comparator, Progressable reporter) throws IOException { - this(conf, fs, inputs, deleteInputs, codec, comparator, reporter, null); + this(conf, fs, inputs, deleteInputs, codec, comparator, reporter, null, + TaskType.REDUCE); } public MergeQueue(Configuration conf, FileSystem fs, Path[] inputs, boolean deleteInputs, CompressionCodec codec, RawComparator comparator, Progressable reporter, - Counters.Counter mergedMapOutputsCounter) + Counters.Counter mergedMapOutputsCounter, + TaskType taskType) throws IOException { this.conf = conf; this.fs = fs; @@ -434,6 +446,10 @@ public MergeQueue(Configuration conf, FileSystem fs, this.comparator = comparator; this.reporter = reporter; + if (taskType == TaskType.MAP) { + considerFinalMergeForProgress(); + } + for (Path file : inputs) { LOG.debug("MergeQ: adding: " + file); segments.add(new Segment(conf, fs, file, codec, !deleteInputs, @@ -449,17 +465,20 @@ public MergeQueue(Configuration conf, FileSystem fs, public MergeQueue(Configuration conf, FileSystem fs, List> segments, RawComparator comparator, Progressable reporter) { - this(conf, fs, segments, comparator, reporter, false); + this(conf, fs, segments, comparator, reporter, false, TaskType.REDUCE); } public MergeQueue(Configuration conf, FileSystem fs, List> segments, RawComparator comparator, - Progressable reporter, boolean sortSegments) { + Progressable reporter, boolean sortSegments, TaskType taskType) { this.conf = conf; this.fs = fs; this.comparator = comparator; this.segments = segments; this.reporter = reporter; + if (taskType == TaskType.MAP) { + considerFinalMergeForProgress(); + } if (sortSegments) { Collections.sort(segments, segmentComparator); } @@ -467,8 +486,10 @@ public MergeQueue(Configuration conf, FileSystem fs, public MergeQueue(Configuration conf, FileSystem fs, List> segments, RawComparator comparator, - Progressable reporter, boolean sortSegments, CompressionCodec codec) { - this(conf, fs, segments, comparator, reporter, sortSegments); + Progressable reporter, boolean sortSegments, CompressionCodec codec, + TaskType taskType) { + this(conf, fs, segments, comparator, reporter, sortSegments, + taskType); this.codec = codec; } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ReduceTask.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ReduceTask.java index ea8ef3afdca..c1391fa6feb 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ReduceTask.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ReduceTask.java @@ -24,6 +24,7 @@ import java.util.ArrayList; import java.util.Comparator; import java.util.List; +import java.util.Map; import java.util.SortedSet; import java.util.TreeSet; @@ -74,6 +75,10 @@ public class ReduceTask 
extends Task { private CompressionCodec codec; + // If this is a LocalJobRunner-based job, this will + // be a mapping from map task attempts to their output files. + // This will be null in other cases. + private Map localMapFiles; { getProgress().setStatus("reduce"); @@ -105,24 +110,24 @@ public class ReduceTask extends Task { // file paths, the first parameter is considered smaller than the second one. // In case of files with same size and path are considered equal. private Comparator mapOutputFileComparator = - new Comparator() { - public int compare(FileStatus a, FileStatus b) { - if (a.getLen() < b.getLen()) - return -1; - else if (a.getLen() == b.getLen()) - if (a.getPath().toString().equals(b.getPath().toString())) - return 0; - else - return -1; + new Comparator() { + public int compare(FileStatus a, FileStatus b) { + if (a.getLen() < b.getLen()) + return -1; + else if (a.getLen() == b.getLen()) + if (a.getPath().toString().equals(b.getPath().toString())) + return 0; else - return 1; - } + return -1; + else + return 1; + } }; - + // A sorted set for keeping a set of map output files on disk private final SortedSet mapOutputFilesOnDisk = - new TreeSet(mapOutputFileComparator); - + new TreeSet(mapOutputFileComparator); + public ReduceTask() { super(); } @@ -133,6 +138,17 @@ public ReduceTask(String jobFile, TaskAttemptID taskId, this.numMaps = numMaps; } + + /** + * Register the set of mapper outputs created by a LocalJobRunner-based + * job with this ReduceTask so it knows where to fetch from. + * + * This should not be called in normal (networked) execution. + */ + public void setLocalMapFiles(Map mapFiles) { + this.localMapFiles = mapFiles; + } + private CompressionCodec initCodec() { // check if map-outputs are to be compressed if (conf.getCompressMapOutput()) { @@ -174,20 +190,11 @@ public void readFields(DataInput in) throws IOException { numMaps = in.readInt(); } - // Get the input files for the reducer. - private Path[] getMapFiles(FileSystem fs, boolean isLocal) - throws IOException { + // Get the input files for the reducer (for local jobs). + private Path[] getMapFiles(FileSystem fs) throws IOException { List fileList = new ArrayList(); - if (isLocal) { - // for local jobs - for(int i = 0; i < numMaps; ++i) { - fileList.add(mapOutputFile.getInputFile(i)); - } - } else { - // for non local jobs - for (FileStatus filestatus : mapOutputFilesOnDisk) { - fileList.add(filestatus.getPath()); - } + for(int i = 0; i < numMaps; ++i) { + fileList.add(mapOutputFile.getInputFile(i)); } return fileList.toArray(new Path[0]); } @@ -341,56 +348,33 @@ public void run(JobConf job, final TaskUmbilicalProtocol umbilical) // Initialize the codec codec = initCodec(); RawKeyValueIterator rIter = null; - ShuffleConsumerPlugin shuffleConsumerPlugin = null; + ShuffleConsumerPlugin shuffleConsumerPlugin = null; - boolean isLocal = false; - // local if - // 1) framework == local or - // 2) framework == null and job tracker address == local - String framework = job.get(MRConfig.FRAMEWORK_NAME); - String masterAddr = job.get(MRConfig.MASTER_ADDRESS, "local"); - if ((framework == null && masterAddr.equals("local")) - || (framework != null && framework.equals(MRConfig.LOCAL_FRAMEWORK_NAME))) { - isLocal = true; - } - - if (!isLocal) { - Class combinerClass = conf.getCombinerClass(); - CombineOutputCollector combineCollector = - (null != combinerClass) ? 
- new CombineOutputCollector(reduceCombineOutputCounter, reporter, conf) : null; + Class combinerClass = conf.getCombinerClass(); + CombineOutputCollector combineCollector = + (null != combinerClass) ? + new CombineOutputCollector(reduceCombineOutputCounter, reporter, conf) : null; - Class clazz = - job.getClass(MRConfig.SHUFFLE_CONSUMER_PLUGIN, Shuffle.class, ShuffleConsumerPlugin.class); - - shuffleConsumerPlugin = ReflectionUtils.newInstance(clazz, job); - LOG.info("Using ShuffleConsumerPlugin: " + shuffleConsumerPlugin); + Class clazz = + job.getClass(MRConfig.SHUFFLE_CONSUMER_PLUGIN, Shuffle.class, ShuffleConsumerPlugin.class); + + shuffleConsumerPlugin = ReflectionUtils.newInstance(clazz, job); + LOG.info("Using ShuffleConsumerPlugin: " + shuffleConsumerPlugin); + + ShuffleConsumerPlugin.Context shuffleContext = + new ShuffleConsumerPlugin.Context(getTaskID(), job, FileSystem.getLocal(job), umbilical, + super.lDirAlloc, reporter, codec, + combinerClass, combineCollector, + spilledRecordsCounter, reduceCombineInputCounter, + shuffledMapsCounter, + reduceShuffleBytes, failedShuffleCounter, + mergedMapOutputsCounter, + taskStatus, copyPhase, sortPhase, this, + mapOutputFile, localMapFiles); + shuffleConsumerPlugin.init(shuffleContext); + + rIter = shuffleConsumerPlugin.run(); - ShuffleConsumerPlugin.Context shuffleContext = - new ShuffleConsumerPlugin.Context(getTaskID(), job, FileSystem.getLocal(job), umbilical, - super.lDirAlloc, reporter, codec, - combinerClass, combineCollector, - spilledRecordsCounter, reduceCombineInputCounter, - shuffledMapsCounter, - reduceShuffleBytes, failedShuffleCounter, - mergedMapOutputsCounter, - taskStatus, copyPhase, sortPhase, this, - mapOutputFile); - shuffleConsumerPlugin.init(shuffleContext); - rIter = shuffleConsumerPlugin.run(); - } else { - // local job runner doesn't have a copy phase - copyPhase.complete(); - final FileSystem rfs = FileSystem.getLocal(job).getRaw(); - rIter = Merger.merge(job, rfs, job.getMapOutputKeyClass(), - job.getMapOutputValueClass(), codec, - getMapFiles(rfs, true), - !conf.getKeepFailedTaskFiles(), - job.getInt(JobContext.IO_SORT_FACTOR, 100), - new Path(getTaskID().toString()), - job.getOutputKeyComparator(), - reporter, spilledRecordsCounter, null, null); - } // free up the data structures mapOutputFilesOnDisk.clear(); @@ -409,9 +393,7 @@ public void run(JobConf job, final TaskUmbilicalProtocol umbilical) keyClass, valueClass); } - if (shuffleConsumerPlugin != null) { - shuffleConsumerPlugin.close(); - } + shuffleConsumerPlugin.close(); done(umbilical, reporter); } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ShuffleConsumerPlugin.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ShuffleConsumerPlugin.java index f57275255f1..6958be91dbe 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ShuffleConsumerPlugin.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ShuffleConsumerPlugin.java @@ -19,6 +19,8 @@ package org.apache.hadoop.mapred; import java.io.IOException; +import java.util.Map; + import org.apache.hadoop.mapred.Task.CombineOutputCollector; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocalDirAllocator; @@ -65,6 +67,7 @@ public static class Context { private final Progress 
mergePhase; private final Task reduceTask; private final MapOutputFile mapOutputFile; + private final Map localMapFiles; public Context(org.apache.hadoop.mapreduce.TaskAttemptID reduceId, JobConf jobConf, FileSystem localFS, @@ -80,7 +83,8 @@ public Context(org.apache.hadoop.mapreduce.TaskAttemptID reduceId, Counters.Counter failedShuffleCounter, Counters.Counter mergedMapOutputsCounter, TaskStatus status, Progress copyPhase, Progress mergePhase, - Task reduceTask, MapOutputFile mapOutputFile) { + Task reduceTask, MapOutputFile mapOutputFile, + Map localMapFiles) { this.reduceId = reduceId; this.jobConf = jobConf; this.localFS = localFS; @@ -101,6 +105,7 @@ public Context(org.apache.hadoop.mapreduce.TaskAttemptID reduceId, this.mergePhase = mergePhase; this.reduceTask = reduceTask; this.mapOutputFile = mapOutputFile; + this.localMapFiles = localMapFiles; } public org.apache.hadoop.mapreduce.TaskAttemptID getReduceId() { @@ -163,6 +168,9 @@ public Task getReduceTask() { public MapOutputFile getMapOutputFile() { return mapOutputFile; } + public Map getLocalMapFiles() { + return localMapFiles; + } } // end of public static class Context } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java index b1c4dc7ef82..00d4764e665 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java @@ -60,7 +60,7 @@ class Fetcher extends Thread { /* Default read timeout (in milliseconds) */ private final static int DEFAULT_READ_TIMEOUT = 3 * 60 * 1000; - private final Reporter reporter; + protected final Reporter reporter; private static enum ShuffleErrors{IO_ERROR, WRONG_LENGTH, BAD_ID, WRONG_MAP, CONNECTION, WRONG_REDUCE} @@ -71,13 +71,13 @@ private static enum ShuffleErrors{IO_ERROR, WRONG_LENGTH, BAD_ID, WRONG_MAP, private final Counters.Counter badIdErrs; private final Counters.Counter wrongMapErrs; private final Counters.Counter wrongReduceErrs; - private final MergeManager merger; - private final ShuffleSchedulerImpl scheduler; - private final ShuffleClientMetrics metrics; - private final ExceptionReporter exceptionReporter; - private final int id; + protected final MergeManager merger; + protected final ShuffleSchedulerImpl scheduler; + protected final ShuffleClientMetrics metrics; + protected final ExceptionReporter exceptionReporter; + protected final int id; private static int nextId = 0; - private final int reduce; + protected final int reduce; private final int connectionTimeout; private final int readTimeout; diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/LocalFetcher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/LocalFetcher.java new file mode 100644 index 00000000000..52796524da5 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/LocalFetcher.java @@ -0,0 +1,166 @@ +/** + * Licensed to the Apache Software Foundation 
(ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.mapreduce.task.reduce; + +import java.io.IOException; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; + +import javax.crypto.SecretKey; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapred.IndexRecord; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.MapOutputFile; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapred.SpillRecord; +import org.apache.hadoop.mapreduce.TaskAttemptID; + +/** + * LocalFetcher is used by LocalJobRunner to perform a local filesystem + * fetch. + */ +class LocalFetcher extends Fetcher { + + private static final Log LOG = LogFactory.getLog(LocalFetcher.class); + + private static final MapHost LOCALHOST = new MapHost("local", "local"); + + private JobConf job; + private Map localMapFiles; + + public LocalFetcher(JobConf job, TaskAttemptID reduceId, + ShuffleSchedulerImpl scheduler, + MergeManager merger, + Reporter reporter, ShuffleClientMetrics metrics, + ExceptionReporter exceptionReporter, + SecretKey shuffleKey, + Map localMapFiles) { + super(job, reduceId, scheduler, merger, reporter, metrics, + exceptionReporter, shuffleKey); + + this.job = job; + this.localMapFiles = localMapFiles; + + setName("localfetcher#" + id); + setDaemon(true); + } + + public void run() { + // Create a worklist of task attempts to work over. + Set maps = new HashSet(); + for (TaskAttemptID map : localMapFiles.keySet()) { + maps.add(map); + } + + while (maps.size() > 0) { + try { + // If merge is on, block + merger.waitForResource(); + metrics.threadBusy(); + + // Copy as much as is possible. + doCopy(maps); + metrics.threadFree(); + } catch (InterruptedException ie) { + } catch (Throwable t) { + exceptionReporter.reportException(t); + } + } + } + + /** + * The crux of the matter... + */ + private void doCopy(Set maps) throws IOException { + Iterator iter = maps.iterator(); + while (iter.hasNext()) { + TaskAttemptID map = iter.next(); + LOG.debug("LocalFetcher " + id + " going to fetch: " + map); + if (copyMapOutput(map)) { + // Successful copy. Remove this from our worklist. + iter.remove(); + } else { + // We got back a WAIT command; go back to the outer loop + // and block for InMemoryMerge. + break; + } + } + } + + /** + * Retrieve the map output of a single map task + * and send it to the merger. + */ + private boolean copyMapOutput(TaskAttemptID mapTaskId) throws IOException { + // Figure out where the map task stored its output. 
+ Path mapOutputFileName = localMapFiles.get(mapTaskId).getOutputFile(); + Path indexFileName = mapOutputFileName.suffix(".index"); + + // Read its index to determine the location of our split + // and its size. + SpillRecord sr = new SpillRecord(indexFileName, job); + IndexRecord ir = sr.getIndex(reduce); + + long compressedLength = ir.partLength; + long decompressedLength = ir.rawLength; + + // Get the location for the map output - either in-memory or on-disk + MapOutput mapOutput = merger.reserve(mapTaskId, decompressedLength, + id); + + // Check if we can shuffle *now* ... + if (mapOutput == null) { + LOG.info("fetcher#" + id + " - MergeManager returned Status.WAIT ..."); + return false; + } + + // Go! + LOG.info("localfetcher#" + id + " about to shuffle output of map " + + mapOutput.getMapId() + " decomp: " + + decompressedLength + " len: " + compressedLength + " to " + + mapOutput.getDescription()); + + // now read the file, seek to the appropriate section, and send it. + FileSystem localFs = FileSystem.getLocal(job).getRaw(); + FSDataInputStream inStream = localFs.open(mapOutputFileName); + try { + inStream.seek(ir.startOffset); + + mapOutput.shuffle(LOCALHOST, inStream, compressedLength, decompressedLength, metrics, reporter); + } finally { + try { + inStream.close(); + } catch (IOException ioe) { + LOG.warn("IOException closing inputstream from map output: " + + ioe.toString()); + } + } + + scheduler.copySucceeded(mapTaskId, LOCALHOST, compressedLength, 0, + mapOutput); + return true; // successful fetch. + } +} + diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java index 06c007e1584..93f9a507b10 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java @@ -18,10 +18,12 @@ package org.apache.hadoop.mapreduce.task.reduce; import java.io.IOException; +import java.util.Map; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.MapOutputFile; import org.apache.hadoop.mapred.RawKeyValueIterator; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapred.Task; @@ -56,6 +58,7 @@ public class Shuffle implements ShuffleConsumerPlugin, ExceptionRepo private Progress copyPhase; private TaskStatus taskStatus; private Task reduceTask; //Used for status updates + private Map localMapFiles; @Override public void init(ShuffleConsumerPlugin.Context context) { @@ -69,6 +72,7 @@ public void init(ShuffleConsumerPlugin.Context context) { this.copyPhase = context.getCopyPhase(); this.taskStatus = context.getStatus(); this.reduceTask = context.getReduceTask(); + this.localMapFiles = context.getLocalMapFiles(); scheduler = new ShuffleSchedulerImpl(jobConf, taskStatus, reduceId, this, copyPhase, context.getShuffledMapsCounter(), @@ -103,13 +107,22 @@ public RawKeyValueIterator run() throws IOException, InterruptedException { eventFetcher.start(); // Start the map-output fetcher threads - final int numFetchers = 
jobConf.getInt(MRJobConfig.SHUFFLE_PARALLEL_COPIES, 5); + boolean isLocal = localMapFiles != null; + final int numFetchers = isLocal ? 1 : + jobConf.getInt(MRJobConfig.SHUFFLE_PARALLEL_COPIES, 5); Fetcher[] fetchers = new Fetcher[numFetchers]; - for (int i=0; i < numFetchers; ++i) { - fetchers[i] = new Fetcher(jobConf, reduceId, scheduler, merger, - reporter, metrics, this, - reduceTask.getShuffleSecret()); - fetchers[i].start(); + if (isLocal) { + fetchers[0] = new LocalFetcher(jobConf, reduceId, scheduler, + merger, reporter, metrics, this, reduceTask.getShuffleSecret(), + localMapFiles); + fetchers[0].start(); + } else { + for (int i=0; i < numFetchers; ++i) { + fetchers[i] = new Fetcher(jobConf, reduceId, scheduler, merger, + reporter, metrics, this, + reduceTask.getShuffleSecret()); + fetchers[i].start(); + } } // Wait for shuffle to complete successfully diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestShufflePlugin.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestShufflePlugin.java index ecf5b8f3afc..d8d8cea87b4 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestShufflePlugin.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestShufflePlugin.java @@ -155,7 +155,7 @@ public void testConsumerApi() { mockCounter, mockCounter, mockCounter, mockCounter, mockCounter, mockCounter, mockTaskStatus, mockProgress, mockProgress, - mockTask, mockMapOutputFile); + mockTask, mockMapOutputFile, null); shuffleConsumerPlugin.init(context); shuffleConsumerPlugin.run(); shuffleConsumerPlugin.close(); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestJobCounters.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestJobCounters.java index 59e51ecdd8c..1185ffb7c3a 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestJobCounters.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestJobCounters.java @@ -276,18 +276,16 @@ public void testOldCounterA() throws Exception { // there are too few spills to combine (2 < 3) // Each map spills 2^14 records, so maps spill 49152 records, combined. - // The reduce spill count is composed of the read from one segment and - // the intermediate merge of the other two. The intermediate merge + // The combiner has emitted 24576 records to the reducer; these are all + // fetched straight to memory from the map side. The intermediate merge // adds 8192 records per segment read; again, there are too few spills to - // combine, so all 16834 are written to disk (total 32768 spilled records - // for the intermediate merge). The merge into the reduce includes only - // the unmerged segment, size 8192. Total spilled records in the reduce - // is 32768 from the merge + 8192 unmerged segment = 40960 records + // combine, so all Total spilled records in the reduce + // is 8192 records / map * 3 maps = 24576. 
- // Total: map + reduce = 49152 + 40960 = 90112 + // Total: map + reduce = 49152 + 24576 = 73728 // 3 files, 5120 = 5 * 1024 rec/file = 15360 input records // 4 records/line = 61440 output records - validateCounters(c1, 90112, 15360, 61440); + validateCounters(c1, 73728, 15360, 61440); validateFileCounters(c1, inputSize, 0, 0, 0); validateOldFileCounters(c1, inputSize, 61928, 0, 0); } @@ -316,12 +314,12 @@ public void testOldCounterB() throws Exception { // 1st merge: read + write = 8192 * 4 // 2nd merge: read + write = 8192 * 4 // final merge: 0 - // Total reduce: 65536 + // Total reduce: 32768 - // Total: map + reduce = 2^16 + 2^16 = 131072 + // Total: map + reduce = 2^16 + 2^15 = 98304 // 4 files, 5120 = 5 * 1024 rec/file = 15360 input records // 4 records/line = 81920 output records - validateCounters(c1, 131072, 20480, 81920); + validateCounters(c1, 98304, 20480, 81920); validateFileCounters(c1, inputSize, 0, 0, 0); } @@ -349,7 +347,7 @@ public void testOldCounterC() throws Exception { // Total reduce: 45056 // 5 files, 5120 = 5 * 1024 rec/file = 15360 input records // 4 records/line = 102400 output records - validateCounters(c1, 147456, 25600, 102400); + validateCounters(c1, 122880, 25600, 102400); validateFileCounters(c1, inputSize, 0, 0, 0); } @@ -394,7 +392,7 @@ public void testNewCounterA() throws Exception { job, new Path(OUT_DIR, "outputN0")); assertTrue(job.waitForCompletion(true)); final Counters c1 = Counters.downgrade(job.getCounters()); - validateCounters(c1, 90112, 15360, 61440); + validateCounters(c1, 73728, 15360, 61440); validateFileCounters(c1, inputSize, 0, 0, 0); } @@ -416,7 +414,7 @@ public void testNewCounterB() throws Exception { job, new Path(OUT_DIR, "outputN1")); assertTrue(job.waitForCompletion(true)); final Counters c1 = Counters.downgrade(job.getCounters()); - validateCounters(c1, 131072, 20480, 81920); + validateCounters(c1, 98304, 20480, 81920); validateFileCounters(c1, inputSize, 0, 0, 0); } @@ -439,7 +437,7 @@ public void testNewCounterC() throws Exception { job, new Path(OUT_DIR, "outputN2")); assertTrue(job.waitForCompletion(true)); final Counters c1 = Counters.downgrade(job.getCounters()); - validateCounters(c1, 147456, 25600, 102400); + validateCounters(c1, 122880, 25600, 102400); validateFileCounters(c1, inputSize, 0, 0, 0); } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/lib/TestKeyFieldBasedComparator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/lib/TestKeyFieldBasedComparator.java index 0de326c5a68..0bee2b564b9 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/lib/TestKeyFieldBasedComparator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/lib/TestKeyFieldBasedComparator.java @@ -63,7 +63,7 @@ public void configure(String keySpec, int expect) throws Exception { conf.setOutputValueClass(LongWritable.class); conf.setNumMapTasks(1); - conf.setNumReduceTasks(2); + conf.setNumReduceTasks(1); conf.setOutputFormat(TextOutputFormat.class); conf.setOutputKeyComparatorClass(KeyFieldBasedComparator.class); @@ -101,9 +101,7 @@ public void configure(String keySpec, int expect) throws Exception { BufferedReader reader = new BufferedReader(new InputStreamReader(is)); String line = reader.readLine(); //make sure we get what 
we expect as the first line, and also - //that we have two lines (both the lines must end up in the same - //reducer since the partitioner takes the same key spec for all - //lines + //that we have two lines if (expect == 1) { assertTrue(line.startsWith(line1)); } else if (expect == 2) { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestLocalRunner.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestLocalRunner.java index 7d600ca2534..29640c8854b 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestLocalRunner.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestLocalRunner.java @@ -31,9 +31,9 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.*; import org.apache.hadoop.mapred.LocalJobRunner; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.FileSplit; @@ -410,6 +410,7 @@ public boolean nextKeyValue() { } /** Test case for zero mappers */ + @Test public void testEmptyMaps() throws Exception { Job job = Job.getInstance(); Path outputPath = getOutputPath(); @@ -428,5 +429,145 @@ public void testEmptyMaps() throws Exception { boolean success = job.waitForCompletion(true); assertTrue("Empty job should work", success); } + + /** @return the directory where numberfiles are written (mapper inputs) */ + private Path getNumberDirPath() { + return new Path(getInputPath(), "numberfiles"); + } + + /** + * Write out an input file containing an integer. + * + * @param fileNum the file number to write to. + * @param value the value to write to the file + * @return the path of the written file. + */ + private Path makeNumberFile(int fileNum, int value) throws IOException { + Path workDir = getNumberDirPath(); + Path filePath = new Path(workDir, "file" + fileNum); + + Configuration conf = new Configuration(); + FileSystem fs = FileSystem.getLocal(conf); + + OutputStream os = fs.create(filePath); + BufferedWriter w = new BufferedWriter(new OutputStreamWriter(os)); + w.write("" + value); + w.close(); + + return filePath; + } + + /** + * Each record received by this mapper is a number 'n'. + * Emit the values [0..n-1] + */ + public static class SequenceMapper + extends Mapper { + + public void map(LongWritable k, Text v, Context c) + throws IOException, InterruptedException { + int max = Integer.valueOf(v.toString()); + for (int i = 0; i < max; i++) { + c.write(new Text("" + i), NullWritable.get()); + } + } + } + + private final static int NUMBER_FILE_VAL = 100; + + /** + * Tally up the values and ensure that we got as much data + * out as we put in. + * Each mapper generated 'NUMBER_FILE_VAL' values (0..NUMBER_FILE_VAL-1). + * Verify that across all our reducers we got exactly this much + * data back. 
+ */ + private void verifyNumberJob(int numMaps) throws Exception { + Path outputDir = getOutputPath(); + Configuration conf = new Configuration(); + FileSystem fs = FileSystem.getLocal(conf); + + FileStatus [] stats = fs.listStatus(outputDir); + int valueSum = 0; + for (FileStatus f : stats) { + FSDataInputStream istream = fs.open(f.getPath()); + BufferedReader r = new BufferedReader(new InputStreamReader(istream)); + String line = null; + while ((line = r.readLine()) != null) { + valueSum += Integer.valueOf(line.trim()); + } + r.close(); + } + + int maxVal = NUMBER_FILE_VAL - 1; + int expectedPerMapper = maxVal * (maxVal + 1) / 2; + int expectedSum = expectedPerMapper * numMaps; + LOG.info("expected sum: " + expectedSum + ", got " + valueSum); + assertEquals("Didn't get all our results back", expectedSum, valueSum); + } + + /** + * Run a test which creates a SequenceMapper / IdentityReducer + * job over a set of generated number files. + */ + private void doMultiReducerTest(int numMaps, int numReduces, + int parallelMaps, int parallelReduces) throws Exception { + + Path in = getNumberDirPath(); + Path out = getOutputPath(); + + // Clear data from any previous tests. + Configuration conf = new Configuration(); + FileSystem fs = FileSystem.getLocal(conf); + if (fs.exists(out)) { + fs.delete(out, true); + } + + if (fs.exists(in)) { + fs.delete(in, true); + } + + for (int i = 0; i < numMaps; i++) { + makeNumberFile(i, 100); + } + + Job job = Job.getInstance(); + job.setNumReduceTasks(numReduces); + + job.setMapperClass(SequenceMapper.class); + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(NullWritable.class); + FileInputFormat.addInputPath(job, in); + FileOutputFormat.setOutputPath(job, out); + + LocalJobRunner.setLocalMaxRunningMaps(job, parallelMaps); + LocalJobRunner.setLocalMaxRunningReduces(job, parallelReduces); + + boolean result = job.waitForCompletion(true); + assertTrue("Job failed!!", result); + + verifyNumberJob(numMaps); + } + + @Test + public void testOneMapMultiReduce() throws Exception { + doMultiReducerTest(1, 2, 1, 1); + } + + @Test + public void testOneMapMultiParallelReduce() throws Exception { + doMultiReducerTest(1, 2, 1, 2); + } + + @Test + public void testMultiMapOneReduce() throws Exception { + doMultiReducerTest(4, 1, 2, 1); + } + + @Test + public void testMultiMapMultiReduce() throws Exception { + doMultiReducerTest(4, 4, 2, 2); + } + } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/partition/TestMRKeyFieldBasedComparator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/partition/TestMRKeyFieldBasedComparator.java index 3262b8f11ab..3a2b8312bda 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/partition/TestMRKeyFieldBasedComparator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/partition/TestMRKeyFieldBasedComparator.java @@ -56,7 +56,7 @@ private void testComparator(String keySpec, int expect) conf.set("mapreduce.partition.keypartitioner.options", "-k1.1,1.1"); conf.set(MRJobConfig.MAP_OUTPUT_KEY_FIELD_SEPERATOR, " "); - Job job = MapReduceTestUtil.createJob(conf, inDir, outDir, 1, 2, + Job job = MapReduceTestUtil.createJob(conf, inDir, outDir, 1, 1, line1 +"\n" + line2 + "\n"); 
job.setMapperClass(InverseMapper.class); job.setReducerClass(Reducer.class);
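
The patch above removes LocalJobRunner's long-standing clamp to a single reducer: the job's configured reduce count is now honored, reduce attempts are wrapped in ReduceTaskRunnable objects and drained through an executor service (just like map tasks), and a new key, mapreduce.local.reduce.tasks.maximum, bounds how many of them run concurrently. The sketch below is a hypothetical driver, not part of the patch, showing how a client might exercise this; the "in"/"out" paths are placeholders and the job relies on the default identity Mapper/Reducer, while the LocalJobRunner helpers and configuration keys are the ones added or already present in LocalJobRunner.java above.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.LocalJobRunner;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class LocalMultiReduceDriver {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.set("mapreduce.framework.name", "local");  // run inside LocalJobRunner

    Job job = Job.getInstance(conf, "local multi-reduce sketch");
    job.setJarByClass(LocalMultiReduceDriver.class);

    // Previously the local runner silently reset anything other than 0 or 1
    // back to a single reducer; with this patch four output partitions are
    // actually produced.
    job.setNumReduceTasks(4);

    // New in this patch: cap on concurrently running local reduce tasks
    // (mapreduce.local.reduce.tasks.maximum).
    LocalJobRunner.setLocalMaxRunningReduces(job, 2);
    // Pre-existing analogue for the map side
    // (mapreduce.local.map.tasks.maximum).
    LocalJobRunner.setLocalMaxRunningMaps(job, 2);

    FileInputFormat.addInputPath(job, new Path("in"));    // placeholder input
    FileOutputFormat.setOutputPath(job, new Path("out")); // placeholder output

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

Running several reduce attempts in one JVM is also why the patch converts reduce_tasks to an AtomicInteger, gives each reduce task its own Counters and partial-progress slot, and moves Merger's includeFinalMerge flag from a static field onto MergeQueue (selected via the new TaskType constructor parameter): shared mutable state that was harmless with a single local reducer would otherwise be racy once map and reduce tasks overlap.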