From 44b7e031145490e9730cdddbb4b0ed23f5bdc049 Mon Sep 17 00:00:00 2001 From: Devaraj Das Date: Thu, 4 Aug 2011 01:15:33 +0000 Subject: [PATCH] MAPREDUCE-2705. Permits parallel multiple task launches. Contributed by Thomas Graves. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1153717 13f79535-47bb-0310-9956-ffa450edef68 --- mapreduce/CHANGES.txt | 3 ++ .../org/apache/hadoop/mapred/TaskTracker.java | 51 ++++++++++--------- 2 files changed, 31 insertions(+), 23 deletions(-) diff --git a/mapreduce/CHANGES.txt b/mapreduce/CHANGES.txt index 17e23dae06c..30fe349403c 100644 --- a/mapreduce/CHANGES.txt +++ b/mapreduce/CHANGES.txt @@ -209,6 +209,9 @@ Trunk (unreleased changes) MAPREDUCE-2602. Allow setting of end-of-record delimiter for TextInputFormat for the old API. (Ahmed Radwan via todd) + MAPREDUCE-2705. Permits parallel multiple task launches. + (Thomas Graves via ddas) + OPTIMIZATIONS MAPREDUCE-2026. Make JobTracker.getJobCounters() and diff --git a/mapreduce/src/java/org/apache/hadoop/mapred/TaskTracker.java b/mapreduce/src/java/org/apache/hadoop/mapred/TaskTracker.java index 65f2a00a426..52ea9162b19 100644 --- a/mapreduce/src/java/org/apache/hadoop/mapred/TaskTracker.java +++ b/mapreduce/src/java/org/apache/hadoop/mapred/TaskTracker.java @@ -1253,7 +1253,7 @@ public class TaskTracker } } - private void launchTaskForJob(TaskInProgress tip, JobConf jobConf, + protected void launchTaskForJob(TaskInProgress tip, JobConf jobConf, UserGroupInformation ugi) throws IOException { synchronized (tip) { tip.setJobConf(jobConf); @@ -2351,30 +2351,35 @@ public class TaskTracker * All exceptions are handled locally, so that we don't mess up the * task tracker. */ - void startNewTask(TaskInProgress tip) { - try { - RunningJob rjob = localizeJob(tip); - // Localization is done. Neither rjob.jobConf nor rjob.ugi can be null - launchTaskForJob(tip, new JobConf(rjob.jobConf), rjob.ugi); - } catch (Throwable e) { - String msg = ("Error initializing " + tip.getTask().getTaskID() + - ":\n" + StringUtils.stringifyException(e)); - LOG.warn(msg); - tip.reportDiagnosticInfo(msg); - try { - tip.kill(true); - tip.cleanup(true); - } catch (IOException ie2) { - LOG.info("Error cleaning up " + tip.getTask().getTaskID() + ":\n" + - StringUtils.stringifyException(ie2)); + void startNewTask(final TaskInProgress tip) { + Thread launchThread = new Thread(new Runnable() { + @Override + public void run() { + try { + RunningJob rjob = localizeJob(tip); + // Localization is done. Neither rjob.jobConf nor rjob.ugi can be null + launchTaskForJob(tip, new JobConf(rjob.getJobConf()), rjob.ugi); + } catch (Throwable e) { + String msg = ("Error initializing " + tip.getTask().getTaskID() + + ":\n" + StringUtils.stringifyException(e)); + LOG.warn(msg); + tip.reportDiagnosticInfo(msg); + try { + tip.kill(true); + tip.cleanup(true); + } catch (IOException ie2) { + LOG.info("Error cleaning up " + tip.getTask().getTaskID() + ":\n" + + StringUtils.stringifyException(ie2)); + } + if (e instanceof Error) { + LOG.error("TaskLauncher error " + + StringUtils.stringifyException(e)); + } + } } + }); + launchThread.start(); - // Careful! - // This might not be an 'Exception' - don't handle 'Error' here! - if (e instanceof Error) { - throw ((Error) e); - } - } } void addToMemoryManager(TaskAttemptID attemptId, boolean isMap,