From e3b9d11da839f6401499950ea7acf696de6ceea5 Mon Sep 17 00:00:00 2001 From: Vinod Kumar Vavilapalli Date: Wed, 19 Oct 2011 05:28:22 +0000 Subject: [PATCH] MAPREDUCE-3144. Augmented JobHistory with the information needed for serving aggregated logs. Contributed by Siddharth Seth. svn merge -c r1185976 --ignore-ancestry ../../trunk/ git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1185977 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 3 + .../jobhistory/JobHistoryEventHandler.java | 61 +++++----- .../hadoop/mapreduce/v2/app/MRAppMaster.java | 106 +++++++++++++--- .../hadoop/mapreduce/v2/app/job/Job.java | 6 + .../mapreduce/v2/app/job/impl/JobImpl.java | 18 ++- .../v2/app/job/impl/TaskAttemptImpl.java | 35 ++++-- .../mapreduce/v2/app/recover/Recovery.java | 4 + .../v2/app/recover/RecoveryService.java | 17 ++- .../apache/hadoop/mapreduce/v2/app/MRApp.java | 21 +++- .../hadoop/mapreduce/v2/app/MockJobs.java | 6 + .../mapreduce/v2/app/TestMRAppMaster.java | 12 +- .../v2/app/TestRMContainerAllocator.java | 4 +- .../hadoop/mapreduce/v2/app/TestRecovery.java | 26 +++- .../v2/app/TestRuntimeEstimators.java | 6 + .../mapreduce/v2/app/TestStagingCleanup.java | 5 +- .../src/main/avro/Events.avpr | 17 ++- .../mapreduce/jobhistory/AMStartedEvent.java | 114 ++++++++++++++++++ .../mapreduce/jobhistory/EventReader.java | 5 +- .../jobhistory/JobHistoryParser.java | 103 +++++++++++++++- .../mapreduce/jobhistory/JobInitedEvent.java | 2 - .../jobhistory/TaskAttemptStartedEvent.java | 21 +++- .../hadoop/mapreduce/v2/hs/CompletedJob.java | 6 + .../mapreduce/v2/hs/CompletedTaskAttempt.java | 26 +--- .../hadoop/mapreduce/v2/hs/PartialJob.java | 6 + .../mapreduce/v2/hs/TestJobHistoryEvents.java | 6 + .../v2/hs/TestJobHistoryParsing.java | 24 +++- .../hadoop/yarn/api/ApplicationConstants.java | 20 ++- .../records/ApplicationSubmissionContext.java | 2 +- .../distributedshell/ApplicationMaster.java | 5 +- .../hadoop/yarn/util/ConverterUtils.java | 40 ++---- .../nodemanager/webapp/ContainerLogsPage.java | 2 +- .../nodemanager/webapp/ContainerPage.java | 4 +- .../resourcemanager/ClientRMService.java | 5 +- .../server/resourcemanager/RMAppManager.java | 11 +- .../RMAppManagerSubmitEvent.java | 12 +- .../amlauncher/AMLauncher.java | 21 ++-- .../server/resourcemanager/rmapp/RMApp.java | 6 + .../resourcemanager/rmapp/RMAppImpl.java | 10 +- .../yarn/server/resourcemanager/MockNM.java | 7 +- .../resourcemanager/TestAppManager.java | 2 +- .../TestApplicationMasterLauncher.java | 29 ++++- .../applicationsmanager/MockAsm.java | 5 + .../resourcemanager/rmapp/MockRMApp.java | 6 + .../rmapp/TestRMAppTransitions.java | 6 +- .../site/apt/WritingYarnApplications.apt.vm | 32 ++--- 45 files changed, 688 insertions(+), 197 deletions(-) create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/AMStartedEvent.java diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index cf2db668dcc..42a88a8c187 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -359,6 +359,9 @@ Release 0.23.0 - Unreleased from the NodeManager and set MALLOC_ARENA_MAX for all daemons and containers. (Chris Riccomini via acmurthy) + MAPREDUCE-3144. Augmented JobHistory with the information needed for + serving aggregated logs. (Siddharth Seth via vinodkv) + OPTIMIZATIONS MAPREDUCE-2026. Make JobTracker.getJobCounters() and diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java index 797d48a2f45..4b97feda2d4 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java @@ -275,7 +275,7 @@ public void stop() { * @param jobId the jobId. * @throws IOException */ - protected void setupEventWriter(JobId jobId, JobSubmittedEvent jse) + protected void setupEventWriter(JobId jobId) throws IOException { if (stagingDirPath == null) { LOG.error("Log Directory is null, returning"); @@ -285,9 +285,6 @@ protected void setupEventWriter(JobId jobId, JobSubmittedEvent jse) MetaInfo oldFi = fileMap.get(jobId); Configuration conf = getConfig(); - long submitTime = oldFi == null ? jse.getSubmitTime() : oldFi - .getJobIndexInfo().getSubmitTime(); - // TODO Ideally this should be written out to the job dir // (.staging/jobid/files - RecoveryService will need to be patched) Path historyFile = JobHistoryUtils.getStagingJobHistoryFile( @@ -301,6 +298,8 @@ protected void setupEventWriter(JobId jobId, JobSubmittedEvent jse) String jobName = context.getJob(jobId).getName(); EventWriter writer = (oldFi == null) ? null : oldFi.writer; + Path logDirConfPath = + JobHistoryUtils.getStagingConfFile(stagingDirPath, jobId, startCount); if (writer == null) { try { FSDataOutputStream out = stagingDirFS.create(historyFile, true); @@ -312,31 +311,28 @@ protected void setupEventWriter(JobId jobId, JobSubmittedEvent jse) + "[" + jobName + "]"); throw ioe; } - } - - Path logDirConfPath = null; - if (conf != null) { - // TODO Ideally this should be written out to the job dir - // (.staging/jobid/files - RecoveryService will need to be patched) - logDirConfPath = JobHistoryUtils.getStagingConfFile(stagingDirPath, jobId, - startCount); - FSDataOutputStream jobFileOut = null; - try { - if (logDirConfPath != null) { - jobFileOut = stagingDirFS.create(logDirConfPath, true); - conf.writeXml(jobFileOut); - jobFileOut.close(); + + //Write out conf only if the writer isn't already setup. + if (conf != null) { + // TODO Ideally this should be written out to the job dir + // (.staging/jobid/files - RecoveryService will need to be patched) + FSDataOutputStream jobFileOut = null; + try { + if (logDirConfPath != null) { + jobFileOut = stagingDirFS.create(logDirConfPath, true); + conf.writeXml(jobFileOut); + jobFileOut.close(); + } + } catch (IOException e) { + LOG.info("Failed to write the job configuration file", e); + throw e; } - } catch (IOException e) { - LOG.info("Failed to write the job configuration file", e); - throw e; } } - - MetaInfo fi = new MetaInfo(historyFile, logDirConfPath, writer, submitTime, + + MetaInfo fi = new MetaInfo(historyFile, logDirConfPath, writer, user, jobName, jobId); fi.getJobSummary().setJobId(jobId); - fi.getJobSummary().setJobSubmitTime(submitTime); fileMap.put(jobId, fi); } @@ -368,11 +364,9 @@ protected void handleEvent(JobHistoryEvent event) { synchronized (lock) { // If this is JobSubmitted Event, setup the writer - if (event.getHistoryEvent().getEventType() == EventType.JOB_SUBMITTED) { + if (event.getHistoryEvent().getEventType() == EventType.AM_STARTED) { try { - JobSubmittedEvent jobSubmittedEvent = - (JobSubmittedEvent) event.getHistoryEvent(); - setupEventWriter(event.getJobID(), jobSubmittedEvent); + setupEventWriter(event.getJobID()); } catch (IOException ioe) { LOG.error("Error JobHistoryEventHandler in handleEvent: " + event, ioe); @@ -396,6 +390,12 @@ protected void handleEvent(JobHistoryEvent event) { throw new YarnException(e); } + if (event.getHistoryEvent().getEventType() == EventType.JOB_SUBMITTED) { + JobSubmittedEvent jobSubmittedEvent = + (JobSubmittedEvent) event.getHistoryEvent(); + mi.getJobIndexInfo().setSubmitTime(jobSubmittedEvent.getSubmitTime()); + } + // If this is JobFinishedEvent, close the writer and setup the job-index if (event.getHistoryEvent().getEventType() == EventType.JOB_FINISHED) { try { @@ -436,6 +436,7 @@ private void processEventForJobSummary(HistoryEvent event, JobSummary summary, J JobSubmittedEvent jse = (JobSubmittedEvent) event; summary.setUser(jse.getUserName()); summary.setQueue(jse.getJobQueueName()); + summary.setJobSubmitTime(jse.getSubmitTime()); break; case JOB_INITED: JobInitedEvent jie = (JobInitedEvent) event; @@ -588,12 +589,12 @@ private class MetaInfo { JobIndexInfo jobIndexInfo; JobSummary jobSummary; - MetaInfo(Path historyFile, Path conf, EventWriter writer, long submitTime, + MetaInfo(Path historyFile, Path conf, EventWriter writer, String user, String jobName, JobId jobId) { this.historyFile = historyFile; this.confFile = conf; this.writer = writer; - this.jobIndexInfo = new JobIndexInfo(submitTime, -1, user, jobName, jobId, -1, -1, null); + this.jobIndexInfo = new JobIndexInfo(-1, -1, user, jobName, jobId, -1, -1, null); this.jobSummary = new JobSummary(); } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java index 94db4265ffe..8d229bc5649 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java @@ -22,7 +22,10 @@ import java.io.IOException; import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; +import java.net.InetSocketAddress; import java.security.PrivilegedExceptionAction; +import java.util.LinkedList; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -40,6 +43,8 @@ import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler; +import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.AMInfo; +import org.apache.hadoop.mapreduce.jobhistory.AMStartedEvent; import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager; import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.api.records.TaskId; @@ -72,6 +77,7 @@ import org.apache.hadoop.mapreduce.v2.app.taskclean.TaskCleaner; import org.apache.hadoop.mapreduce.v2.app.taskclean.TaskCleanerImpl; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; @@ -82,6 +88,7 @@ import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.Dispatcher; @@ -115,14 +122,20 @@ public class MRAppMaster extends CompositeService { private static final Log LOG = LogFactory.getLog(MRAppMaster.class); private Clock clock; - private final long startTime = System.currentTimeMillis(); + private final long startTime; + private final long appSubmitTime; private String appName; private final ApplicationAttemptId appAttemptID; + private final ContainerId containerID; + private final String nmHost; + private final int nmHttpPort; protected final MRAppMetrics metrics; private Set completedTasksFromPreviousRun; + private List amInfos; private AppContext context; private Dispatcher dispatcher; private ClientService clientService; + private Recovery recoveryServ; private ContainerAllocator containerAllocator; private ContainerLauncher containerLauncher; private TaskCleaner taskCleaner; @@ -131,19 +144,29 @@ public class MRAppMaster extends CompositeService { private JobTokenSecretManager jobTokenSecretManager = new JobTokenSecretManager(); private JobEventDispatcher jobEventDispatcher; + private boolean inRecovery = false; private Job job; private Credentials fsTokens = new Credentials(); // Filled during init private UserGroupInformation currentUser; // Will be setup during init - public MRAppMaster(ApplicationAttemptId applicationAttemptId) { - this(applicationAttemptId, new SystemClock()); + public MRAppMaster(ApplicationAttemptId applicationAttemptId, + ContainerId containerId, String nmHost, int nmHttpPort, long appSubmitTime) { + this(applicationAttemptId, containerId, nmHost, nmHttpPort, + new SystemClock(), appSubmitTime); } - public MRAppMaster(ApplicationAttemptId applicationAttemptId, Clock clock) { + public MRAppMaster(ApplicationAttemptId applicationAttemptId, + ContainerId containerId, String nmHost, int nmHttpPort, Clock clock, + long appSubmitTime) { super(MRAppMaster.class.getName()); this.clock = clock; + this.startTime = clock.getTime(); + this.appSubmitTime = appSubmitTime; this.appAttemptID = applicationAttemptId; + this.containerID = containerId; + this.nmHost = nmHost; + this.nmHttpPort = nmHttpPort; this.metrics = MRAppMetrics.create(); LOG.info("Created MRAppMaster for application " + applicationAttemptId); } @@ -162,11 +185,11 @@ public void init(final Configuration conf) { if (conf.getBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, false) && appAttemptID.getAttemptId() > 1) { LOG.info("Recovery is enabled. Will try to recover from previous life."); - Recovery recoveryServ = new RecoveryService(appAttemptID, clock); + recoveryServ = new RecoveryService(appAttemptID, clock); addIfService(recoveryServ); dispatcher = recoveryServ.getDispatcher(); clock = recoveryServ.getClock(); - completedTasksFromPreviousRun = recoveryServ.getCompletedTasks(); + inRecovery = true; } else { dispatcher = new AsyncDispatcher(); addIfService(dispatcher); @@ -327,7 +350,8 @@ protected Job createJob(Configuration conf) { // create single job Job newJob = new JobImpl(appAttemptID, conf, dispatcher.getEventHandler(), taskAttemptListener, jobTokenSecretManager, fsTokens, clock, - completedTasksFromPreviousRun, metrics, currentUser.getUserName()); + completedTasksFromPreviousRun, metrics, currentUser.getUserName(), + appSubmitTime, amInfos); ((RunningAppContext) context).jobs.put(newJob.getID(), newJob); dispatcher.register(JobFinishEvent.Type.class, @@ -463,6 +487,10 @@ public Set getCompletedTaskFromPreviousRun() { return completedTasksFromPreviousRun; } + public List getAllAMInfos() { + return amInfos; + } + public ContainerAllocator getContainerAllocator() { return containerAllocator; } @@ -617,11 +645,33 @@ public Clock getClock() { @Override public void start() { - ///////////////////// Create the job itself. + // Pull completedTasks etc from recovery + if (inRecovery) { + completedTasksFromPreviousRun = recoveryServ.getCompletedTasks(); + amInfos = recoveryServ.getAMInfos(); + } + + // / Create the AMInfo for the current AppMaster + if (amInfos == null) { + amInfos = new LinkedList(); + } + AMInfo amInfo = + new AMInfo(appAttemptID, startTime, containerID, nmHost, nmHttpPort); + amInfos.add(amInfo); + + // /////////////////// Create the job itself. job = createJob(getConfig()); - + // End of creating the job. + // Send out an MR AM inited event for this AM and all previous AMs. + for (AMInfo info : amInfos) { + dispatcher.getEventHandler().handle( + new JobHistoryEvent(job.getID(), new AMStartedEvent(info + .getAppAttemptId(), info.getStartTime(), info.getContainerId(), + info.getNodeManagerHost(), info.getNodeManagerHttpPort()))); + } + // metrics system init is really init & start. // It's more test friendly to put it here. DefaultMetricsSystem.initialize("MRAppMaster"); @@ -723,17 +773,39 @@ public void handle(SpeculatorEvent event) { public static void main(String[] args) { try { - String applicationAttemptIdStr = System - .getenv(ApplicationConstants.APPLICATION_ATTEMPT_ID_ENV); - if (applicationAttemptIdStr == null) { - String msg = ApplicationConstants.APPLICATION_ATTEMPT_ID_ENV - + " is null"; + String containerIdStr = + System.getenv(ApplicationConstants.AM_CONTAINER_ID_ENV); + String nodeHttpAddressStr = + System.getenv(ApplicationConstants.NM_HTTP_ADDRESS_ENV); + String appSubmitTimeStr = + System.getenv(ApplicationConstants.APP_SUBMIT_TIME_ENV); + if (containerIdStr == null) { + String msg = ApplicationConstants.AM_CONTAINER_ID_ENV + " is null"; LOG.error(msg); throw new IOException(msg); } - ApplicationAttemptId applicationAttemptId = ConverterUtils - .toApplicationAttemptId(applicationAttemptIdStr); - MRAppMaster appMaster = new MRAppMaster(applicationAttemptId); + if (nodeHttpAddressStr == null) { + String msg = ApplicationConstants.NM_HTTP_ADDRESS_ENV + " is null"; + LOG.error(msg); + throw new IOException(msg); + } + if (appSubmitTimeStr == null) { + String msg = ApplicationConstants.APP_SUBMIT_TIME_ENV + " is null"; + LOG.error(msg); + throw new IOException(msg); + } + + ContainerId containerId = ConverterUtils.toContainerId(containerIdStr); + ApplicationAttemptId applicationAttemptId = + containerId.getApplicationAttemptId(); + InetSocketAddress nodeHttpInetAddr = + NetUtils.createSocketAddr(nodeHttpAddressStr); + long appSubmitTime = Long.parseLong(appSubmitTimeStr); + + MRAppMaster appMaster = + new MRAppMaster(applicationAttemptId, containerId, + nodeHttpInetAddr.getHostName(), nodeHttpInetAddr.getPort(), + appSubmitTime); Runtime.getRuntime().addShutdownHook( new CompositeServiceShutdownHook(appMaster)); YarnConfiguration conf = new YarnConfiguration(new JobConf()); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/Job.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/Job.java index 658f2cb877d..cf586743ec8 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/Job.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/Job.java @@ -23,6 +23,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.JobACL; +import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.AMInfo; import org.apache.hadoop.mapreduce.v2.api.records.Counters; import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.api.records.JobReport; @@ -68,5 +69,10 @@ public interface Job { TaskAttemptCompletionEvent[] getTaskAttemptCompletionEvents(int fromEventId, int maxEvents); + /** + * @return information for MR AppMasters (previously failed and current) + */ + List getAMInfos(); + boolean checkAccess(UserGroupInformation callerUGI, JobACL jobOperation); } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java index bf5443025f2..565253fe044 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java @@ -51,6 +51,7 @@ import org.apache.hadoop.mapreduce.TypeConverter; import org.apache.hadoop.mapreduce.jobhistory.JobFinishedEvent; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent; +import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.AMInfo; import org.apache.hadoop.mapreduce.jobhistory.JobInfoChangeEvent; import org.apache.hadoop.mapreduce.jobhistory.JobInitedEvent; import org.apache.hadoop.mapreduce.jobhistory.JobSubmittedEvent; @@ -136,6 +137,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, private final String username; private final Map jobACLs; private final Set completedTasksFromPreviousRun; + private final List amInfos; private final Lock readLock; private final Lock writeLock; private final JobId jobId; @@ -148,6 +150,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, private final EventHandler eventHandler; private final MRAppMetrics metrics; private final String userName; + private final long appSubmitTime; private boolean lazyTasksCopyNeeded = false; private volatile Map tasks = new LinkedHashMap(); @@ -354,7 +357,6 @@ JobEventType.JOB_KILL, new KillTasksTransition()) private int failedReduceTaskCount = 0; private int killedMapTaskCount = 0; private int killedReduceTaskCount = 0; - private long submitTime; private long startTime; private long finishTime; private float setupProgress; @@ -370,7 +372,7 @@ public JobImpl(ApplicationAttemptId applicationAttemptId, Configuration conf, JobTokenSecretManager jobTokenSecretManager, Credentials fsTokenCredentials, Clock clock, Set completedTasksFromPreviousRun, MRAppMetrics metrics, - String userName) { + String userName, long appSubmitTime, List amInfos) { this.applicationAttemptId = applicationAttemptId; this.jobId = recordFactory.newRecordInstance(JobId.class); this.jobName = conf.get(JobContext.JOB_NAME, ""); @@ -378,7 +380,9 @@ public JobImpl(ApplicationAttemptId applicationAttemptId, Configuration conf, this.metrics = metrics; this.clock = clock; this.completedTasksFromPreviousRun = completedTasksFromPreviousRun; + this.amInfos = amInfos; this.userName = userName; + this.appSubmitTime = appSubmitTime; ApplicationId applicationId = applicationAttemptId.getApplicationId(); jobId.setAppId(applicationId); jobId.setId(applicationId.getId()); @@ -806,6 +810,11 @@ public int getTotalReduces() { public Map getJobACLs() { return Collections.unmodifiableMap(jobACLs); } + + @Override + public List getAMInfos() { + return amInfos; + } public static class InitTransition implements MultipleArcTransition { @@ -819,7 +828,6 @@ public static class InitTransition */ @Override public JobState transition(JobImpl job, JobEvent event) { - job.submitTime = job.clock.getTime(); job.metrics.submittedJob(job); job.metrics.preparingJob(job); try { @@ -830,7 +838,7 @@ public JobState transition(JobImpl job, JobEvent event) { JobSubmittedEvent jse = new JobSubmittedEvent(job.oldJobId, job.conf.get(MRJobConfig.JOB_NAME, "test"), job.conf.get(MRJobConfig.USER_NAME, "mapred"), - job.submitTime, + job.appSubmitTime, job.remoteJobConfFile.toString(), job.jobACLs, job.conf.get(MRJobConfig.QUEUE_NAME, "default")); job.eventHandler.handle(new JobHistoryEvent(job.jobId, jse)); @@ -1152,7 +1160,7 @@ public void transition(JobImpl job, JobEvent event) { job.isUber()); //Will transition to state running. Currently in INITED job.eventHandler.handle(new JobHistoryEvent(job.jobId, jie)); JobInfoChangeEvent jice = new JobInfoChangeEvent(job.oldJobId, - job.submitTime, job.startTime); + job.appSubmitTime, job.startTime); job.eventHandler.handle(new JobHistoryEvent(job.jobId, jice)); job.metrics.runningJob(job); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java index ee659e6a5ee..f01bfa5b666 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java @@ -894,15 +894,20 @@ private static JobCounterUpdateEvent createJobCounterUpdateEventTAFailed( return jce; } - private static TaskAttemptUnsuccessfulCompletionEvent createTaskAttemptUnsuccessfulCompletionEvent( - TaskAttemptImpl taskAttempt, TaskAttemptState attemptState) { - TaskAttemptUnsuccessfulCompletionEvent tauce = new TaskAttemptUnsuccessfulCompletionEvent( - TypeConverter.fromYarn(taskAttempt.attemptId), - TypeConverter.fromYarn(taskAttempt.attemptId.getTaskId().getTaskType()), - attemptState.toString(), taskAttempt.finishTime, - taskAttempt.nodeHostName == null ? "UNKNOWN" : taskAttempt.nodeHostName, - StringUtils.join(LINE_SEPARATOR, taskAttempt.getDiagnostics()), - taskAttempt.getProgressSplitBlock().burst()); + private static + TaskAttemptUnsuccessfulCompletionEvent + createTaskAttemptUnsuccessfulCompletionEvent(TaskAttemptImpl taskAttempt, + TaskAttemptState attemptState) { + TaskAttemptUnsuccessfulCompletionEvent tauce = + new TaskAttemptUnsuccessfulCompletionEvent( + TypeConverter.fromYarn(taskAttempt.attemptId), + TypeConverter.fromYarn(taskAttempt.attemptId.getTaskId() + .getTaskType()), attemptState.toString(), + taskAttempt.finishTime, + taskAttempt.containerMgrAddress == null ? "UNKNOWN" + : taskAttempt.containerMgrAddress, StringUtils.join( + LINE_SEPARATOR, taskAttempt.getDiagnostics()), taskAttempt + .getProgressSplitBlock().burst()); return tauce; } @@ -1120,11 +1125,15 @@ public void transition(TaskAttemptImpl taskAttempt, , 1); taskAttempt.eventHandler.handle(jce); + LOG.info("TaskAttempt: [" + taskAttempt.attemptId + + "] using containerId: [" + taskAttempt.containerID + " on NM: [" + + taskAttempt.containerMgrAddress + "]"); TaskAttemptStartedEvent tase = new TaskAttemptStartedEvent(TypeConverter.fromYarn(taskAttempt.attemptId), TypeConverter.fromYarn(taskAttempt.attemptId.getTaskId().getTaskType()), taskAttempt.launchTime, - nodeHttpInetAddr.getHostName(), nodeHttpInetAddr.getPort(), taskAttempt.shufflePort); + nodeHttpInetAddr.getHostName(), nodeHttpInetAddr.getPort(), + taskAttempt.shufflePort, taskAttempt.containerID); taskAttempt.eventHandler.handle (new JobHistoryEvent(taskAttempt.attemptId.getTaskId().getJobId(), tase)); taskAttempt.eventHandler.handle @@ -1236,7 +1245,8 @@ private void logAttemptFinishedEvent(TaskAttemptState state) { TypeConverter.fromYarn(attemptId.getTaskId().getTaskType()), state.toString(), this.reportedStatus.mapFinishTime, - finishTime, this.nodeHostName == null ? "UNKNOWN" : this.nodeHostName, + finishTime, this.containerMgrAddress == null ? "UNKNOWN" + : this.containerMgrAddress, this.reportedStatus.stateString, TypeConverter.fromYarn(getCounters()), getProgressSplitBlock().burst()); @@ -1249,7 +1259,8 @@ private void logAttemptFinishedEvent(TaskAttemptState state) { state.toString(), this.reportedStatus.shuffleFinishTime, this.reportedStatus.sortFinishTime, - finishTime, this.nodeHostName == null ? "UNKNOWN" : this.nodeHostName, + finishTime, this.containerMgrAddress == null ? "UNKNOWN" + : this.containerMgrAddress, this.reportedStatus.stateString, TypeConverter.fromYarn(getCounters()), getProgressSplitBlock().burst()); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/Recovery.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/Recovery.java index 8005714389a..18aa0cbda2c 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/Recovery.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/Recovery.java @@ -18,8 +18,10 @@ package org.apache.hadoop.mapreduce.v2.app.recover; +import java.util.List; import java.util.Set; +import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.AMInfo; import org.apache.hadoop.mapreduce.v2.api.records.TaskId; import org.apache.hadoop.yarn.Clock; import org.apache.hadoop.yarn.event.Dispatcher; @@ -31,4 +33,6 @@ public interface Recovery { Clock getClock(); Set getCompletedTasks(); + + List getAMInfos(); } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java index b6cd5f1a909..4f2c8862aa0 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; @@ -34,6 +35,7 @@ import org.apache.hadoop.mapreduce.TypeConverter; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo; +import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.AMInfo; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo; import org.apache.hadoop.mapreduce.v2.api.records.Phase; @@ -148,6 +150,14 @@ public Clock getClock() { public Set getCompletedTasks() { return completedTasks.keySet(); } + + @Override + public List getAMInfos() { + if (jobInfo == null || jobInfo.getAMInfos() == null) { + return new LinkedList(); + } + return new LinkedList(jobInfo.getAMInfos()); + } private void parse() throws IOException { // TODO: parse history file based on startCount @@ -351,15 +361,16 @@ private void sendStatusUpdateEvent(TaskAttemptId yarnAttemptID, private void sendAssignedEvent(TaskAttemptId yarnAttemptID, TaskAttemptInfo attemptInfo) { LOG.info("Sending assigned event to " + yarnAttemptID); - ContainerId cId = recordFactory - .newRecordInstance(ContainerId.class); + ContainerId cId = attemptInfo.getContainerId(); Container container = recordFactory .newRecordInstance(Container.class); container.setId(cId); container.setNodeId(recordFactory .newRecordInstance(NodeId.class)); + // NodeId can be obtained from TaskAttemptInfo.hostname - but this will + // eventually contain rack info. container.setContainerToken(null); - container.setNodeHttpAddress(attemptInfo.getHostname() + ":" + + container.setNodeHttpAddress(attemptInfo.getTrackerName() + ":" + attemptInfo.getHttpPort()); actualHandler.handle(new TaskAttemptContainerAssignedEvent(yarnAttemptID, container)); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java index d6e2d968173..5c451e5d65e 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java @@ -77,6 +77,7 @@ import org.apache.hadoop.yarn.service.Service; import org.apache.hadoop.yarn.state.StateMachine; import org.apache.hadoop.yarn.state.StateMachineFactory; +import org.apache.hadoop.yarn.util.BuilderUtils; /** @@ -118,10 +119,20 @@ private static ApplicationAttemptId getApplicationAttemptId( applicationAttemptId.setAttemptId(startCount); return applicationAttemptId; } + + private static ContainerId getContainerId(ApplicationId applicationId, + int startCount) { + ApplicationAttemptId appAttemptId = + getApplicationAttemptId(applicationId, startCount); + ContainerId containerId = + BuilderUtils.newContainerId(appAttemptId, startCount); + return containerId; + } public MRApp(int maps, int reduces, boolean autoComplete, String testName, boolean cleanOnStart, int startCount) { - super(getApplicationAttemptId(applicationId, startCount)); + super(getApplicationAttemptId(applicationId, startCount), getContainerId( + applicationId, startCount), "testhost", 3333, System.currentTimeMillis()); this.testWorkDir = new File("target", testName); testAbsPath = new Path(testWorkDir.getAbsolutePath()); LOG.info("PathUsed: " + testAbsPath); @@ -405,10 +416,10 @@ protected StateMachine getStateMachine() { public TestJob(Configuration conf, ApplicationId applicationId, EventHandler eventHandler, TaskAttemptListener taskAttemptListener, Clock clock, String user) { - super(getApplicationAttemptId(applicationId, getStartCount()), - conf, eventHandler, taskAttemptListener, - new JobTokenSecretManager(), new Credentials(), clock, - getCompletedTaskFromPreviousRun(), metrics, user); + super(getApplicationAttemptId(applicationId, getStartCount()), conf, + eventHandler, taskAttemptListener, new JobTokenSecretManager(), + new Credentials(), clock, getCompletedTaskFromPreviousRun(), metrics, + user, System.currentTimeMillis(), getAllAMInfos()); // This "this leak" is okay because the retained pointer is in an // instance variable. diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java index 7f55dd4d571..2e9bf45221c 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java @@ -33,6 +33,7 @@ import org.apache.hadoop.mapreduce.JobACL; import org.apache.hadoop.mapreduce.JobCounter; import org.apache.hadoop.mapreduce.TaskCounter; +import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.AMInfo; import org.apache.hadoop.mapreduce.v2.api.records.Counters; import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.api.records.JobReport; @@ -488,6 +489,11 @@ public Path getConfFile() { public Map getJobACLs() { return Collections.emptyMap(); } + + @Override + public List getAMInfos() { + throw new UnsupportedOperationException("Not supported yet."); + } }; } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRAppMaster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRAppMaster.java index c21c4528fb8..ac3edebf8d7 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRAppMaster.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRAppMaster.java @@ -27,6 +27,7 @@ import org.apache.hadoop.mapreduce.v2.util.MRApps; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.util.ConverterUtils; import org.junit.Test; @@ -36,11 +37,15 @@ public class TestMRAppMaster { public void testMRAppMasterForDifferentUser() throws IOException, InterruptedException { String applicationAttemptIdStr = "appattempt_1317529182569_0004_000001"; + String containerIdStr = "container_1317529182569_0004_000001_1"; String stagingDir = "/tmp/staging"; String userName = "TestAppMasterUser"; ApplicationAttemptId applicationAttemptId = ConverterUtils .toApplicationAttemptId(applicationAttemptIdStr); - MRAppMasterTest appMaster = new MRAppMasterTest(applicationAttemptId); + ContainerId containerId = ConverterUtils.toContainerId(containerIdStr); + MRAppMasterTest appMaster = + new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, + System.currentTimeMillis()); YarnConfiguration conf = new YarnConfiguration(); conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir); MRAppMaster.initAndStartAppMaster(appMaster, conf, userName); @@ -54,8 +59,9 @@ class MRAppMasterTest extends MRAppMaster { Path stagingDirPath; private Configuration conf; - public MRAppMasterTest(ApplicationAttemptId applicationAttemptId) { - super(applicationAttemptId); + public MRAppMasterTest(ApplicationAttemptId applicationAttemptId, + ContainerId containerId, String host, int port, long submitTime) { + super(applicationAttemptId, containerId, host, port, submitTime); } @Override diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java index 53e94db42b0..4c09a83ae74 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java @@ -340,8 +340,8 @@ private static class FakeJob extends JobImpl { public FakeJob(ApplicationAttemptId appAttemptID, Configuration conf, int numMaps, int numReduces) { - super(appAttemptID, conf, null, null, null, null, null, null, null, - null); + super(appAttemptID, conf, null, null, null, null, null, null, null, null, + System.currentTimeMillis(), null); this.jobId = MRBuilderUtils .newJobId(appAttemptID.getApplicationId(), 0); this.numMaps = numMaps; diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java index 75db751480e..34684a1d72c 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java @@ -28,6 +28,7 @@ import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler; +import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.AMInfo; import org.apache.hadoop.mapreduce.v2.api.records.JobState; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState; import org.apache.hadoop.mapreduce.v2.api.records.TaskState; @@ -46,6 +47,7 @@ public class TestRecovery { @Test public void testCrashed() throws Exception { int runCount = 0; + long am1StartTimeEst = System.currentTimeMillis(); MRApp app = new MRAppWithHistory(2, 1, false, this.getClass().getName(), true, ++runCount); Configuration conf = new Configuration(); conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false); @@ -126,9 +128,10 @@ public void testCrashed() throws Exception { //stop the app app.stop(); - + //rerun //in rerun the 1st map will be recovered from previous run + long am2StartTimeEst = System.currentTimeMillis(); app = new MRAppWithHistory(2, 1, false, this.getClass().getName(), false, ++runCount); conf = new Configuration(); conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true); @@ -178,8 +181,27 @@ public void testCrashed() throws Exception { task1StartTime, mapTask1.getReport().getStartTime()); Assert.assertEquals("Task Finish time not correct", task1FinishTime, mapTask1.getReport().getFinishTime()); + Assert.assertEquals(2, job.getAMInfos().size()); + int attemptNum = 1; + // Verify AMInfo + for (AMInfo amInfo : job.getAMInfos()) { + Assert.assertEquals(attemptNum++, amInfo.getAppAttemptId() + .getAttemptId()); + Assert.assertEquals(amInfo.getAppAttemptId(), amInfo.getContainerId() + .getApplicationAttemptId()); + Assert.assertEquals("testhost", amInfo.getNodeManagerHost()); + Assert.assertEquals(3333, amInfo.getNodeManagerHttpPort()); + } + long am1StartTimeReal = job.getAMInfos().get(0).getStartTime(); + long am2StartTimeReal = job.getAMInfos().get(1).getStartTime(); + Assert.assertTrue(am1StartTimeReal >= am1StartTimeEst + && am1StartTimeReal <= am2StartTimeEst); + Assert.assertTrue(am2StartTimeReal >= am2StartTimeEst + && am2StartTimeReal <= System.currentTimeMillis()); + // TODO Add verification of additional data from jobHistory - whatever was + // available in the failed attempt should be available here } - + class MRAppWithHistory extends MRApp { public MRAppWithHistory(int maps, int reduces, boolean autoComplete, String testName, boolean cleanOnStart, int startCount) { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java index ca1aa14ec5f..2081e4d8600 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java @@ -33,6 +33,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.JobACL; +import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.AMInfo; import org.apache.hadoop.mapreduce.v2.api.records.Counters; import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.api.records.JobReport; @@ -473,6 +474,11 @@ public Path getConfFile() { public Map getJobACLs() { throw new UnsupportedOperationException("Not supported yet."); } + + @Override + public List getAMInfos() { + throw new UnsupportedOperationException("Not supported yet."); + } } /* diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java index 037e8baed5b..4ba7be918f7 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java @@ -41,6 +41,7 @@ import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.util.BuilderUtils; import org.junit.Test; @@ -81,7 +82,9 @@ public void testDeletionofStaging() throws IOException { private class TestMRApp extends MRAppMaster { public TestMRApp(ApplicationAttemptId applicationAttemptId) { - super(applicationAttemptId); + super(applicationAttemptId, BuilderUtils.newContainerId( + applicationAttemptId, 1), "testhost", 3333, System + .currentTimeMillis()); } @Override diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/avro/Events.avpr b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/avro/Events.avpr index 89844ebd5ca..e05e0c900e3 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/avro/Events.avpr +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/avro/Events.avpr @@ -69,6 +69,16 @@ ] }, + {"type": "record", "name": "AMStarted", + "fields": [ + {"name": "applicationAttemptId", "type": "string"}, + {"name": "startTime", "type": "long"}, + {"name": "containerId", "type": "string"}, + {"name": "nodeManagerHost", "type": "string"}, + {"name": "nodeManagerHttpPort", "type": "int"} + ] + }, + {"type": "record", "name": "JobSubmitted", "fields": [ {"name": "jobid", "type": "string"}, @@ -174,7 +184,8 @@ {"name": "startTime", "type": "long"}, {"name": "trackerName", "type": "string"}, {"name": "httpPort", "type": "int"}, - {"name": "shufflePort", "type": "int"} + {"name": "shufflePort", "type": "int"}, + {"name": "containerId", "type": "string"} ] }, @@ -260,7 +271,8 @@ "CLEANUP_ATTEMPT_STARTED", "CLEANUP_ATTEMPT_FINISHED", "CLEANUP_ATTEMPT_FAILED", - "CLEANUP_ATTEMPT_KILLED" + "CLEANUP_ATTEMPT_KILLED", + "AM_STARTED" ] }, @@ -272,6 +284,7 @@ "JobFinished", "JobInfoChange", "JobInited", + "AMStarted", "JobPriorityChange", "JobStatusChanged", "JobSubmitted", diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/AMStartedEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/AMStartedEvent.java new file mode 100644 index 00000000000..04e8792e889 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/AMStartedEvent.java @@ -0,0 +1,114 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapreduce.jobhistory; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.util.ConverterUtils; + +import org.apache.avro.util.Utf8; + +/** + * Event to record start of a task attempt + * + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public class AMStartedEvent implements HistoryEvent { + private AMStarted datum = new AMStarted(); + + /** + * Create an event to record the start of an MR AppMaster + * + * @param appAttemptId + * the application attempt id. + * @param startTime + * the start time of the AM. + * @param containerId + * the containerId of the AM. + * @param nodeManagerHost + * the node on which the AM is running. + * @param nodeManagerHttpPort + * the httpPort for the node running the AM. + */ + public AMStartedEvent(ApplicationAttemptId appAttemptId, long startTime, + ContainerId containerId, String nodeManagerHost, int nodeManagerHttpPort) { + datum.applicationAttemptId = new Utf8(appAttemptId.toString()); + datum.startTime = startTime; + datum.containerId = new Utf8(containerId.toString()); + datum.nodeManagerHost = new Utf8(nodeManagerHost); + datum.nodeManagerHttpPort = nodeManagerHttpPort; + } + + AMStartedEvent() { + } + + public Object getDatum() { + return datum; + } + + public void setDatum(Object datum) { + this.datum = (AMStarted) datum; + } + + /** + * @return the ApplicationAttemptId + */ + public ApplicationAttemptId getAppAttemptId() { + return ConverterUtils.toApplicationAttemptId(datum.applicationAttemptId + .toString()); + } + + /** + * @return the start time for the MRAppMaster + */ + public long getStartTime() { + return datum.startTime; + } + + /** + * @return the ContainerId for the MRAppMaster. + */ + public ContainerId getContainerId() { + return ConverterUtils.toContainerId(datum.containerId.toString()); + } + + /** + * @return the node manager host. + */ + public String getNodeManagerHost() { + return datum.nodeManagerHost.toString(); + } + + /** + * @return the http port for the tracker. + */ + public int getNodeManagerHttpPort() { + return datum.nodeManagerHttpPort; + } + + /** Get the attempt id */ + + @Override + public EventType getEventType() { + return EventType.AM_STARTED; + } +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/EventReader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/EventReader.java index 6f86516ee7a..5d74b802189 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/EventReader.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/EventReader.java @@ -33,7 +33,6 @@ import org.apache.avro.Schema; import org.apache.avro.io.Decoder; import org.apache.avro.io.DecoderFactory; -import org.apache.avro.io.JsonDecoder; import org.apache.avro.io.DatumReader; import org.apache.avro.specific.SpecificDatumReader; @@ -146,8 +145,10 @@ public HistoryEvent getNextEvent() throws IOException { result = new TaskAttemptUnsuccessfulCompletionEvent(); break; case CLEANUP_ATTEMPT_KILLED: result = new TaskAttemptUnsuccessfulCompletionEvent(); break; + case AM_STARTED: + result = new AMStartedEvent(); break; default: - throw new RuntimeException("unexpected event type!"); + throw new RuntimeException("unexpected event type: " + wrapper.type); } result.setDatum(wrapper.event); return result; diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java index 81294d5b659..e41dcc34b12 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java @@ -20,6 +20,8 @@ import java.io.IOException; import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; import java.util.Map; import org.apache.hadoop.classification.InterfaceAudience; @@ -37,6 +39,8 @@ import org.apache.hadoop.mapred.TaskStatus; import org.apache.hadoop.mapreduce.TaskType; import org.apache.hadoop.security.authorize.AccessControlList; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ContainerId; /** * Default Parser for the JobHistory files. Typical usage is @@ -174,6 +178,9 @@ private void handleEvent(HistoryEvent event) throws IOException { case CLEANUP_ATTEMPT_FINISHED: handleTaskAttemptFinishedEvent((TaskAttemptFinishedEvent) event); break; + case AM_STARTED: + handleAMStartedEvent((AMStartedEvent) event); + break; default: break; } @@ -241,6 +248,7 @@ private void handleTaskAttemptStartedEvent(TaskAttemptStartedEvent event) { attemptInfo.trackerName = event.getTrackerName(); attemptInfo.taskType = event.getTaskType(); attemptInfo.shufflePort = event.getShufflePort(); + attemptInfo.containerId = event.getContainerId(); taskInfo.attemptsMap.put(attemptId, attemptInfo); } @@ -305,6 +313,20 @@ private void handleJobInitedEvent(JobInitedEvent event) { info.totalReduces = event.getTotalReduces(); info.uberized = event.getUberized(); } + + private void handleAMStartedEvent(AMStartedEvent event) { + AMInfo amInfo = new AMInfo(); + amInfo.appAttemptId = event.getAppAttemptId(); + amInfo.startTime = event.getStartTime(); + amInfo.containerId = event.getContainerId(); + amInfo.nodeManagerHost = event.getNodeManagerHost(); + amInfo.nodeManagerHttpPort = event.getNodeManagerHttpPort(); + if (info.amInfos == null) { + info.amInfos = new LinkedList(); + } + info.amInfos.add(amInfo); + info.latestAmInfo = amInfo; + } private void handleJobInfoChangeEvent(JobInfoChangeEvent event) { info.submitTime = event.getSubmitTime(); @@ -348,6 +370,8 @@ public static class JobInfo { Map jobACLs; Map tasksMap; + List amInfos; + AMInfo latestAmInfo; boolean uberized; /** Create a job info object where job information will be stored @@ -377,7 +401,9 @@ public void printAll() { System.out.println("REDUCE_COUNTERS:" + reduceCounters.toString()); System.out.println("TOTAL_COUNTERS: " + totalCounters.toString()); System.out.println("UBERIZED: " + uberized); - + for (AMInfo amInfo : amInfos) { + amInfo.printAll(); + } for (TaskInfo ti: tasksMap.values()) { ti.printAll(); } @@ -427,6 +453,10 @@ public void printAll() { public Map getJobACLs() { return jobACLs; } /** @return the uberized status of this job */ public boolean getUberized() { return uberized; } + /** @return the AMInfo for the job's AppMaster */ + public List getAMInfos() { return amInfos; } + /** @return the AMInfo for the newest AppMaster */ + public AMInfo getLatestAMInfo() { return latestAmInfo; }; } /** @@ -509,6 +539,7 @@ public static class TaskAttemptInfo { int httpPort; int shufflePort; String hostname; + ContainerId containerId; /** Create a Task Attempt Info which will store attempt level information * on a history parse. @@ -534,6 +565,7 @@ public void printAll() { System.out.println("TRACKER_NAME:" + trackerName); System.out.println("HTTP_PORT:" + httpPort); System.out.println("SHUFFLE_PORT:" + shufflePort); + System.out.println("CONTIANER_ID:" + containerId); if (counters != null) { System.out.println("COUNTERS:" + counters.toString()); } @@ -569,5 +601,74 @@ public void printAll() { public int getHttpPort() { return httpPort; } /** @return the Shuffle port for the tracker */ public int getShufflePort() { return shufflePort; } + /** @return the ContainerId for the tracker */ + public ContainerId getContainerId() { return containerId; } } + + /** + * Stores AM information + */ + public static class AMInfo { + ApplicationAttemptId appAttemptId; + long startTime; + ContainerId containerId; + String nodeManagerHost; + int nodeManagerHttpPort; + + /** + * Create a AM Info which will store AM level information on a history + * parse. + */ + public AMInfo() { + startTime = -1; + nodeManagerHost = ""; + nodeManagerHttpPort = -1; + } + + public AMInfo(ApplicationAttemptId appAttemptId, long startTime, + ContainerId containerId, String nodeManagerHost, int nodeManagerHttpPort) { + this.appAttemptId = appAttemptId; + this.startTime = startTime; + this.containerId = containerId; + this.nodeManagerHost = nodeManagerHost; + this.nodeManagerHttpPort = nodeManagerHttpPort; + } + + /** + * Print all the information about this AM. + */ + public void printAll() { + System.out.println("APPLICATION_ATTEMPT_ID:" + appAttemptId.toString()); + System.out.println("START_TIME: " + startTime); + System.out.println("CONTAINER_ID: " + containerId.toString()); + System.out.println("NODE_MANAGER_HOST: " + nodeManagerHost); + System.out.println("NODE_MANAGER_HTTP_PORT: " + nodeManagerHttpPort); + } + + /** @return the ApplicationAttemptId */ + public ApplicationAttemptId getAppAttemptId() { + return appAttemptId; + } + + /** @return the start time of the AM */ + public long getStartTime() { + return startTime; + } + + /** @return the container id for the AM */ + public ContainerId getContainerId() { + return containerId; + } + + /** @return the host name for the node manager on which the AM is running */ + public String getNodeManagerHost() { + return nodeManagerHost; + } + + /** @return the http port for the node manager running the AM */ + public int getNodeManagerHttpPort() { + return nodeManagerHttpPort; + } + } + } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobInitedEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobInitedEvent.java index 099941ec1ff..ed3ba1cbf5f 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobInitedEvent.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobInitedEvent.java @@ -18,8 +18,6 @@ package org.apache.hadoop.mapreduce.jobhistory; -import java.io.IOException; - import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.mapreduce.JobID; diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptStartedEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptStartedEvent.java index af61487ed80..95d28b5c056 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptStartedEvent.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptStartedEvent.java @@ -18,13 +18,13 @@ package org.apache.hadoop.mapreduce.jobhistory; -import java.io.IOException; - import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.mapreduce.TaskID; import org.apache.hadoop.mapreduce.TaskType; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.avro.util.Utf8; @@ -45,10 +45,11 @@ public class TaskAttemptStartedEvent implements HistoryEvent { * @param trackerName Name of the Task Tracker where attempt is running * @param httpPort The port number of the tracker * @param shufflePort The shuffle port number of the container + * @param containerId The containerId for the task attempt. */ public TaskAttemptStartedEvent( TaskAttemptID attemptId, TaskType taskType, long startTime, String trackerName, - int httpPort, int shufflePort) { + int httpPort, int shufflePort, ContainerId containerId) { datum.attemptId = new Utf8(attemptId.toString()); datum.taskid = new Utf8(attemptId.getTaskID().toString()); datum.startTime = startTime; @@ -56,6 +57,15 @@ public TaskAttemptStartedEvent( TaskAttemptID attemptId, datum.trackerName = new Utf8(trackerName); datum.httpPort = httpPort; datum.shufflePort = shufflePort; + datum.containerId = new Utf8(containerId.toString()); + } + + // TODO Remove after MrV1 is removed. + // Using a dummy containerId to prevent jobHistory parse failures. + public TaskAttemptStartedEvent(TaskAttemptID attemptId, TaskType taskType, + long startTime, String trackerName, int httpPort, int shufflePort) { + this(attemptId, taskType, startTime, trackerName, httpPort, shufflePort, + ConverterUtils.toContainerId("container_-1_-1_-1_-1")); } TaskAttemptStartedEvent() {} @@ -91,5 +101,8 @@ public EventType getEventType() { ? EventType.MAP_ATTEMPT_STARTED : EventType.REDUCE_ATTEMPT_STARTED; } - + /** Get the ContainerId */ + public ContainerId getContainerId() { + return ConverterUtils.toContainerId(datum.containerId.toString()); + } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java index 8f7ebd56e75..ca89080570e 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java @@ -36,6 +36,7 @@ import org.apache.hadoop.mapreduce.TypeConverter; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo; +import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.AMInfo; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo; import org.apache.hadoop.mapreduce.v2.api.records.Counters; import org.apache.hadoop.mapreduce.v2.api.records.JobId; @@ -337,4 +338,9 @@ public String getUserName() { public Path getConfFile() { return confFile; } + + @Override + public List getAMInfos() { + return jobInfo.getAMInfos(); + } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTaskAttempt.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTaskAttempt.java index 30dadf2706c..5481ed35ad6 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTaskAttempt.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTaskAttempt.java @@ -29,8 +29,6 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState; import org.apache.hadoop.mapreduce.v2.api.records.TaskId; import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt; -import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; @@ -85,35 +83,17 @@ public class CompletedTaskAttempt implements TaskAttempt { @Override public ContainerId getAssignedContainerID() { - //TODO ContainerId needs to be part of some historyEvent to be able to - //render the log directory. - ContainerId containerId = - RecordFactoryProvider.getRecordFactory(null).newRecordInstance( - ContainerId.class); - containerId.setId(-1); - ApplicationAttemptId applicationAttemptId = - RecordFactoryProvider.getRecordFactory(null).newRecordInstance( - ApplicationAttemptId.class); - applicationAttemptId.setAttemptId(-1); - ApplicationId applicationId = - RecordFactoryProvider.getRecordFactory(null).newRecordInstance( - ApplicationId.class); - applicationId.setClusterTimestamp(-1); - applicationId.setId(-1); - applicationAttemptId.setApplicationId(applicationId); - containerId.setApplicationAttemptId(applicationAttemptId); - return containerId; + return attemptInfo.getContainerId(); } @Override public String getAssignedContainerMgrAddress() { - // TODO Verify this is correct. - return attemptInfo.getTrackerName(); + return attemptInfo.getHostname(); } @Override public String getNodeHttpAddress() { - return attemptInfo.getHostname() + ":" + attemptInfo.getHttpPort(); + return attemptInfo.getTrackerName() + ":" + attemptInfo.getHttpPort(); } @Override diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/PartialJob.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/PartialJob.java index e84bfa8089d..efc0c2bab9e 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/PartialJob.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/PartialJob.java @@ -23,6 +23,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.JobACL; +import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.AMInfo; import org.apache.hadoop.mapreduce.v2.api.records.Counters; import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.api.records.JobReport; @@ -159,4 +160,9 @@ public Map getJobACLs() { throw new IllegalStateException("Not implemented yet"); } + @Override + public List getAMInfos() { + return null; + } + } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEvents.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEvents.java index f94714e133d..3a31535b9ad 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEvents.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEvents.java @@ -41,8 +41,10 @@ import org.apache.hadoop.mapreduce.v2.app.job.Job; import org.apache.hadoop.mapreduce.v2.app.job.Task; import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt; +import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.service.Service; +import org.apache.hadoop.yarn.util.BuilderUtils; import org.junit.Test; public class TestJobHistoryEvents { @@ -159,6 +161,10 @@ private void verifyTask(Task task) { private void verifyAttempt(TaskAttempt attempt) { Assert.assertEquals("TaskAttempt state not currect", TaskAttemptState.SUCCEEDED, attempt.getState()); + Assert.assertNotNull(attempt.getAssignedContainerID()); + //Verify the wrong ctor is not being used. Remove after mrv1 is removed. + ContainerId fakeCid = BuilderUtils.newContainerId(-1, -1, -1, -1); + Assert.assertFalse(attempt.getAssignedContainerID().equals(fakeCid)); } static class MRAppWithHistory extends MRApp { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java index da64718099c..cf4b51b5655 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java @@ -34,8 +34,9 @@ import org.apache.hadoop.mapreduce.TypeConverter; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo; -import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo; +import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.AMInfo; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo; +import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo; import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.api.records.JobState; import org.apache.hadoop.mapreduce.v2.app.MRApp; @@ -46,7 +47,9 @@ import org.apache.hadoop.mapreduce.v2.jobhistory.FileNameIndexUtils; import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils; import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo; +import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.service.Service; +import org.apache.hadoop.yarn.util.BuilderUtils; import org.junit.Test; public class TestJobHistoryParsing { @@ -54,6 +57,7 @@ public class TestJobHistoryParsing { @Test public void testHistoryParsing() throws Exception { Configuration conf = new Configuration(); + long amStartTimeEst = System.currentTimeMillis(); MRApp app = new MRAppWithHistory(2, 1, true, this.getClass().getName(), true); app.submit(conf); Job job = app.getContext().getAllJobs().values().iterator().next(); @@ -102,12 +106,30 @@ public void testHistoryParsing() throws Exception { job.isUber(), jobInfo.getUberized()); int totalTasks = jobInfo.getAllTasks().size(); Assert.assertEquals("total number of tasks is incorrect ", 3, totalTasks); + + // Verify aminfo + Assert.assertEquals(1, jobInfo.getAMInfos().size()); + Assert.assertEquals("testhost", jobInfo.getAMInfos().get(0) + .getNodeManagerHost()); + AMInfo amInfo = jobInfo.getAMInfos().get(0); + Assert.assertEquals(3333, amInfo.getNodeManagerHttpPort()); + Assert.assertEquals(1, amInfo.getAppAttemptId().getAttemptId()); + Assert.assertEquals(amInfo.getAppAttemptId(), amInfo.getContainerId() + .getApplicationAttemptId()); + Assert.assertTrue(amInfo.getStartTime() <= System.currentTimeMillis() + && amInfo.getStartTime() >= amStartTimeEst); + ContainerId fakeCid = BuilderUtils.newContainerId(-1, -1, -1, -1); //Assert at taskAttempt level for (TaskInfo taskInfo : jobInfo.getAllTasks().values()) { int taskAttemptCount = taskInfo.getAllTaskAttempts().size(); Assert.assertEquals("total number of task attempts ", 1, taskAttemptCount); + TaskAttemptInfo taInfo = + taskInfo.getAllTaskAttempts().values().iterator().next(); + Assert.assertNotNull(taInfo.getContainerId()); + //Verify the wrong ctor is not being used. Remove after mrv1 is removed. + Assert.assertFalse(taInfo.getContainerId().equals(fakeCid)); } // Deep compare Job and JobInfo diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationConstants.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationConstants.java index 18bdce7286d..028e90de112 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationConstants.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationConstants.java @@ -36,12 +36,24 @@ public interface ApplicationConstants { // TODO: They say tokens via env isn't good. public static final String APPLICATION_CLIENT_SECRET_ENV_NAME = "AppClientTokenEnv"; - + /** - * The environmental variable for APPLICATION_ATTEMPT_ID. Set in - * ApplicationMaster's environment only. + * The environment variable for CONTAINER_ID. Set in AppMaster environment + * only */ - public static final String APPLICATION_ATTEMPT_ID_ENV = "APPLICATION_ATTEMPT_ID"; + public static final String AM_CONTAINER_ID_ENV = "AM_CONTAINER_ID"; + + /** + * The environment variable for NM_HTTP_ADDRESS. Set in AppMaster environment + * only + */ + public static final String NM_HTTP_ADDRESS_ENV = "NM_HTTP_ADDRESS"; + + /** + * The environment variable for APP_SUBMIT_TIME. Set in AppMaster environment + * only + */ + public static final String APP_SUBMIT_TIME_ENV = "APP_SUBMIT_TIME_ENV"; public static final String CONTAINER_TOKEN_FILE_ENV_NAME = UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION; diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java index 0f1243fd9fb..a641adee932 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java @@ -126,7 +126,7 @@ public interface ApplicationSubmissionContext { @Public @Stable public void setUser(String user); - + /** * Get the ContainerLaunchContext to describe the * Container with which the ApplicationMaster is diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java index 19800ba91a3..0beb2001e74 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java @@ -287,7 +287,7 @@ public boolean init(String[] args) throws ParseException, IOException { Map envs = System.getenv(); appAttemptID = Records.newRecord(ApplicationAttemptId.class); - if (!envs.containsKey(ApplicationConstants.APPLICATION_ATTEMPT_ID_ENV)) { + if (!envs.containsKey(ApplicationConstants.AM_CONTAINER_ID_ENV)) { if (cliParser.hasOption("app_attempt_id")) { String appIdStr = cliParser.getOptionValue("app_attempt_id", ""); appAttemptID = ConverterUtils.toApplicationAttemptId(appIdStr); @@ -296,7 +296,8 @@ public boolean init(String[] args) throws ParseException, IOException { throw new IllegalArgumentException("Application Attempt Id not set in the environment"); } } else { - appAttemptID = ConverterUtils.toApplicationAttemptId(envs.get(ApplicationConstants.APPLICATION_ATTEMPT_ID_ENV)); + ContainerId containerId = ConverterUtils.toContainerId(envs.get(ApplicationConstants.AM_CONTAINER_ID_ENV)); + appAttemptID = containerId.getApplicationAttemptId(); } LOG.info("Application master for app" diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ConverterUtils.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ConverterUtils.java index 296c9d75a12..bfa6a09d6e8 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ConverterUtils.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ConverterUtils.java @@ -20,10 +20,8 @@ import static org.apache.hadoop.yarn.util.StringHelper._split; -import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; -import java.text.NumberFormat; import java.util.HashMap; import java.util.Iterator; import java.util.Map; @@ -33,9 +31,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.URL; -import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; @@ -97,27 +93,8 @@ public static URL getYarnUrlFromURI(URI uri) { return url; } - // TODO: Why thread local? - // ^ NumberFormat instances are not threadsafe - private static final ThreadLocal appIdFormat = - new ThreadLocal() { - @Override - public NumberFormat initialValue() { - NumberFormat fmt = NumberFormat.getInstance(); - fmt.setGroupingUsed(false); - fmt.setMinimumIntegerDigits(4); - return fmt; - } - }; - - - public static String toString(ApplicationId appId) { - StringBuilder sb = new StringBuilder(); - sb.append(APPLICATION_PREFIX + "_").append(appId.getClusterTimestamp()) - .append("_"); - sb.append(appIdFormat.get().format(appId.getId())); - return sb.toString(); + return appId.toString(); } public static ApplicationId toApplicationId(RecordFactory recordFactory, @@ -152,11 +129,11 @@ public static String toString(ContainerId cId) { return cId.toString(); } - public static ContainerId toContainerId(String containerIdStr) - throws IOException { + public static ContainerId toContainerId(String containerIdStr) { Iterator it = _split(containerIdStr).iterator(); if (!it.next().equals(CONTAINER_PREFIX)) { - throw new IOException("Invalid ContainerId prefix: " + containerIdStr); + throw new IllegalArgumentException("Invalid ContainerId prefix: " + + containerIdStr); } try { ApplicationAttemptId appAttemptID = toApplicationAttemptId(it); @@ -165,21 +142,22 @@ public static ContainerId toContainerId(String containerIdStr) containerId.setId(Integer.parseInt(it.next())); return containerId; } catch (NumberFormatException n) { - throw new IOException("Invalid ContainerId: " + containerIdStr, n); + throw new IllegalArgumentException("Invalid ContainerId: " + + containerIdStr, n); } } public static ApplicationAttemptId toApplicationAttemptId( - String applicationAttmeptIdStr) throws IOException { + String applicationAttmeptIdStr) { Iterator it = _split(applicationAttmeptIdStr).iterator(); if (!it.next().equals(APPLICATION_ATTEMPT_PREFIX)) { - throw new IOException("Invalid AppAttemptId prefix: " + throw new IllegalArgumentException("Invalid AppAttemptId prefix: " + applicationAttmeptIdStr); } try { return toApplicationAttemptId(it); } catch (NumberFormatException n) { - throw new IOException("Invalid AppAttemptId: " + throw new IllegalArgumentException("Invalid AppAttemptId: " + applicationAttmeptIdStr, n); } } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerLogsPage.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerLogsPage.java index e0795613b65..5bbf4474a9c 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerLogsPage.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerLogsPage.java @@ -69,7 +69,7 @@ protected void render(Block html) { ContainerId containerId; try { containerId = ConverterUtils.toContainerId($(CONTAINER_ID)); - } catch (IOException e) { + } catch (IllegalArgumentException e) { div.h1("Invalid containerId " + $(CONTAINER_ID))._(); return; } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerPage.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerPage.java index de76b84e277..358159bf889 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerPage.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerPage.java @@ -22,8 +22,6 @@ import static org.apache.hadoop.yarn.webapp.view.JQueryUI.ACCORDION; import static org.apache.hadoop.yarn.webapp.view.JQueryUI.initID; -import java.io.IOException; - import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -66,7 +64,7 @@ protected void render(Block html) { ContainerId containerID; try { containerID = ConverterUtils.toContainerId($(CONTAINER_ID)); - } catch (IOException e) { + } catch (IllegalArgumentException e) { html.p()._("Invalid containerId " + $(CONTAINER_ID))._(); return; } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java index 01eab2111be..f27dd7007f9 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java @@ -206,11 +206,12 @@ public SubmitApplicationResponse submitApplication( // Safety submissionContext.setUser(user); - + // This needs to be synchronous as the client can query // immediately following the submission to get the application status. // So call handle directly and do not send an event. - rmAppManager.handle(new RMAppManagerSubmitEvent(submissionContext)); + rmAppManager.handle(new RMAppManagerSubmitEvent(submissionContext, System + .currentTimeMillis())); LOG.info("Application with id " + applicationId.getId() + " submitted by user " + user + " with " + submissionContext); diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java index 3f175a34a0a..597d4c6a71f 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java @@ -213,7 +213,7 @@ protected synchronized void checkAppNumCompletedLimit() { @SuppressWarnings("unchecked") protected synchronized void submitApplication( - ApplicationSubmissionContext submissionContext) { + ApplicationSubmissionContext submissionContext, long submitTime) { ApplicationId applicationId = submissionContext.getApplicationId(); RMApp application = null; try { @@ -241,13 +241,13 @@ protected synchronized void submitApplication( ApplicationStore appStore = rmContext.getApplicationsStore() .createApplicationStore(submissionContext.getApplicationId(), submissionContext); - + // Create RMApp application = new RMAppImpl(applicationId, rmContext, this.conf, submissionContext.getApplicationName(), user, submissionContext.getQueue(), submissionContext, clientTokenStr, appStore, this.scheduler, - this.masterService); + this.masterService, submitTime); if (rmContext.getRMApps().putIfAbsent(applicationId, application) != null) { @@ -284,8 +284,9 @@ public void handle(RMAppManagerEvent event) { case APP_SUBMIT: { ApplicationSubmissionContext submissionContext = - ((RMAppManagerSubmitEvent)event).getSubmissionContext(); - submitApplication(submissionContext); + ((RMAppManagerSubmitEvent)event).getSubmissionContext(); + long submitTime = ((RMAppManagerSubmitEvent)event).getSubmitTime(); + submitApplication(submissionContext, submitTime); } break; default: diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManagerSubmitEvent.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManagerSubmitEvent.java index 495e7844280..afcd24da2fa 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManagerSubmitEvent.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManagerSubmitEvent.java @@ -23,13 +23,21 @@ public class RMAppManagerSubmitEvent extends RMAppManagerEvent { private final ApplicationSubmissionContext submissionContext; + private final long submitTime; - public RMAppManagerSubmitEvent(ApplicationSubmissionContext submissionContext) { - super(submissionContext.getApplicationId(), RMAppManagerEventType.APP_SUBMIT); + public RMAppManagerSubmitEvent( + ApplicationSubmissionContext submissionContext, long submitTime) { + super(submissionContext.getApplicationId(), + RMAppManagerEventType.APP_SUBMIT); this.submissionContext = submissionContext; + this.submitTime = submitTime; } public ApplicationSubmissionContext getSubmissionContext() { return this.submissionContext; } + + public long getSubmitTime() { + return this.submitTime; + } } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java index 07aac74f799..78187019197 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java @@ -22,8 +22,6 @@ import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.security.PrivilegedAction; -import java.util.ArrayList; -import java.util.List; import java.util.Map; import javax.crypto.SecretKey; @@ -37,7 +35,6 @@ import org.apache.hadoop.io.Text; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.Credentials; -import org.apache.hadoop.security.SecurityInfo; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.StringUtils; @@ -58,7 +55,6 @@ import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.security.ApplicationTokenIdentifier; import org.apache.hadoop.yarn.security.ApplicationTokenSecretManager; -import org.apache.hadoop.yarn.security.ContainerManagerSecurityInfo; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.security.client.ClientToAMSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; @@ -83,6 +79,7 @@ public class AMLauncher implements Runnable { private final ApplicationTokenSecretManager applicationTokenSecretManager; private final ClientToAMSecretManager clientToAMSecretManager; private final AMLauncherEventType eventType; + private final RMContext rmContext; @SuppressWarnings("rawtypes") private final EventHandler handler; @@ -96,6 +93,7 @@ public AMLauncher(RMContext rmContext, RMAppAttempt application, this.applicationTokenSecretManager = applicationTokenSecretManager; this.clientToAMSecretManager = clientToAMSecretManager; this.eventType = eventType; + this.rmContext = rmContext; this.handler = rmContext.getDispatcher().getEventHandler(); } @@ -189,9 +187,18 @@ private void setupTokensAndEnv( throws IOException { Map environment = container.getEnvironment(); - // Set the AppAttemptId to be consumable by the AM. - environment.put(ApplicationConstants.APPLICATION_ATTEMPT_ID_ENV, - application.getAppAttemptId().toString()); + // Set the AppAttemptId, containerId, NMHTTPAdress, AppSubmitTime to be + // consumable by the AM. + environment.put(ApplicationConstants.AM_CONTAINER_ID_ENV, container + .getContainerId().toString()); + environment.put(ApplicationConstants.NM_HTTP_ADDRESS_ENV, application + .getMasterContainer().getNodeHttpAddress()); + environment.put( + ApplicationConstants.APP_SUBMIT_TIME_ENV, + String.valueOf(rmContext.getRMApps() + .get(application.getAppAttemptId().getApplicationId()) + .getSubmitTime())); + if (UserGroupInformation.isSecurityEnabled()) { // TODO: Security enabled/disabled info should come from RM. diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java index 0ea9202fcbf..4998be5daef 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java @@ -115,6 +115,12 @@ public interface RMApp extends EventHandler { */ long getStartTime(); + /** + * the submit time of the application. + * @return the submit time of the application. + */ + long getSubmitTime(); + /** * The tracking url for the application master. * @return the tracking url for the application master. diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index 99315e05872..b988144e30b 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -84,6 +84,7 @@ public class RMAppImpl implements RMApp { private final WriteLock writeLock; private final Map attempts = new LinkedHashMap(); + private final long submitTime; // Mutable fields private long startTime; @@ -163,7 +164,8 @@ public RMAppImpl(ApplicationId applicationId, RMContext rmContext, Configuration config, String name, String user, String queue, ApplicationSubmissionContext submissionContext, String clientTokenStr, ApplicationStore appStore, - YarnScheduler scheduler, ApplicationMasterService masterService) { + YarnScheduler scheduler, ApplicationMasterService masterService, + long submitTime) { this.applicationId = applicationId; this.name = name; @@ -178,6 +180,7 @@ public RMAppImpl(ApplicationId applicationId, RMContext rmContext, this.appStore = appStore; this.scheduler = scheduler; this.masterService = masterService; + this.submitTime = submitTime; this.startTime = System.currentTimeMillis(); this.maxRetries = conf.getInt(YarnConfiguration.RM_AM_MAX_RETRIES, @@ -367,6 +370,11 @@ public long getStartTime() { } } + @Override + public long getSubmitTime() { + return this.submitTime; + } + @Override public String getTrackingUrl() { this.readLock.lock(); diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java index 7536857b2cf..f66ba8e5e62 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java @@ -42,6 +42,7 @@ public class MockNM { private final String nodeIdStr; private final int memory; private final ResourceTrackerService resourceTracker; + private final int httpPort = 2; MockNM(String nodeIdStr, int memory, ResourceTrackerService resourceTracker) { this.nodeIdStr = nodeIdStr; @@ -53,6 +54,10 @@ public NodeId getNodeId() { return nodeId; } + public String getHttpAddress() { + return nodeId.getHost() + ":" + String.valueOf(httpPort); + } + public void containerStatus(Container container) throws Exception { Map> conts = new HashMap>(); @@ -69,7 +74,7 @@ public NodeId registerNode() throws Exception { RegisterNodeManagerRequest req = Records.newRecord( RegisterNodeManagerRequest.class); req.setNodeId(nodeId); - req.setHttpPort(2); + req.setHttpPort(httpPort); Resource resource = Records.newRecord(Resource.class); resource.setMemory(memory); req.setResource(resource); diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java index afdeb161775..5e4a17019bf 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java @@ -163,7 +163,7 @@ public void setCompletedAppsMax(int max) { } public void submitApplication( ApplicationSubmissionContext submissionContext) { - super.submitApplication(submissionContext); + super.submitApplication(submissionContext, System.currentTimeMillis()); } } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java index a12049f9e82..2834810d9fb 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java @@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.StopContainerResponse; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.exceptions.YarnRemoteException; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncher; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType; @@ -39,6 +40,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; +import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.log4j.Level; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -56,6 +58,9 @@ private static final class MyContainerManagerImpl implements boolean launched = false; boolean cleanedup = false; String attemptIdAtContainerManager = null; + String containerIdAtContainerManager = null; + String nmAddressAtContainerManager = null; + long submitTimeAtContainerManager; @Override public StartContainerResponse @@ -63,9 +68,20 @@ private static final class MyContainerManagerImpl implements throws YarnRemoteException { LOG.info("Container started by MyContainerManager: " + request); launched = true; - attemptIdAtContainerManager = request.getContainerLaunchContext() - .getEnvironment().get( - ApplicationConstants.APPLICATION_ATTEMPT_ID_ENV); + containerIdAtContainerManager = + request.getContainerLaunchContext().getEnvironment() + .get(ApplicationConstants.AM_CONTAINER_ID_ENV); + ContainerId containerId = + ConverterUtils.toContainerId(containerIdAtContainerManager); + attemptIdAtContainerManager = + containerId.getApplicationAttemptId().toString(); + nmAddressAtContainerManager = + request.getContainerLaunchContext().getEnvironment() + .get(ApplicationConstants.NM_HTTP_ADDRESS_ENV); + submitTimeAtContainerManager = + Long.parseLong(request.getContainerLaunchContext().getEnvironment() + .get(ApplicationConstants.APP_SUBMIT_TIME_ENV)); + return null; } @@ -140,6 +156,13 @@ public void testAMLaunchAndCleanup() throws Exception { ApplicationAttemptId appAttemptId = attempt.getAppAttemptId(); Assert.assertEquals(appAttemptId.toString(), containerManager.attemptIdAtContainerManager); + Assert.assertEquals(app.getSubmitTime(), + containerManager.submitTimeAtContainerManager); + Assert.assertEquals(app.getRMAppAttempt(appAttemptId) + .getSubmissionContext().getAMContainerSpec().getContainerId() + .toString(), containerManager.containerIdAtContainerManager); + Assert.assertEquals(nm1.getHttpAddress(), + containerManager.nmAddressAtContainerManager); MockAM am = new MockAM(rm.getRMContext(), rm .getApplicationMasterService(), appAttemptId); diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java index bda4d46e4e9..c4ef938f757 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java @@ -166,6 +166,11 @@ public long getStartTime() { throw new UnsupportedOperationException("Not supported yet."); } + @Override + public long getSubmitTime() { + throw new UnsupportedOperationException("Not supported yet."); + } + @Override public long getFinishTime() { throw new UnsupportedOperationException("Not supported yet."); diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java index 81e10092c87..19b60972157 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java @@ -33,6 +33,7 @@ public class MockRMApp implements RMApp { String name = MockApps.newAppName(); String queue = MockApps.newQueue(); long start = System.currentTimeMillis() - (int) (Math.random() * DT); + long submit = start - (int) (Math.random() * DT); long finish = 0; RMAppState state = RMAppState.NEW; int failCount = 0; @@ -141,6 +142,11 @@ public long getStartTime() { return start; } + @Override + public long getSubmitTime() { + return submit; + } + public void setStartTime(long time) { this.start = time; } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java index a46673f36de..2ad44bf39b8 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java @@ -21,7 +21,6 @@ import static org.mockito.Mockito.mock; import java.io.IOException; -import java.util.List; import junit.framework.Assert; @@ -51,7 +50,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; import org.junit.Before; -import org.junit.After; import org.junit.Test; @@ -61,7 +59,7 @@ public class TestRMAppTransitions { private RMContext rmContext; private static int maxRetries = 4; private static int appId = 1; - private AsyncDispatcher rmDispatcher; +// private AsyncDispatcher rmDispatcher; // ignore all the RM application attempt events private static final class TestApplicationAttemptEventDispatcher implements @@ -152,7 +150,7 @@ protected RMApp createNewTestApp() { conf, name, user, queue, submissionContext, clientTokenStr, appStore, scheduler, - masterService); + masterService, System.currentTimeMillis()); testAppStartState(applicationId, user, name, queue, application); return application; diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/WritingYarnApplications.apt.vm b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/WritingYarnApplications.apt.vm index fd9e48ee94c..b02c32b2de6 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/WritingYarnApplications.apt.vm +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/WritingYarnApplications.apt.vm @@ -323,25 +323,29 @@ Hadoop MapReduce Next Generation - Writing YARN Applications multi-tenancy nature, amongst other issues, it cannot make any assumptions of things like pre-configured ports that it can listen on. + * When the ApplicationMaster starts up, several parameters are made available + to it via the environment. These include the ContainerId for the + ApplicationMaster container, the application submission time and the HTTP + address of the NodeManager running the container. Ref ApplicationConstants + for parameter names. + * All interactions with the ResourceManager require an ApplicationAttemptId - (there can be multiple attempts per application in case of failures). When - the ApplicationMaster starts up, the ApplicationAttemptId associated with - this particular instance will be set in the environment. There are helper - apis to convert the value obtained from the environment into an - ApplicationAttemptId object. + (there can be multiple attempts per application in case of failures). The + ApplicationAttemptId can be obtained from the ApplicationMaster + containerId. There are helper apis to convert the value obtained from the + environment into objects. +---+ Map envs = System.getenv(); - ApplicationAttemptId appAttemptID = - Records.newRecord(ApplicationAttemptId.class); - if (!envs.containsKey(ApplicationConstants.APPLICATION_ATTEMPT_ID_ENV)) { - // app attempt id should always be set in the env by the framework + String containerIdString = + envs.get(ApplicationConstants.AM_CONTAINER_ID_ENV); + if (containerIdString == null) { + // container id should always be set in the env by the framework throw new IllegalArgumentException( - "ApplicationAttemptId not set in the environment"); - } - appAttemptID = - ConverterUtils.toApplicationAttemptId( - envs.get(ApplicationConstants.APPLICATION_ATTEMPT_ID_ENV)); + "ContainerId not set in the environment"); + } + ContainerId containerId = ConverterUtils.toContainerId(containerIdString); + ApplicationAttemptId appAttemptID = containerId.getApplicationAttemptId(); +---+ * After an ApplicationMaster has initialized itself completely, it needs to