From 71ce96738074723e4e3eba9ba09d2371e099beb3 Mon Sep 17 00:00:00 2001
From: Sanford Ryza
Date: Thu, 14 Nov 2013 08:06:50 +0000
Subject: [PATCH] MAPREDUCE-5481. Enable uber jobs to have multiple reducers
 (Sandy Ryza)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1541846 13f79535-47bb-0310-9956-ffa450edef68
---
 hadoop-mapreduce-project/CHANGES.txt          |  2 +
 .../hadoop/mapred/LocalContainerLauncher.java | 95 +++++++++++++++++--
 .../mapreduce/v2/app/job/impl/JobImpl.java    |  8 +-
 .../hadoop/mapreduce/v2/TestUberAM.java       | 36 +++++--
 4 files changed, 117 insertions(+), 24 deletions(-)

diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index a712d9770e2..53f476af880 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -39,6 +39,8 @@ Release 2.3.0 - UNRELEASED
     MAPREDUCE-5624 Move grizzly-test and junit dependencies to test scope
     (Ted Yu via stevel)
 
+    MAPREDUCE-5481. Enable uber jobs to have multiple reducers (Sandy Ryza)
+
   OPTIMIZATIONS
 
     MAPREDUCE-5484. YarnChild unnecessarily loads job conf twice (Sandy Ryza)
 
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java
index dd157282ae5..578cdcdb91a 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java
@@ -20,7 +20,9 @@ package org.apache.hadoop.mapred;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 
@@ -35,6 +37,7 @@ import org.apache.hadoop.fs.UnsupportedFileSystemException;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.JobCounter;
 import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.TaskID;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
@@ -168,6 +171,10 @@
     public void run() {
       ContainerLauncherEvent event = null;
 
+      // Collect locations of map outputs to give to reduces
+      Map<TaskAttemptID, MapOutputFile> localMapFiles =
+          new HashMap<TaskAttemptID, MapOutputFile>();
+
       // _must_ either run subtasks sequentially or accept expense of new JVMs
       // (i.e., fork()), else will get weird failures when maps try to create/
       // write same dirname or filename:  no chdir() in Java
@@ -223,7 +230,7 @@
               context.getEventHandler().handle(jce);
             }
             runSubtask(remoteTask, ytask.getType(), attemptID, numMapTasks,
-                       (numReduceTasks > 0));
+                       (numReduceTasks > 0), localMapFiles);
 
           } catch (RuntimeException re) {
             JobCounterUpdateEvent jce =
                 new JobCounterUpdateEvent(attemptID.getTaskId().getJobId());
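The map introduced above is the heart of the change: each uber sub-map registers the final location of its output under its attempt ID, and each sub-reduce later resolves map outputs from that shared map instead of shuffling over HTTP. A minimal, self-contained sketch of that bookkeeping, using plain strings as hypothetical stand-ins for Hadoop's TaskAttemptID and MapOutputFile:

    import java.util.HashMap;
    import java.util.Map;

    public class LocalMapFilesSketch {
      public static void main(String[] args) {
        // Stand-in for the Map<TaskAttemptID, MapOutputFile> in the patch.
        Map<String, String> localMapFiles = new HashMap<String, String>();

        // Each completed sub-map registers the reduce-input location of its
        // renamed output.
        localMapFiles.put("attempt_..._m_000000_0", "/local/dir/map_0.out");
        localMapFiles.put("attempt_..._m_000001_0", "/local/dir/map_1.out");

        // A sub-reduce resolves map outputs directly from local disk,
        // bypassing the HTTP shuffle entirely.
        String input = localMapFiles.get("attempt_..._m_000000_0");
        System.out.println("reduce fetches " + input + " from local disk");
      }
    }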
@@ -265,7 +272,8 @@
                             final TaskType taskType,
                             TaskAttemptId attemptID,
                             final int numMapTasks,
-                            boolean renameOutputs)
+                            boolean renameOutputs,
+                            Map<TaskAttemptID, MapOutputFile> localMapFiles)
     throws RuntimeException, IOException {
       org.apache.hadoop.mapred.TaskAttemptID classicAttemptID =
           TypeConverter.fromYarn(attemptID);
@@ -309,7 +317,9 @@
           map.run(conf, umbilical);
 
           if (renameOutputs) {
-            renameMapOutputForReduce(conf, attemptID, map.getMapOutputFile());
+            MapOutputFile renamed = renameMapOutputForReduce(conf, attemptID,
+                map.getMapOutputFile());
+            localMapFiles.put(classicAttemptID, renamed);
           }
           relocalize();
 
@@ -335,10 +345,11 @@
           conf.set(MRConfig.MASTER_ADDRESS, "local");  // bypass shuffle
 
           ReduceTask reduce = (ReduceTask)task;
+          reduce.setLocalMapFiles(localMapFiles);
           reduce.setConf(conf);
 
           reduce.run(conf, umbilical);
-          //relocalize();  // needed only if more than one reducer supported (is MAPREDUCE-434 fixed yet?)
+          relocalize();
         }
 
       } catch (FSError e) {
@@ -387,15 +398,16 @@
      * so there are no particular compatibility issues.)
      */
     @SuppressWarnings("deprecation")
-    private void renameMapOutputForReduce(JobConf conf, TaskAttemptId mapId,
-                                          MapOutputFile subMapOutputFile)
-    throws IOException {
+    private MapOutputFile renameMapOutputForReduce(JobConf conf,
+        TaskAttemptId mapId, MapOutputFile subMapOutputFile) throws IOException {
       FileSystem localFs = FileSystem.getLocal(conf);
       // move map output to reduce input
       Path mapOut = subMapOutputFile.getOutputFile();
       FileStatus mStatus = localFs.getFileStatus(mapOut);
       Path reduceIn = subMapOutputFile.getInputFileForWrite(
           TypeConverter.fromYarn(mapId).getTaskID(), mStatus.getLen());
+      Path mapOutIndex = new Path(mapOut.toString() + ".index");
+      Path reduceInIndex = new Path(reduceIn.toString() + ".index");
       if (LOG.isDebugEnabled()) {
         LOG.debug("Renaming map output file for task attempt "
             + mapId.toString() + " from original location " + mapOut.toString()
@@ -407,6 +419,10 @@
       }
       if (!localFs.rename(mapOut, reduceIn))
         throw new IOException("Couldn't rename " + mapOut);
+      if (!localFs.rename(mapOutIndex, reduceInIndex))
+        throw new IOException("Couldn't rename " + mapOutIndex);
+
+      return new RenamedMapOutputFile(reduceIn);
     }
 
     /**
@@ -441,5 +457,70 @@
       }
     }
 
   } // end SubtaskRunner
+
+  private static class RenamedMapOutputFile extends MapOutputFile {
+    private Path path;
+
+    public RenamedMapOutputFile(Path path) {
+      this.path = path;
+    }
+
+    @Override
+    public Path getOutputFile() throws IOException {
+      return path;
+    }
+
+    @Override
+    public Path getOutputFileForWrite(long size) throws IOException {
+      throw new UnsupportedOperationException();
+    }
+    @Override
+    public Path getOutputFileForWriteInVolume(Path existing) {
+      throw new UnsupportedOperationException();
+    }
+    @Override
+    public Path getOutputIndexFile() throws IOException {
+      throw new UnsupportedOperationException();
+    }
+    @Override
+    public Path getOutputIndexFileForWrite(long size) throws IOException {
+      throw new UnsupportedOperationException();
+    }
+    @Override
+    public Path getOutputIndexFileForWriteInVolume(Path existing) {
+      throw new UnsupportedOperationException();
+    }
+    @Override
+    public Path getSpillFile(int spillNumber) throws IOException {
+      throw new UnsupportedOperationException();
+    }
+    @Override
+    public Path getSpillFileForWrite(int spillNumber, long size)
+        throws IOException {
+      throw new UnsupportedOperationException();
+    }
+    @Override
+    public Path getSpillIndexFile(int spillNumber) throws IOException {
+      throw new UnsupportedOperationException();
+    }
+    @Override
+    public Path getSpillIndexFileForWrite(int spillNumber, long size)
+        throws IOException {
+      throw new UnsupportedOperationException();
+    }
+    @Override
+    public Path getInputFile(int mapId) throws IOException {
+      throw new UnsupportedOperationException();
+    }
+    @Override
+    public Path getInputFileForWrite(TaskID mapId, long size)
+        throws IOException {
+      throw new UnsupportedOperationException();
+    }
+    @Override
+    public void removeAll() throws IOException {
+      throw new UnsupportedOperationException();
+    }
+  }
 }
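The index-file handling in renameMapOutputForReduce is what makes multiple reducers feasible: a reduce locates its partition inside a map output via the companion ".index" file, so the data file and its index must move as a unit. A rough equivalent of that invariant in plain java.nio, with hypothetical paths (the patch itself uses Hadoop's local FileSystem and Path, and returns a RenamedMapOutputFile wrapper):

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.Paths;

    public class RenameWithIndexSketch {
      // Move a map output and its ".index" companion together, mirroring
      // what renameMapOutputForReduce does after this patch.
      static Path renameForReduce(Path mapOut, Path reduceIn) throws IOException {
        Path mapOutIndex = Paths.get(mapOut.toString() + ".index");
        Path reduceInIndex = Paths.get(reduceIn.toString() + ".index");
        Files.move(mapOut, reduceIn);            // data file first
        Files.move(mapOutIndex, reduceInIndex);  // then its partition index
        return reduceIn;
      }
    }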
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
index c884a51cbf9..5f08303b36f 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
@@ -1173,11 +1173,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
     // these are no longer "system" settings, necessarily; user may override
     int sysMaxMaps = conf.getInt(MRJobConfig.JOB_UBERTASK_MAXMAPS, 9);
-
-    //FIXME: handling multiple reduces within a single AM does not seem to
-    //work.
     int sysMaxReduces = conf.getInt(MRJobConfig.JOB_UBERTASK_MAXREDUCES, 1);
-    boolean isValidUberMaxReduces = (sysMaxReduces == 0)
-        || (sysMaxReduces == 1);
 
     long sysMaxBytes = conf.getLong(MRJobConfig.JOB_UBERTASK_MAXBYTES,
         fs.getDefaultBlockSize(this.remoteJobSubmitDir)); // FIXME: this is
         // wrong; get FS from
@@ -1225,7 +1221,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
     // and thus requires sequential execution.
     isUber = uberEnabled && smallNumMapTasks && smallNumReduceTasks
         && smallInput && smallMemory && smallCpu
-        && notChainJob && isValidUberMaxReduces;
+        && notChainJob;
 
     if (isUber) {
       LOG.info("Uberizing job " + jobId + ": " + numMapTasks + "m+"
@@ -1259,8 +1255,6 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
         msg.append(" too much RAM;");
       if (!notChainJob)
         msg.append(" chainjob;");
-      if (!isValidUberMaxReduces)
-        msg.append(" not supported uber max reduces");
       LOG.info(msg.toString());
     }
   }
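With the FIXME and the hard-wired isValidUberMaxReduces check removed, the reduce side of the uberization decision becomes a bounded-size check symmetric with the map side, capped by mapreduce.job.ubertask.maxreduces (default 1). A trimmed-down illustration of the decision, omitting the input-size, memory, CPU, and chain-job tests that JobImpl also applies; names and defaults here follow the patch, the method itself is hypothetical:

    public class UberDecisionSketch {
      static boolean shouldUberize(boolean uberEnabled, int numMaps,
          int numReduces, int sysMaxMaps, int sysMaxReduces) {
        boolean smallNumMapTasks = numMaps <= sysMaxMaps;          // default cap 9
        boolean smallNumReduceTasks = numReduces <= sysMaxReduces; // default cap 1
        return uberEnabled && smallNumMapTasks && smallNumReduceTasks;
      }

      public static void main(String[] args) {
        // With maxreduces raised to 3 (as TestUberAM does below), a job with
        // 3 maps and 3 reduces can now run ubered.
        System.out.println(shouldUberize(true, 3, 3, 9, 3)); // true
        System.out.println(shouldUberize(true, 3, 3, 9, 1)); // false (default cap)
      }
    }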
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestUberAM.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestUberAM.java
index 868c2d5ae3d..32199e55410 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestUberAM.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestUberAM.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.mapreduce.v2;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.Arrays;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -39,12 +40,14 @@ import org.junit.Test;
 public class TestUberAM extends TestMRJobs {
 
   private static final Log LOG = LogFactory.getLog(TestUberAM.class);
-
+  private int numSleepReducers;
+
   @BeforeClass
   public static void setup() throws IOException {
     TestMRJobs.setup();
     if (mrCluster != null) {
       mrCluster.getConfig().setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, true);
+      mrCluster.getConfig().setInt(MRJobConfig.JOB_UBERTASK_MAXREDUCES, 3);
     }
   }
 
@@ -52,8 +55,19 @@ public class TestUberAM extends TestMRJobs {
   @Test
   public void testSleepJob()
   throws IOException, InterruptedException, ClassNotFoundException {
+    numSleepReducers = 1;
     if (mrCluster != null) {
-      mrCluster.getConfig().setInt("TestMRJobs.testSleepJob.reduces", 1);
+      mrCluster.getConfig().setInt("TestMRJobs.testSleepJob.reduces", numSleepReducers);
+    }
+    super.testSleepJob();
+  }
+
+  @Test
+  public void testSleepJobWithMultipleReducers()
+      throws IOException, InterruptedException, ClassNotFoundException {
+    numSleepReducers = 3;
+    if (mrCluster != null) {
+      mrCluster.getConfig().setInt("TestMRJobs.testSleepJob.reduces", numSleepReducers);
     }
     super.testSleepJob();
   }
@@ -67,7 +81,7 @@ public class TestUberAM extends TestMRJobs {
         .getValue());
     Assert.assertEquals(3, counters.findCounter(JobCounter.TOTAL_LAUNCHED_MAPS)
         .getValue());
-    Assert.assertEquals(1,
+    Assert.assertEquals(numSleepReducers,
         counters.findCounter(JobCounter.TOTAL_LAUNCHED_REDUCES).getValue());
     Assert
         .assertTrue(counters.findCounter(JobCounter.SLOTS_MILLIS_MAPS) != null
@@ -76,11 +90,11 @@ public class TestUberAM extends TestMRJobs {
         .assertTrue(counters.findCounter(JobCounter.SLOTS_MILLIS_MAPS) != null
             && counters.findCounter(JobCounter.SLOTS_MILLIS_MAPS).getValue()
                 != 0);
-    Assert.assertEquals(3, counters.findCounter(JobCounter.NUM_UBER_SUBMAPS)
-        .getValue());
-    Assert.assertEquals(1, counters.findCounter(JobCounter.NUM_UBER_SUBREDUCES)
-        .getValue());
-    Assert.assertEquals(4,
+    Assert.assertEquals(3,
+        counters.findCounter(JobCounter.NUM_UBER_SUBMAPS).getValue());
+    Assert.assertEquals(numSleepReducers,
+        counters.findCounter(JobCounter.NUM_UBER_SUBREDUCES).getValue());
+    Assert.assertEquals(3 + numSleepReducers,
        counters.findCounter(JobCounter.TOTAL_LAUNCHED_UBERTASKS).getValue());
   }
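The updated assertions follow from simple arithmetic: every sub-map and every sub-reduce of an uber job counts as one launched ubertask, so the old literal 4 (3 maps + 1 reduce) generalizes to 3 + numSleepReducers. A tiny sketch of that accounting (the class and variable names here are hypothetical):

    public class UberCounterArithmetic {
      public static void main(String[] args) {
        int numMaps = 3;
        int numSleepReducers = 3; // as in testSleepJobWithMultipleReducers
        // TOTAL_LAUNCHED_UBERTASKS = NUM_UBER_SUBMAPS + NUM_UBER_SUBREDUCES
        int expectedTotalLaunchedUbertasks = numMaps + numSleepReducers;
        System.out.println(expectedTotalLaunchedUbertasks); // 6, or 4 when reducers == 1
      }
    }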
@@ -138,8 +152,10 @@ public class TestUberAM extends TestMRJobs {
     TaskCompletionEvent[] events = job.getTaskCompletionEvents(0, 2);
     Assert.assertEquals(1, events.length);
-    Assert.assertEquals(TaskCompletionEvent.Status.TIPFAILED,
-        events[0].getStatus());
+    // TIPFAILED if it comes from the AM, FAILED if it comes from the JHS
+    TaskCompletionEvent.Status status = events[0].getStatus();
+    Assert.assertTrue(status == TaskCompletionEvent.Status.FAILED ||
+        status == TaskCompletionEvent.Status.TIPFAILED);
     Assert.assertEquals(JobStatus.State.FAILED, job.getJobState());
 
     //Disabling till UberAM honors MRJobConfig.MAP_MAX_ATTEMPTS
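The relaxed assertion above accepts either status because the completion event may be served by the MR AM (TIPFAILED) or by the JobHistoryServer (FAILED) depending on when it is fetched. One way to express such set-membership checks, shown with a hypothetical stand-in enum rather than Hadoop's TaskCompletionEvent.Status:

    import java.util.EnumSet;

    public class StatusCheckSketch {
      // Assumption: only the two statuses relevant here are modeled.
      enum Status { FAILED, TIPFAILED }

      public static void main(String[] args) {
        EnumSet<Status> acceptable = EnumSet.of(Status.FAILED, Status.TIPFAILED);
        Status observed = Status.TIPFAILED; // whatever the cluster returned
        if (!acceptable.contains(observed)) {
          throw new AssertionError("Unexpected status: " + observed);
        }
        System.out.println("Status " + observed + " is acceptable");
      }
    }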