MAPREDUCE-5481. Enable uber jobs to have multiple reducers (Sandy Ryza)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1541844 13f79535-47bb-0310-9956-ffa450edef68
Sanford Ryza, 2013-11-14 07:56:11 +00:00
parent b771197ed5, commit 3d95049f79
4 changed files with 117 additions and 24 deletions

hadoop-mapreduce-project/CHANGES.txt

@@ -176,6 +176,8 @@ Release 2.3.0 - UNRELEASED
     MAPREDUCE-5624 Move grizzly-test and junit dependencies to test scope
     (Ted Yu via stevel)
 
+    MAPREDUCE-5481. Enable uber jobs to have multiple reducers (Sandy Ryza)
+
   OPTIMIZATIONS
 
     MAPREDUCE-5484. YarnChild unnecessarily loads job conf twice (Sandy Ryza)
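Reviewer note: for context, a minimal sketch of how a job could opt into uber mode with more than one reducer after this change. The string keys are assumed to match the MRJobConfig constants JobImpl reads below, and the values are illustrative, not prescribed defaults:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    public class UberJobExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Assumed keys for MRJobConfig.JOB_UBERTASK_ENABLE and
        // MRJobConfig.JOB_UBERTASK_MAXREDUCES, as consulted by JobImpl below.
        conf.setBoolean("mapreduce.job.ubertask.enable", true);
        conf.setInt("mapreduce.job.ubertask.maxreduces", 3); // cap was effectively 1 before this patch
        Job job = Job.getInstance(conf, "small-uber-job");
        job.setNumReduceTasks(3); // within the cap, so the job can still uberize
        // ... set mapper/reducer/input/output as usual, then job.waitForCompletion(true);
      }
    }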

org/apache/hadoop/mapred/LocalContainerLauncher.java

@@ -20,7 +20,9 @@ package org.apache.hadoop.mapred;
 import java.io.File;
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -35,6 +37,7 @@ import org.apache.hadoop.fs.UnsupportedFileSystemException;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.JobCounter;
 import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.TaskID;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
@@ -168,6 +171,10 @@ public class LocalContainerLauncher extends AbstractService implements
     public void run() {
       ContainerLauncherEvent event = null;
 
+      // Collect locations of map outputs to give to reduces
+      Map<TaskAttemptID, MapOutputFile> localMapFiles =
+          new HashMap<TaskAttemptID, MapOutputFile>();
+
       // _must_ either run subtasks sequentially or accept expense of new JVMs
       // (i.e., fork()), else will get weird failures when maps try to create/
       // write same dirname or filename: no chdir() in Java
@@ -223,7 +230,7 @@ public class LocalContainerLauncher extends AbstractService implements
             context.getEventHandler().handle(jce);
           }
           runSubtask(remoteTask, ytask.getType(), attemptID, numMapTasks,
-                     (numReduceTasks > 0));
+                     (numReduceTasks > 0), localMapFiles);
 
         } catch (RuntimeException re) {
           JobCounterUpdateEvent jce = new JobCounterUpdateEvent(attemptID.getTaskId().getJobId());
@@ -265,7 +272,8 @@ public class LocalContainerLauncher extends AbstractService implements
                             final TaskType taskType,
                             TaskAttemptId attemptID,
                             final int numMapTasks,
-                            boolean renameOutputs)
+                            boolean renameOutputs,
+                            Map<TaskAttemptID, MapOutputFile> localMapFiles)
     throws RuntimeException, IOException {
       org.apache.hadoop.mapred.TaskAttemptID classicAttemptID =
           TypeConverter.fromYarn(attemptID);
@@ -309,7 +317,9 @@ public class LocalContainerLauncher extends AbstractService implements
           map.run(conf, umbilical);
 
           if (renameOutputs) {
-            renameMapOutputForReduce(conf, attemptID, map.getMapOutputFile());
+            MapOutputFile renamed = renameMapOutputForReduce(conf, attemptID,
+                map.getMapOutputFile());
+            localMapFiles.put(classicAttemptID, renamed);
           }
           relocalize();
@@ -335,10 +345,11 @@ public class LocalContainerLauncher extends AbstractService implements
           conf.set(MRConfig.MASTER_ADDRESS, "local");  // bypass shuffle
 
           ReduceTask reduce = (ReduceTask)task;
+          reduce.setLocalMapFiles(localMapFiles);
           reduce.setConf(conf);
           reduce.run(conf, umbilical);
-          //relocalize();  // needed only if more than one reducer supported (is MAPREDUCE-434 fixed yet?)
+          relocalize();
         }
 
       } catch (FSError e) {
@@ -387,15 +398,16 @@ public class LocalContainerLauncher extends AbstractService implements
      * so there are no particular compatibility issues.)
      */
     @SuppressWarnings("deprecation")
-    private void renameMapOutputForReduce(JobConf conf, TaskAttemptId mapId,
-                                          MapOutputFile subMapOutputFile)
-    throws IOException {
+    private MapOutputFile renameMapOutputForReduce(JobConf conf,
+        TaskAttemptId mapId, MapOutputFile subMapOutputFile) throws IOException {
       FileSystem localFs = FileSystem.getLocal(conf);
       // move map output to reduce input
       Path mapOut = subMapOutputFile.getOutputFile();
       FileStatus mStatus = localFs.getFileStatus(mapOut);
       Path reduceIn = subMapOutputFile.getInputFileForWrite(
           TypeConverter.fromYarn(mapId).getTaskID(), mStatus.getLen());
+      Path mapOutIndex = new Path(mapOut.toString() + ".index");
+      Path reduceInIndex = new Path(reduceIn.toString() + ".index");
       if (LOG.isDebugEnabled()) {
         LOG.debug("Renaming map output file for task attempt "
             + mapId.toString() + " from original location " + mapOut.toString()
@@ -407,6 +419,10 @@ public class LocalContainerLauncher extends AbstractService implements
       }
       if (!localFs.rename(mapOut, reduceIn))
         throw new IOException("Couldn't rename " + mapOut);
+      if (!localFs.rename(mapOutIndex, reduceInIndex))
+        throw new IOException("Couldn't rename " + mapOutIndex);
+
+      return new RenamedMapOutputFile(reduceIn);
     }
 
     /**
@@ -441,5 +457,70 @@ public class LocalContainerLauncher extends AbstractService implements
     }
   } // end SubtaskRunner
 
+  private static class RenamedMapOutputFile extends MapOutputFile {
+    private Path path;
+
+    public RenamedMapOutputFile(Path path) {
+      this.path = path;
+    }
+
+    @Override
+    public Path getOutputFile() throws IOException {
+      return path;
+    }
+
+    @Override
+    public Path getOutputFileForWrite(long size) throws IOException {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Path getOutputFileForWriteInVolume(Path existing) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Path getOutputIndexFile() throws IOException {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Path getOutputIndexFileForWrite(long size) throws IOException {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Path getOutputIndexFileForWriteInVolume(Path existing) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Path getSpillFile(int spillNumber) throws IOException {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Path getSpillFileForWrite(int spillNumber, long size)
+        throws IOException {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Path getSpillIndexFile(int spillNumber) throws IOException {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Path getSpillIndexFileForWrite(int spillNumber, long size)
+        throws IOException {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Path getInputFile(int mapId) throws IOException {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Path getInputFileForWrite(TaskID mapId, long size)
+        throws IOException {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void removeAll() throws IOException {
+      throw new UnsupportedOperationException();
+    }
+  }
 }
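Reviewer note: with the shuffle bypassed, each in-JVM reducer finds map output through the localMapFiles map, so the rename must now move both the data file and its companion .index file (the index records each reducer's partition offsets within the data file). A minimal standalone sketch of the same two-file rename, with an illustrative, hypothetical helper name and caller-supplied paths:

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class RenamePairSketch {
      // Mirrors renameMapOutputForReduce above: move the map output and its
      // ".index" companion together so a reducer can still seek to its partition.
      static Path renamePair(Configuration conf, Path mapOut, Path reduceIn)
          throws IOException {
        FileSystem localFs = FileSystem.getLocal(conf);
        if (!localFs.rename(mapOut, reduceIn)) {
          throw new IOException("Couldn't rename " + mapOut);
        }
        if (!localFs.rename(mapOut.suffix(".index"), reduceIn.suffix(".index"))) {
          throw new IOException("Couldn't rename " + mapOut.suffix(".index"));
        }
        return reduceIn;
      }
    }

RenamedMapOutputFile deliberately throws UnsupportedOperationException from everything except getOutputFile(): once the rename has happened, the only lookup the local reduce path needs is the final location of each map's output.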

org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java

@@ -1173,11 +1173,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
     // these are no longer "system" settings, necessarily; user may override
     int sysMaxMaps = conf.getInt(MRJobConfig.JOB_UBERTASK_MAXMAPS, 9);
 
-    //FIXME: handling multiple reduces within a single AM does not seem to
-    //work.
     int sysMaxReduces = conf.getInt(MRJobConfig.JOB_UBERTASK_MAXREDUCES, 1);
-    boolean isValidUberMaxReduces = (sysMaxReduces == 0)
-        || (sysMaxReduces == 1);
 
     long sysMaxBytes = conf.getLong(MRJobConfig.JOB_UBERTASK_MAXBYTES,
         fs.getDefaultBlockSize(this.remoteJobSubmitDir)); // FIXME: this is wrong; get FS from
@@ -1225,7 +1221,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
     // and thus requires sequential execution.
     isUber = uberEnabled && smallNumMapTasks && smallNumReduceTasks
         && smallInput && smallMemory && smallCpu
-        && notChainJob && isValidUberMaxReduces;
+        && notChainJob;
 
     if (isUber) {
       LOG.info("Uberizing job " + jobId + ": " + numMapTasks + "m+"
@@ -1259,8 +1255,6 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
         msg.append(" too much RAM;");
       if (!notChainJob)
         msg.append(" chainjob;");
-      if (!isValidUberMaxReduces)
-        msg.append(" not supported uber max reduces");
       LOG.info(msg.toString());
     }
   }
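Reviewer note: with the isValidUberMaxReduces guard removed, uberization comes down to the remaining small-job predicates. A condensed, self-contained sketch of the decision as it stands after this patch; the inputs are illustrative stand-ins for values JobImpl actually derives from the job and cluster configuration:

    public class UberDecisionSketch {
      public static void main(String[] args) {
        // Illustrative inputs; JobImpl reads these from the job config.
        boolean uberEnabled = true;                 // mapreduce.job.ubertask.enable
        int numMapTasks = 3,    sysMaxMaps = 9;     // mapreduce.job.ubertask.maxmaps
        int numReduceTasks = 3, sysMaxReduces = 3;  // may now exceed 1
        boolean smallInput = true, smallMemory = true, smallCpu = true;
        boolean notChainJob = true;

        boolean smallNumMapTasks = numMapTasks <= sysMaxMaps;
        boolean smallNumReduceTasks = numReduceTasks <= sysMaxReduces;
        boolean isUber = uberEnabled && smallNumMapTasks && smallNumReduceTasks
            && smallInput && smallMemory && smallCpu && notChainJob;
        System.out.println("isUber = " + isUber); // true for these inputs
      }
    }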

org/apache/hadoop/mapreduce/v2/TestUberAM.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.mapreduce.v2;
 import java.io.File;
 import java.io.IOException;
+import java.util.Arrays;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -39,12 +40,14 @@ import org.junit.Test;
 public class TestUberAM extends TestMRJobs {
 
   private static final Log LOG = LogFactory.getLog(TestUberAM.class);
+  private int numSleepReducers;
 
   @BeforeClass
   public static void setup() throws IOException {
     TestMRJobs.setup();
     if (mrCluster != null) {
       mrCluster.getConfig().setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, true);
+      mrCluster.getConfig().setInt(MRJobConfig.JOB_UBERTASK_MAXREDUCES, 3);
     }
   }
@@ -52,8 +55,19 @@ public class TestUberAM extends TestMRJobs {
   @Test
   public void testSleepJob()
   throws IOException, InterruptedException, ClassNotFoundException {
+    numSleepReducers = 1;
     if (mrCluster != null) {
-      mrCluster.getConfig().setInt("TestMRJobs.testSleepJob.reduces", 1);
+      mrCluster.getConfig().setInt("TestMRJobs.testSleepJob.reduces", numSleepReducers);
+    }
+    super.testSleepJob();
+  }
+
+  @Test
+  public void testSleepJobWithMultipleReducers()
+  throws IOException, InterruptedException, ClassNotFoundException {
+    numSleepReducers = 3;
+    if (mrCluster != null) {
+      mrCluster.getConfig().setInt("TestMRJobs.testSleepJob.reduces", numSleepReducers);
     }
     super.testSleepJob();
   }
@ -67,7 +81,7 @@ public class TestUberAM extends TestMRJobs {
.getValue()); .getValue());
Assert.assertEquals(3, counters.findCounter(JobCounter.TOTAL_LAUNCHED_MAPS) Assert.assertEquals(3, counters.findCounter(JobCounter.TOTAL_LAUNCHED_MAPS)
.getValue()); .getValue());
Assert.assertEquals(1, Assert.assertEquals(numSleepReducers,
counters.findCounter(JobCounter.TOTAL_LAUNCHED_REDUCES).getValue()); counters.findCounter(JobCounter.TOTAL_LAUNCHED_REDUCES).getValue());
Assert Assert
.assertTrue(counters.findCounter(JobCounter.SLOTS_MILLIS_MAPS) != null .assertTrue(counters.findCounter(JobCounter.SLOTS_MILLIS_MAPS) != null
@@ -76,11 +90,11 @@ public class TestUberAM extends TestMRJobs {
         .assertTrue(counters.findCounter(JobCounter.SLOTS_MILLIS_MAPS) != null
             && counters.findCounter(JobCounter.SLOTS_MILLIS_MAPS).getValue() != 0);
 
-    Assert.assertEquals(3, counters.findCounter(JobCounter.NUM_UBER_SUBMAPS)
-        .getValue());
-    Assert.assertEquals(1, counters.findCounter(JobCounter.NUM_UBER_SUBREDUCES)
-        .getValue());
-    Assert.assertEquals(4,
+    Assert.assertEquals(3,
+        counters.findCounter(JobCounter.NUM_UBER_SUBMAPS).getValue());
+    Assert.assertEquals(numSleepReducers,
+        counters.findCounter(JobCounter.NUM_UBER_SUBREDUCES).getValue());
+    Assert.assertEquals(3 + numSleepReducers,
         counters.findCounter(JobCounter.TOTAL_LAUNCHED_UBERTASKS).getValue());
   }
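Reviewer note on the arithmetic behind the updated assertions: the sleep job always runs three maps, so with numSleepReducers = 3 the multiple-reducer test expects 3 NUM_UBER_SUBMAPS, 3 NUM_UBER_SUBREDUCES, and 3 + 3 = 6 TOTAL_LAUNCHED_UBERTASKS, where the old hardcoded expectations were 3, 1, and 4.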
@@ -138,8 +152,10 @@ public class TestUberAM extends TestMRJobs {
     TaskCompletionEvent[] events = job.getTaskCompletionEvents(0, 2);
     Assert.assertEquals(1, events.length);
-    Assert.assertEquals(TaskCompletionEvent.Status.TIPFAILED,
-        events[0].getStatus());
+    // TIPFAILED if it comes from the AM, FAILED if it comes from the JHS
+    TaskCompletionEvent.Status status = events[0].getStatus();
+    Assert.assertTrue(status == TaskCompletionEvent.Status.FAILED ||
+        status == TaskCompletionEvent.Status.TIPFAILED);
     Assert.assertEquals(JobStatus.State.FAILED, job.getJobState());
 
 //Disabling till UberAM honors MRJobConfig.MAP_MAX_ATTEMPTS