MAPREDUCE-3144. Augmented JobHistory with the information needed for serving aggregated logs. Contributed by Siddharth Seth.

svn merge -c r1185976 --ignore-ancestry ../../trunk/


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1185977 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2011-10-19 05:28:22 +00:00
parent dfe8f79cde
commit e3b9d11da8
45 changed files with 688 additions and 197 deletions

View File

@ -359,6 +359,9 @@ Release 0.23.0 - Unreleased
from the NodeManager and set MALLOC_ARENA_MAX for all daemons and
containers. (Chris Riccomini via acmurthy)
MAPREDUCE-3144. Augmented JobHistory with the information needed for
serving aggregated logs. (Siddharth Seth via vinodkv)
OPTIMIZATIONS
MAPREDUCE-2026. Make JobTracker.getJobCounters() and

View File

@ -275,7 +275,7 @@ public void stop() {
* @param jobId the jobId.
* @throws IOException
*/
protected void setupEventWriter(JobId jobId, JobSubmittedEvent jse)
protected void setupEventWriter(JobId jobId)
throws IOException {
if (stagingDirPath == null) {
LOG.error("Log Directory is null, returning");
@ -285,9 +285,6 @@ protected void setupEventWriter(JobId jobId, JobSubmittedEvent jse)
MetaInfo oldFi = fileMap.get(jobId);
Configuration conf = getConfig();
long submitTime = oldFi == null ? jse.getSubmitTime() : oldFi
.getJobIndexInfo().getSubmitTime();
// TODO Ideally this should be written out to the job dir
// (.staging/jobid/files - RecoveryService will need to be patched)
Path historyFile = JobHistoryUtils.getStagingJobHistoryFile(
@ -301,6 +298,8 @@ protected void setupEventWriter(JobId jobId, JobSubmittedEvent jse)
String jobName = context.getJob(jobId).getName();
EventWriter writer = (oldFi == null) ? null : oldFi.writer;
Path logDirConfPath =
JobHistoryUtils.getStagingConfFile(stagingDirPath, jobId, startCount);
if (writer == null) {
try {
FSDataOutputStream out = stagingDirFS.create(historyFile, true);
@ -312,31 +311,28 @@ protected void setupEventWriter(JobId jobId, JobSubmittedEvent jse)
+ "[" + jobName + "]");
throw ioe;
}
}
Path logDirConfPath = null;
if (conf != null) {
// TODO Ideally this should be written out to the job dir
// (.staging/jobid/files - RecoveryService will need to be patched)
logDirConfPath = JobHistoryUtils.getStagingConfFile(stagingDirPath, jobId,
startCount);
FSDataOutputStream jobFileOut = null;
try {
if (logDirConfPath != null) {
jobFileOut = stagingDirFS.create(logDirConfPath, true);
conf.writeXml(jobFileOut);
jobFileOut.close();
//Write out conf only if the writer isn't already setup.
if (conf != null) {
// TODO Ideally this should be written out to the job dir
// (.staging/jobid/files - RecoveryService will need to be patched)
FSDataOutputStream jobFileOut = null;
try {
if (logDirConfPath != null) {
jobFileOut = stagingDirFS.create(logDirConfPath, true);
conf.writeXml(jobFileOut);
jobFileOut.close();
}
} catch (IOException e) {
LOG.info("Failed to write the job configuration file", e);
throw e;
}
} catch (IOException e) {
LOG.info("Failed to write the job configuration file", e);
throw e;
}
}
MetaInfo fi = new MetaInfo(historyFile, logDirConfPath, writer, submitTime,
MetaInfo fi = new MetaInfo(historyFile, logDirConfPath, writer,
user, jobName, jobId);
fi.getJobSummary().setJobId(jobId);
fi.getJobSummary().setJobSubmitTime(submitTime);
fileMap.put(jobId, fi);
}
@ -368,11 +364,9 @@ protected void handleEvent(JobHistoryEvent event) {
synchronized (lock) {
// If this is JobSubmitted Event, setup the writer
if (event.getHistoryEvent().getEventType() == EventType.JOB_SUBMITTED) {
if (event.getHistoryEvent().getEventType() == EventType.AM_STARTED) {
try {
JobSubmittedEvent jobSubmittedEvent =
(JobSubmittedEvent) event.getHistoryEvent();
setupEventWriter(event.getJobID(), jobSubmittedEvent);
setupEventWriter(event.getJobID());
} catch (IOException ioe) {
LOG.error("Error JobHistoryEventHandler in handleEvent: " + event,
ioe);
@ -396,6 +390,12 @@ protected void handleEvent(JobHistoryEvent event) {
throw new YarnException(e);
}
if (event.getHistoryEvent().getEventType() == EventType.JOB_SUBMITTED) {
JobSubmittedEvent jobSubmittedEvent =
(JobSubmittedEvent) event.getHistoryEvent();
mi.getJobIndexInfo().setSubmitTime(jobSubmittedEvent.getSubmitTime());
}
// If this is JobFinishedEvent, close the writer and setup the job-index
if (event.getHistoryEvent().getEventType() == EventType.JOB_FINISHED) {
try {
@ -436,6 +436,7 @@ private void processEventForJobSummary(HistoryEvent event, JobSummary summary, J
JobSubmittedEvent jse = (JobSubmittedEvent) event;
summary.setUser(jse.getUserName());
summary.setQueue(jse.getJobQueueName());
summary.setJobSubmitTime(jse.getSubmitTime());
break;
case JOB_INITED:
JobInitedEvent jie = (JobInitedEvent) event;
@ -588,12 +589,12 @@ private class MetaInfo {
JobIndexInfo jobIndexInfo;
JobSummary jobSummary;
MetaInfo(Path historyFile, Path conf, EventWriter writer, long submitTime,
MetaInfo(Path historyFile, Path conf, EventWriter writer,
String user, String jobName, JobId jobId) {
this.historyFile = historyFile;
this.confFile = conf;
this.writer = writer;
this.jobIndexInfo = new JobIndexInfo(submitTime, -1, user, jobName, jobId, -1, -1, null);
this.jobIndexInfo = new JobIndexInfo(-1, -1, user, jobName, jobId, -1, -1, null);
this.jobSummary = new JobSummary();
}

View File

@ -22,7 +22,10 @@
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.net.InetSocketAddress;
import java.security.PrivilegedExceptionAction;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@ -40,6 +43,8 @@
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.AMInfo;
import org.apache.hadoop.mapreduce.jobhistory.AMStartedEvent;
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
@ -72,6 +77,7 @@
import org.apache.hadoop.mapreduce.v2.app.taskclean.TaskCleaner;
import org.apache.hadoop.mapreduce.v2.app.taskclean.TaskCleanerImpl;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
@ -82,6 +88,7 @@
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
@ -115,14 +122,20 @@ public class MRAppMaster extends CompositeService {
private static final Log LOG = LogFactory.getLog(MRAppMaster.class);
private Clock clock;
private final long startTime = System.currentTimeMillis();
private final long startTime;
private final long appSubmitTime;
private String appName;
private final ApplicationAttemptId appAttemptID;
private final ContainerId containerID;
private final String nmHost;
private final int nmHttpPort;
protected final MRAppMetrics metrics;
private Set<TaskId> completedTasksFromPreviousRun;
private List<AMInfo> amInfos;
private AppContext context;
private Dispatcher dispatcher;
private ClientService clientService;
private Recovery recoveryServ;
private ContainerAllocator containerAllocator;
private ContainerLauncher containerLauncher;
private TaskCleaner taskCleaner;
@ -131,19 +144,29 @@ public class MRAppMaster extends CompositeService {
private JobTokenSecretManager jobTokenSecretManager =
new JobTokenSecretManager();
private JobEventDispatcher jobEventDispatcher;
private boolean inRecovery = false;
private Job job;
private Credentials fsTokens = new Credentials(); // Filled during init
private UserGroupInformation currentUser; // Will be setup during init
public MRAppMaster(ApplicationAttemptId applicationAttemptId) {
this(applicationAttemptId, new SystemClock());
public MRAppMaster(ApplicationAttemptId applicationAttemptId,
ContainerId containerId, String nmHost, int nmHttpPort, long appSubmitTime) {
this(applicationAttemptId, containerId, nmHost, nmHttpPort,
new SystemClock(), appSubmitTime);
}
public MRAppMaster(ApplicationAttemptId applicationAttemptId, Clock clock) {
public MRAppMaster(ApplicationAttemptId applicationAttemptId,
ContainerId containerId, String nmHost, int nmHttpPort, Clock clock,
long appSubmitTime) {
super(MRAppMaster.class.getName());
this.clock = clock;
this.startTime = clock.getTime();
this.appSubmitTime = appSubmitTime;
this.appAttemptID = applicationAttemptId;
this.containerID = containerId;
this.nmHost = nmHost;
this.nmHttpPort = nmHttpPort;
this.metrics = MRAppMetrics.create();
LOG.info("Created MRAppMaster for application " + applicationAttemptId);
}
@ -162,11 +185,11 @@ public void init(final Configuration conf) {
if (conf.getBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, false)
&& appAttemptID.getAttemptId() > 1) {
LOG.info("Recovery is enabled. Will try to recover from previous life.");
Recovery recoveryServ = new RecoveryService(appAttemptID, clock);
recoveryServ = new RecoveryService(appAttemptID, clock);
addIfService(recoveryServ);
dispatcher = recoveryServ.getDispatcher();
clock = recoveryServ.getClock();
completedTasksFromPreviousRun = recoveryServ.getCompletedTasks();
inRecovery = true;
} else {
dispatcher = new AsyncDispatcher();
addIfService(dispatcher);
@ -327,7 +350,8 @@ protected Job createJob(Configuration conf) {
// create single job
Job newJob = new JobImpl(appAttemptID, conf, dispatcher.getEventHandler(),
taskAttemptListener, jobTokenSecretManager, fsTokens, clock,
completedTasksFromPreviousRun, metrics, currentUser.getUserName());
completedTasksFromPreviousRun, metrics, currentUser.getUserName(),
appSubmitTime, amInfos);
((RunningAppContext) context).jobs.put(newJob.getID(), newJob);
dispatcher.register(JobFinishEvent.Type.class,
@ -463,6 +487,10 @@ public Set<TaskId> getCompletedTaskFromPreviousRun() {
return completedTasksFromPreviousRun;
}
public List<AMInfo> getAllAMInfos() {
return amInfos;
}
public ContainerAllocator getContainerAllocator() {
return containerAllocator;
}
@ -617,11 +645,33 @@ public Clock getClock() {
@Override
public void start() {
///////////////////// Create the job itself.
// Pull completedTasks etc from recovery
if (inRecovery) {
completedTasksFromPreviousRun = recoveryServ.getCompletedTasks();
amInfos = recoveryServ.getAMInfos();
}
// / Create the AMInfo for the current AppMaster
if (amInfos == null) {
amInfos = new LinkedList<AMInfo>();
}
AMInfo amInfo =
new AMInfo(appAttemptID, startTime, containerID, nmHost, nmHttpPort);
amInfos.add(amInfo);
// /////////////////// Create the job itself.
job = createJob(getConfig());
// End of creating the job.
// Send out an MR AM inited event for this AM and all previous AMs.
for (AMInfo info : amInfos) {
dispatcher.getEventHandler().handle(
new JobHistoryEvent(job.getID(), new AMStartedEvent(info
.getAppAttemptId(), info.getStartTime(), info.getContainerId(),
info.getNodeManagerHost(), info.getNodeManagerHttpPort())));
}
// metrics system init is really init & start.
// It's more test friendly to put it here.
DefaultMetricsSystem.initialize("MRAppMaster");
@ -723,17 +773,39 @@ public void handle(SpeculatorEvent event) {
public static void main(String[] args) {
try {
String applicationAttemptIdStr = System
.getenv(ApplicationConstants.APPLICATION_ATTEMPT_ID_ENV);
if (applicationAttemptIdStr == null) {
String msg = ApplicationConstants.APPLICATION_ATTEMPT_ID_ENV
+ " is null";
String containerIdStr =
System.getenv(ApplicationConstants.AM_CONTAINER_ID_ENV);
String nodeHttpAddressStr =
System.getenv(ApplicationConstants.NM_HTTP_ADDRESS_ENV);
String appSubmitTimeStr =
System.getenv(ApplicationConstants.APP_SUBMIT_TIME_ENV);
if (containerIdStr == null) {
String msg = ApplicationConstants.AM_CONTAINER_ID_ENV + " is null";
LOG.error(msg);
throw new IOException(msg);
}
ApplicationAttemptId applicationAttemptId = ConverterUtils
.toApplicationAttemptId(applicationAttemptIdStr);
MRAppMaster appMaster = new MRAppMaster(applicationAttemptId);
if (nodeHttpAddressStr == null) {
String msg = ApplicationConstants.NM_HTTP_ADDRESS_ENV + " is null";
LOG.error(msg);
throw new IOException(msg);
}
if (appSubmitTimeStr == null) {
String msg = ApplicationConstants.APP_SUBMIT_TIME_ENV + " is null";
LOG.error(msg);
throw new IOException(msg);
}
ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
ApplicationAttemptId applicationAttemptId =
containerId.getApplicationAttemptId();
InetSocketAddress nodeHttpInetAddr =
NetUtils.createSocketAddr(nodeHttpAddressStr);
long appSubmitTime = Long.parseLong(appSubmitTimeStr);
MRAppMaster appMaster =
new MRAppMaster(applicationAttemptId, containerId,
nodeHttpInetAddr.getHostName(), nodeHttpInetAddr.getPort(),
appSubmitTime);
Runtime.getRuntime().addShutdownHook(
new CompositeServiceShutdownHook(appMaster));
YarnConfiguration conf = new YarnConfiguration(new JobConf());

View File

@ -23,6 +23,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.JobACL;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.AMInfo;
import org.apache.hadoop.mapreduce.v2.api.records.Counters;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
@ -68,5 +69,10 @@ public interface Job {
TaskAttemptCompletionEvent[]
getTaskAttemptCompletionEvents(int fromEventId, int maxEvents);
/**
* @return information for MR AppMasters (previously failed and current)
*/
List<AMInfo> getAMInfos();
boolean checkAccess(UserGroupInformation callerUGI, JobACL jobOperation);
}

View File

@ -51,6 +51,7 @@
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.jobhistory.JobFinishedEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.AMInfo;
import org.apache.hadoop.mapreduce.jobhistory.JobInfoChangeEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobInitedEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobSubmittedEvent;
@ -136,6 +137,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
private final String username;
private final Map<JobACL, AccessControlList> jobACLs;
private final Set<TaskId> completedTasksFromPreviousRun;
private final List<AMInfo> amInfos;
private final Lock readLock;
private final Lock writeLock;
private final JobId jobId;
@ -148,6 +150,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
private final EventHandler eventHandler;
private final MRAppMetrics metrics;
private final String userName;
private final long appSubmitTime;
private boolean lazyTasksCopyNeeded = false;
private volatile Map<TaskId, Task> tasks = new LinkedHashMap<TaskId, Task>();
@ -354,7 +357,6 @@ JobEventType.JOB_KILL, new KillTasksTransition())
private int failedReduceTaskCount = 0;
private int killedMapTaskCount = 0;
private int killedReduceTaskCount = 0;
private long submitTime;
private long startTime;
private long finishTime;
private float setupProgress;
@ -370,7 +372,7 @@ public JobImpl(ApplicationAttemptId applicationAttemptId, Configuration conf,
JobTokenSecretManager jobTokenSecretManager,
Credentials fsTokenCredentials, Clock clock,
Set<TaskId> completedTasksFromPreviousRun, MRAppMetrics metrics,
String userName) {
String userName, long appSubmitTime, List<AMInfo> amInfos) {
this.applicationAttemptId = applicationAttemptId;
this.jobId = recordFactory.newRecordInstance(JobId.class);
this.jobName = conf.get(JobContext.JOB_NAME, "<missing job name>");
@ -378,7 +380,9 @@ public JobImpl(ApplicationAttemptId applicationAttemptId, Configuration conf,
this.metrics = metrics;
this.clock = clock;
this.completedTasksFromPreviousRun = completedTasksFromPreviousRun;
this.amInfos = amInfos;
this.userName = userName;
this.appSubmitTime = appSubmitTime;
ApplicationId applicationId = applicationAttemptId.getApplicationId();
jobId.setAppId(applicationId);
jobId.setId(applicationId.getId());
@ -806,6 +810,11 @@ public int getTotalReduces() {
public Map<JobACL, AccessControlList> getJobACLs() {
return Collections.unmodifiableMap(jobACLs);
}
@Override
public List<AMInfo> getAMInfos() {
return amInfos;
}
public static class InitTransition
implements MultipleArcTransition<JobImpl, JobEvent, JobState> {
@ -819,7 +828,6 @@ public static class InitTransition
*/
@Override
public JobState transition(JobImpl job, JobEvent event) {
job.submitTime = job.clock.getTime();
job.metrics.submittedJob(job);
job.metrics.preparingJob(job);
try {
@ -830,7 +838,7 @@ public JobState transition(JobImpl job, JobEvent event) {
JobSubmittedEvent jse = new JobSubmittedEvent(job.oldJobId,
job.conf.get(MRJobConfig.JOB_NAME, "test"),
job.conf.get(MRJobConfig.USER_NAME, "mapred"),
job.submitTime,
job.appSubmitTime,
job.remoteJobConfFile.toString(),
job.jobACLs, job.conf.get(MRJobConfig.QUEUE_NAME, "default"));
job.eventHandler.handle(new JobHistoryEvent(job.jobId, jse));
@ -1152,7 +1160,7 @@ public void transition(JobImpl job, JobEvent event) {
job.isUber()); //Will transition to state running. Currently in INITED
job.eventHandler.handle(new JobHistoryEvent(job.jobId, jie));
JobInfoChangeEvent jice = new JobInfoChangeEvent(job.oldJobId,
job.submitTime, job.startTime);
job.appSubmitTime, job.startTime);
job.eventHandler.handle(new JobHistoryEvent(job.jobId, jice));
job.metrics.runningJob(job);

View File

@ -894,15 +894,20 @@ private static JobCounterUpdateEvent createJobCounterUpdateEventTAFailed(
return jce;
}
private static TaskAttemptUnsuccessfulCompletionEvent createTaskAttemptUnsuccessfulCompletionEvent(
TaskAttemptImpl taskAttempt, TaskAttemptState attemptState) {
TaskAttemptUnsuccessfulCompletionEvent tauce = new TaskAttemptUnsuccessfulCompletionEvent(
TypeConverter.fromYarn(taskAttempt.attemptId),
TypeConverter.fromYarn(taskAttempt.attemptId.getTaskId().getTaskType()),
attemptState.toString(), taskAttempt.finishTime,
taskAttempt.nodeHostName == null ? "UNKNOWN" : taskAttempt.nodeHostName,
StringUtils.join(LINE_SEPARATOR, taskAttempt.getDiagnostics()),
taskAttempt.getProgressSplitBlock().burst());
private static
TaskAttemptUnsuccessfulCompletionEvent
createTaskAttemptUnsuccessfulCompletionEvent(TaskAttemptImpl taskAttempt,
TaskAttemptState attemptState) {
TaskAttemptUnsuccessfulCompletionEvent tauce =
new TaskAttemptUnsuccessfulCompletionEvent(
TypeConverter.fromYarn(taskAttempt.attemptId),
TypeConverter.fromYarn(taskAttempt.attemptId.getTaskId()
.getTaskType()), attemptState.toString(),
taskAttempt.finishTime,
taskAttempt.containerMgrAddress == null ? "UNKNOWN"
: taskAttempt.containerMgrAddress, StringUtils.join(
LINE_SEPARATOR, taskAttempt.getDiagnostics()), taskAttempt
.getProgressSplitBlock().burst());
return tauce;
}
@ -1120,11 +1125,15 @@ public void transition(TaskAttemptImpl taskAttempt,
, 1);
taskAttempt.eventHandler.handle(jce);
LOG.info("TaskAttempt: [" + taskAttempt.attemptId
+ "] using containerId: [" + taskAttempt.containerID + " on NM: ["
+ taskAttempt.containerMgrAddress + "]");
TaskAttemptStartedEvent tase =
new TaskAttemptStartedEvent(TypeConverter.fromYarn(taskAttempt.attemptId),
TypeConverter.fromYarn(taskAttempt.attemptId.getTaskId().getTaskType()),
taskAttempt.launchTime,
nodeHttpInetAddr.getHostName(), nodeHttpInetAddr.getPort(), taskAttempt.shufflePort);
nodeHttpInetAddr.getHostName(), nodeHttpInetAddr.getPort(),
taskAttempt.shufflePort, taskAttempt.containerID);
taskAttempt.eventHandler.handle
(new JobHistoryEvent(taskAttempt.attemptId.getTaskId().getJobId(), tase));
taskAttempt.eventHandler.handle
@ -1236,7 +1245,8 @@ private void logAttemptFinishedEvent(TaskAttemptState state) {
TypeConverter.fromYarn(attemptId.getTaskId().getTaskType()),
state.toString(),
this.reportedStatus.mapFinishTime,
finishTime, this.nodeHostName == null ? "UNKNOWN" : this.nodeHostName,
finishTime, this.containerMgrAddress == null ? "UNKNOWN"
: this.containerMgrAddress,
this.reportedStatus.stateString,
TypeConverter.fromYarn(getCounters()),
getProgressSplitBlock().burst());
@ -1249,7 +1259,8 @@ private void logAttemptFinishedEvent(TaskAttemptState state) {
state.toString(),
this.reportedStatus.shuffleFinishTime,
this.reportedStatus.sortFinishTime,
finishTime, this.nodeHostName == null ? "UNKNOWN" : this.nodeHostName,
finishTime, this.containerMgrAddress == null ? "UNKNOWN"
: this.containerMgrAddress,
this.reportedStatus.stateString,
TypeConverter.fromYarn(getCounters()),
getProgressSplitBlock().burst());

View File

@ -18,8 +18,10 @@
package org.apache.hadoop.mapreduce.v2.app.recover;
import java.util.List;
import java.util.Set;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.AMInfo;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.event.Dispatcher;
@ -31,4 +33,6 @@ public interface Recovery {
Clock getClock();
Set<TaskId> getCompletedTasks();
List<AMInfo> getAMInfos();
}

View File

@ -21,6 +21,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -34,6 +35,7 @@
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.AMInfo;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
import org.apache.hadoop.mapreduce.v2.api.records.Phase;
@ -148,6 +150,14 @@ public Clock getClock() {
public Set<TaskId> getCompletedTasks() {
return completedTasks.keySet();
}
@Override
public List<AMInfo> getAMInfos() {
if (jobInfo == null || jobInfo.getAMInfos() == null) {
return new LinkedList<AMInfo>();
}
return new LinkedList<AMInfo>(jobInfo.getAMInfos());
}
private void parse() throws IOException {
// TODO: parse history file based on startCount
@ -351,15 +361,16 @@ private void sendStatusUpdateEvent(TaskAttemptId yarnAttemptID,
private void sendAssignedEvent(TaskAttemptId yarnAttemptID,
TaskAttemptInfo attemptInfo) {
LOG.info("Sending assigned event to " + yarnAttemptID);
ContainerId cId = recordFactory
.newRecordInstance(ContainerId.class);
ContainerId cId = attemptInfo.getContainerId();
Container container = recordFactory
.newRecordInstance(Container.class);
container.setId(cId);
container.setNodeId(recordFactory
.newRecordInstance(NodeId.class));
// NodeId can be obtained from TaskAttemptInfo.hostname - but this will
// eventually contain rack info.
container.setContainerToken(null);
container.setNodeHttpAddress(attemptInfo.getHostname() + ":" +
container.setNodeHttpAddress(attemptInfo.getTrackerName() + ":" +
attemptInfo.getHttpPort());
actualHandler.handle(new TaskAttemptContainerAssignedEvent(yarnAttemptID,
container));

View File

@ -77,6 +77,7 @@
import org.apache.hadoop.yarn.service.Service;
import org.apache.hadoop.yarn.state.StateMachine;
import org.apache.hadoop.yarn.state.StateMachineFactory;
import org.apache.hadoop.yarn.util.BuilderUtils;
/**
@ -118,10 +119,20 @@ private static ApplicationAttemptId getApplicationAttemptId(
applicationAttemptId.setAttemptId(startCount);
return applicationAttemptId;
}
private static ContainerId getContainerId(ApplicationId applicationId,
int startCount) {
ApplicationAttemptId appAttemptId =
getApplicationAttemptId(applicationId, startCount);
ContainerId containerId =
BuilderUtils.newContainerId(appAttemptId, startCount);
return containerId;
}
public MRApp(int maps, int reduces, boolean autoComplete, String testName,
boolean cleanOnStart, int startCount) {
super(getApplicationAttemptId(applicationId, startCount));
super(getApplicationAttemptId(applicationId, startCount), getContainerId(
applicationId, startCount), "testhost", 3333, System.currentTimeMillis());
this.testWorkDir = new File("target", testName);
testAbsPath = new Path(testWorkDir.getAbsolutePath());
LOG.info("PathUsed: " + testAbsPath);
@ -405,10 +416,10 @@ protected StateMachine<JobState, JobEventType, JobEvent> getStateMachine() {
public TestJob(Configuration conf, ApplicationId applicationId,
EventHandler eventHandler, TaskAttemptListener taskAttemptListener,
Clock clock, String user) {
super(getApplicationAttemptId(applicationId, getStartCount()),
conf, eventHandler, taskAttemptListener,
new JobTokenSecretManager(), new Credentials(), clock,
getCompletedTaskFromPreviousRun(), metrics, user);
super(getApplicationAttemptId(applicationId, getStartCount()), conf,
eventHandler, taskAttemptListener, new JobTokenSecretManager(),
new Credentials(), clock, getCompletedTaskFromPreviousRun(), metrics,
user, System.currentTimeMillis(), getAllAMInfos());
// This "this leak" is okay because the retained pointer is in an
// instance variable.

View File

@ -33,6 +33,7 @@
import org.apache.hadoop.mapreduce.JobACL;
import org.apache.hadoop.mapreduce.JobCounter;
import org.apache.hadoop.mapreduce.TaskCounter;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.AMInfo;
import org.apache.hadoop.mapreduce.v2.api.records.Counters;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
@ -488,6 +489,11 @@ public Path getConfFile() {
public Map<JobACL, AccessControlList> getJobACLs() {
return Collections.<JobACL, AccessControlList>emptyMap();
}
@Override
public List<AMInfo> getAMInfos() {
throw new UnsupportedOperationException("Not supported yet.");
}
};
}
}

View File

@ -27,6 +27,7 @@
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.junit.Test;
@ -36,11 +37,15 @@ public class TestMRAppMaster {
public void testMRAppMasterForDifferentUser() throws IOException,
InterruptedException {
String applicationAttemptIdStr = "appattempt_1317529182569_0004_000001";
String containerIdStr = "container_1317529182569_0004_000001_1";
String stagingDir = "/tmp/staging";
String userName = "TestAppMasterUser";
ApplicationAttemptId applicationAttemptId = ConverterUtils
.toApplicationAttemptId(applicationAttemptIdStr);
MRAppMasterTest appMaster = new MRAppMasterTest(applicationAttemptId);
ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
MRAppMasterTest appMaster =
new MRAppMasterTest(applicationAttemptId, containerId, "host", -1,
System.currentTimeMillis());
YarnConfiguration conf = new YarnConfiguration();
conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
MRAppMaster.initAndStartAppMaster(appMaster, conf, userName);
@ -54,8 +59,9 @@ class MRAppMasterTest extends MRAppMaster {
Path stagingDirPath;
private Configuration conf;
public MRAppMasterTest(ApplicationAttemptId applicationAttemptId) {
super(applicationAttemptId);
public MRAppMasterTest(ApplicationAttemptId applicationAttemptId,
ContainerId containerId, String host, int port, long submitTime) {
super(applicationAttemptId, containerId, host, port, submitTime);
}
@Override

View File

@ -340,8 +340,8 @@ private static class FakeJob extends JobImpl {
public FakeJob(ApplicationAttemptId appAttemptID, Configuration conf,
int numMaps, int numReduces) {
super(appAttemptID, conf, null, null, null, null, null, null, null,
null);
super(appAttemptID, conf, null, null, null, null, null, null, null, null,
System.currentTimeMillis(), null);
this.jobId = MRBuilderUtils
.newJobId(appAttemptID.getApplicationId(), 0);
this.numMaps = numMaps;

View File

@ -28,6 +28,7 @@
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.AMInfo;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
@ -46,6 +47,7 @@ public class TestRecovery {
@Test
public void testCrashed() throws Exception {
int runCount = 0;
long am1StartTimeEst = System.currentTimeMillis();
MRApp app = new MRAppWithHistory(2, 1, false, this.getClass().getName(), true, ++runCount);
Configuration conf = new Configuration();
conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
@ -126,9 +128,10 @@ public void testCrashed() throws Exception {
//stop the app
app.stop();
//rerun
//in rerun the 1st map will be recovered from previous run
long am2StartTimeEst = System.currentTimeMillis();
app = new MRAppWithHistory(2, 1, false, this.getClass().getName(), false, ++runCount);
conf = new Configuration();
conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
@ -178,8 +181,27 @@ public void testCrashed() throws Exception {
task1StartTime, mapTask1.getReport().getStartTime());
Assert.assertEquals("Task Finish time not correct",
task1FinishTime, mapTask1.getReport().getFinishTime());
Assert.assertEquals(2, job.getAMInfos().size());
int attemptNum = 1;
// Verify AMInfo
for (AMInfo amInfo : job.getAMInfos()) {
Assert.assertEquals(attemptNum++, amInfo.getAppAttemptId()
.getAttemptId());
Assert.assertEquals(amInfo.getAppAttemptId(), amInfo.getContainerId()
.getApplicationAttemptId());
Assert.assertEquals("testhost", amInfo.getNodeManagerHost());
Assert.assertEquals(3333, amInfo.getNodeManagerHttpPort());
}
long am1StartTimeReal = job.getAMInfos().get(0).getStartTime();
long am2StartTimeReal = job.getAMInfos().get(1).getStartTime();
Assert.assertTrue(am1StartTimeReal >= am1StartTimeEst
&& am1StartTimeReal <= am2StartTimeEst);
Assert.assertTrue(am2StartTimeReal >= am2StartTimeEst
&& am2StartTimeReal <= System.currentTimeMillis());
// TODO Add verification of additional data from jobHistory - whatever was
// available in the failed attempt should be available here
}
class MRAppWithHistory extends MRApp {
public MRAppWithHistory(int maps, int reduces, boolean autoComplete,
String testName, boolean cleanOnStart, int startCount) {

View File

@ -33,6 +33,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.JobACL;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.AMInfo;
import org.apache.hadoop.mapreduce.v2.api.records.Counters;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
@ -473,6 +474,11 @@ public Path getConfFile() {
public Map<JobACL, AccessControlList> getJobACLs() {
throw new UnsupportedOperationException("Not supported yet.");
}
@Override
public List<AMInfo> getAMInfos() {
throw new UnsupportedOperationException("Not supported yet.");
}
}
/*

View File

@ -41,6 +41,7 @@
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.junit.Test;
@ -81,7 +82,9 @@ public void testDeletionofStaging() throws IOException {
private class TestMRApp extends MRAppMaster {
public TestMRApp(ApplicationAttemptId applicationAttemptId) {
super(applicationAttemptId);
super(applicationAttemptId, BuilderUtils.newContainerId(
applicationAttemptId, 1), "testhost", 3333, System
.currentTimeMillis());
}
@Override

View File

@ -69,6 +69,16 @@
]
},
{"type": "record", "name": "AMStarted",
"fields": [
{"name": "applicationAttemptId", "type": "string"},
{"name": "startTime", "type": "long"},
{"name": "containerId", "type": "string"},
{"name": "nodeManagerHost", "type": "string"},
{"name": "nodeManagerHttpPort", "type": "int"}
]
},
{"type": "record", "name": "JobSubmitted",
"fields": [
{"name": "jobid", "type": "string"},
@ -174,7 +184,8 @@
{"name": "startTime", "type": "long"},
{"name": "trackerName", "type": "string"},
{"name": "httpPort", "type": "int"},
{"name": "shufflePort", "type": "int"}
{"name": "shufflePort", "type": "int"},
{"name": "containerId", "type": "string"}
]
},
@ -260,7 +271,8 @@
"CLEANUP_ATTEMPT_STARTED",
"CLEANUP_ATTEMPT_FINISHED",
"CLEANUP_ATTEMPT_FAILED",
"CLEANUP_ATTEMPT_KILLED"
"CLEANUP_ATTEMPT_KILLED",
"AM_STARTED"
]
},
@ -272,6 +284,7 @@
"JobFinished",
"JobInfoChange",
"JobInited",
"AMStarted",
"JobPriorityChange",
"JobStatusChanged",
"JobSubmitted",

View File

@ -0,0 +1,114 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.jobhistory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.avro.util.Utf8;
/**
* Event to record start of a task attempt
*
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class AMStartedEvent implements HistoryEvent {
private AMStarted datum = new AMStarted();
/**
* Create an event to record the start of an MR AppMaster
*
* @param appAttemptId
* the application attempt id.
* @param startTime
* the start time of the AM.
* @param containerId
* the containerId of the AM.
* @param nodeManagerHost
* the node on which the AM is running.
* @param nodeManagerHttpPort
* the httpPort for the node running the AM.
*/
public AMStartedEvent(ApplicationAttemptId appAttemptId, long startTime,
ContainerId containerId, String nodeManagerHost, int nodeManagerHttpPort) {
datum.applicationAttemptId = new Utf8(appAttemptId.toString());
datum.startTime = startTime;
datum.containerId = new Utf8(containerId.toString());
datum.nodeManagerHost = new Utf8(nodeManagerHost);
datum.nodeManagerHttpPort = nodeManagerHttpPort;
}
AMStartedEvent() {
}
public Object getDatum() {
return datum;
}
public void setDatum(Object datum) {
this.datum = (AMStarted) datum;
}
/**
* @return the ApplicationAttemptId
*/
public ApplicationAttemptId getAppAttemptId() {
return ConverterUtils.toApplicationAttemptId(datum.applicationAttemptId
.toString());
}
/**
* @return the start time for the MRAppMaster
*/
public long getStartTime() {
return datum.startTime;
}
/**
* @return the ContainerId for the MRAppMaster.
*/
public ContainerId getContainerId() {
return ConverterUtils.toContainerId(datum.containerId.toString());
}
/**
* @return the node manager host.
*/
public String getNodeManagerHost() {
return datum.nodeManagerHost.toString();
}
/**
* @return the http port for the tracker.
*/
public int getNodeManagerHttpPort() {
return datum.nodeManagerHttpPort;
}
/** Get the attempt id */
@Override
public EventType getEventType() {
return EventType.AM_STARTED;
}
}

View File

@ -33,7 +33,6 @@
import org.apache.avro.Schema;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.JsonDecoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.specific.SpecificDatumReader;
@ -146,8 +145,10 @@ public HistoryEvent getNextEvent() throws IOException {
result = new TaskAttemptUnsuccessfulCompletionEvent(); break;
case CLEANUP_ATTEMPT_KILLED:
result = new TaskAttemptUnsuccessfulCompletionEvent(); break;
case AM_STARTED:
result = new AMStartedEvent(); break;
default:
throw new RuntimeException("unexpected event type!");
throw new RuntimeException("unexpected event type: " + wrapper.type);
}
result.setDatum(wrapper.event);
return result;

View File

@ -20,6 +20,8 @@
import java.io.IOException;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.classification.InterfaceAudience;
@ -37,6 +39,8 @@
import org.apache.hadoop.mapred.TaskStatus;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ContainerId;
/**
* Default Parser for the JobHistory files. Typical usage is
@ -174,6 +178,9 @@ private void handleEvent(HistoryEvent event) throws IOException {
case CLEANUP_ATTEMPT_FINISHED:
handleTaskAttemptFinishedEvent((TaskAttemptFinishedEvent) event);
break;
case AM_STARTED:
handleAMStartedEvent((AMStartedEvent) event);
break;
default:
break;
}
@ -241,6 +248,7 @@ private void handleTaskAttemptStartedEvent(TaskAttemptStartedEvent event) {
attemptInfo.trackerName = event.getTrackerName();
attemptInfo.taskType = event.getTaskType();
attemptInfo.shufflePort = event.getShufflePort();
attemptInfo.containerId = event.getContainerId();
taskInfo.attemptsMap.put(attemptId, attemptInfo);
}
@ -305,6 +313,20 @@ private void handleJobInitedEvent(JobInitedEvent event) {
info.totalReduces = event.getTotalReduces();
info.uberized = event.getUberized();
}
private void handleAMStartedEvent(AMStartedEvent event) {
AMInfo amInfo = new AMInfo();
amInfo.appAttemptId = event.getAppAttemptId();
amInfo.startTime = event.getStartTime();
amInfo.containerId = event.getContainerId();
amInfo.nodeManagerHost = event.getNodeManagerHost();
amInfo.nodeManagerHttpPort = event.getNodeManagerHttpPort();
if (info.amInfos == null) {
info.amInfos = new LinkedList<AMInfo>();
}
info.amInfos.add(amInfo);
info.latestAmInfo = amInfo;
}
private void handleJobInfoChangeEvent(JobInfoChangeEvent event) {
info.submitTime = event.getSubmitTime();
@ -348,6 +370,8 @@ public static class JobInfo {
Map<JobACL, AccessControlList> jobACLs;
Map<TaskID, TaskInfo> tasksMap;
List<AMInfo> amInfos;
AMInfo latestAmInfo;
boolean uberized;
/** Create a job info object where job information will be stored
@ -377,7 +401,9 @@ public void printAll() {
System.out.println("REDUCE_COUNTERS:" + reduceCounters.toString());
System.out.println("TOTAL_COUNTERS: " + totalCounters.toString());
System.out.println("UBERIZED: " + uberized);
for (AMInfo amInfo : amInfos) {
amInfo.printAll();
}
for (TaskInfo ti: tasksMap.values()) {
ti.printAll();
}
@ -427,6 +453,10 @@ public void printAll() {
public Map<JobACL, AccessControlList> getJobACLs() { return jobACLs; }
/** @return the uberized status of this job */
public boolean getUberized() { return uberized; }
/** @return the AMInfo for the job's AppMaster */
public List<AMInfo> getAMInfos() { return amInfos; }
/** @return the AMInfo for the newest AppMaster */
public AMInfo getLatestAMInfo() { return latestAmInfo; };
}
/**
@ -509,6 +539,7 @@ public static class TaskAttemptInfo {
int httpPort;
int shufflePort;
String hostname;
ContainerId containerId;
/** Create a Task Attempt Info which will store attempt level information
* on a history parse.
@ -534,6 +565,7 @@ public void printAll() {
System.out.println("TRACKER_NAME:" + trackerName);
System.out.println("HTTP_PORT:" + httpPort);
System.out.println("SHUFFLE_PORT:" + shufflePort);
System.out.println("CONTIANER_ID:" + containerId);
if (counters != null) {
System.out.println("COUNTERS:" + counters.toString());
}
@ -569,5 +601,74 @@ public void printAll() {
public int getHttpPort() { return httpPort; }
/** @return the Shuffle port for the tracker */
public int getShufflePort() { return shufflePort; }
/** @return the ContainerId for the tracker */
public ContainerId getContainerId() { return containerId; }
}
/**
* Stores AM information
*/
public static class AMInfo {
ApplicationAttemptId appAttemptId;
long startTime;
ContainerId containerId;
String nodeManagerHost;
int nodeManagerHttpPort;
/**
* Create a AM Info which will store AM level information on a history
* parse.
*/
public AMInfo() {
startTime = -1;
nodeManagerHost = "";
nodeManagerHttpPort = -1;
}
public AMInfo(ApplicationAttemptId appAttemptId, long startTime,
ContainerId containerId, String nodeManagerHost, int nodeManagerHttpPort) {
this.appAttemptId = appAttemptId;
this.startTime = startTime;
this.containerId = containerId;
this.nodeManagerHost = nodeManagerHost;
this.nodeManagerHttpPort = nodeManagerHttpPort;
}
/**
* Print all the information about this AM.
*/
public void printAll() {
System.out.println("APPLICATION_ATTEMPT_ID:" + appAttemptId.toString());
System.out.println("START_TIME: " + startTime);
System.out.println("CONTAINER_ID: " + containerId.toString());
System.out.println("NODE_MANAGER_HOST: " + nodeManagerHost);
System.out.println("NODE_MANAGER_HTTP_PORT: " + nodeManagerHttpPort);
}
/** @return the ApplicationAttemptId */
public ApplicationAttemptId getAppAttemptId() {
return appAttemptId;
}
/** @return the start time of the AM */
public long getStartTime() {
return startTime;
}
/** @return the container id for the AM */
public ContainerId getContainerId() {
return containerId;
}
/** @return the host name for the node manager on which the AM is running */
public String getNodeManagerHost() {
return nodeManagerHost;
}
/** @return the http port for the node manager running the AM */
public int getNodeManagerHttpPort() {
return nodeManagerHttpPort;
}
}
}

View File

@ -18,8 +18,6 @@
package org.apache.hadoop.mapreduce.jobhistory;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.mapreduce.JobID;

View File

@ -18,13 +18,13 @@
package org.apache.hadoop.mapreduce.jobhistory;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.avro.util.Utf8;
@ -45,10 +45,11 @@ public class TaskAttemptStartedEvent implements HistoryEvent {
* @param trackerName Name of the Task Tracker where attempt is running
* @param httpPort The port number of the tracker
* @param shufflePort The shuffle port number of the container
* @param containerId The containerId for the task attempt.
*/
public TaskAttemptStartedEvent( TaskAttemptID attemptId,
TaskType taskType, long startTime, String trackerName,
int httpPort, int shufflePort) {
int httpPort, int shufflePort, ContainerId containerId) {
datum.attemptId = new Utf8(attemptId.toString());
datum.taskid = new Utf8(attemptId.getTaskID().toString());
datum.startTime = startTime;
@ -56,6 +57,15 @@ public TaskAttemptStartedEvent( TaskAttemptID attemptId,
datum.trackerName = new Utf8(trackerName);
datum.httpPort = httpPort;
datum.shufflePort = shufflePort;
datum.containerId = new Utf8(containerId.toString());
}
// TODO Remove after MrV1 is removed.
// Using a dummy containerId to prevent jobHistory parse failures.
public TaskAttemptStartedEvent(TaskAttemptID attemptId, TaskType taskType,
long startTime, String trackerName, int httpPort, int shufflePort) {
this(attemptId, taskType, startTime, trackerName, httpPort, shufflePort,
ConverterUtils.toContainerId("container_-1_-1_-1_-1"));
}
TaskAttemptStartedEvent() {}
@ -91,5 +101,8 @@ public EventType getEventType() {
? EventType.MAP_ATTEMPT_STARTED
: EventType.REDUCE_ATTEMPT_STARTED;
}
/** Get the ContainerId */
public ContainerId getContainerId() {
return ConverterUtils.toContainerId(datum.containerId.toString());
}
}

View File

@ -36,6 +36,7 @@
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.AMInfo;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
import org.apache.hadoop.mapreduce.v2.api.records.Counters;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
@ -337,4 +338,9 @@ public String getUserName() {
public Path getConfFile() {
return confFile;
}
@Override
public List<AMInfo> getAMInfos() {
return jobInfo.getAMInfos();
}
}

View File

@ -29,8 +29,6 @@
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
@ -85,35 +83,17 @@ public class CompletedTaskAttempt implements TaskAttempt {
@Override
public ContainerId getAssignedContainerID() {
//TODO ContainerId needs to be part of some historyEvent to be able to
//render the log directory.
ContainerId containerId =
RecordFactoryProvider.getRecordFactory(null).newRecordInstance(
ContainerId.class);
containerId.setId(-1);
ApplicationAttemptId applicationAttemptId =
RecordFactoryProvider.getRecordFactory(null).newRecordInstance(
ApplicationAttemptId.class);
applicationAttemptId.setAttemptId(-1);
ApplicationId applicationId =
RecordFactoryProvider.getRecordFactory(null).newRecordInstance(
ApplicationId.class);
applicationId.setClusterTimestamp(-1);
applicationId.setId(-1);
applicationAttemptId.setApplicationId(applicationId);
containerId.setApplicationAttemptId(applicationAttemptId);
return containerId;
return attemptInfo.getContainerId();
}
@Override
public String getAssignedContainerMgrAddress() {
// TODO Verify this is correct.
return attemptInfo.getTrackerName();
return attemptInfo.getHostname();
}
@Override
public String getNodeHttpAddress() {
return attemptInfo.getHostname() + ":" + attemptInfo.getHttpPort();
return attemptInfo.getTrackerName() + ":" + attemptInfo.getHttpPort();
}
@Override

View File

@ -23,6 +23,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.JobACL;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.AMInfo;
import org.apache.hadoop.mapreduce.v2.api.records.Counters;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
@ -159,4 +160,9 @@ public Map<JobACL, AccessControlList> getJobACLs() {
throw new IllegalStateException("Not implemented yet");
}
@Override
public List<AMInfo> getAMInfos() {
return null;
}
}

View File

@ -41,8 +41,10 @@
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.service.Service;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.junit.Test;
public class TestJobHistoryEvents {
@ -159,6 +161,10 @@ private void verifyTask(Task task) {
private void verifyAttempt(TaskAttempt attempt) {
Assert.assertEquals("TaskAttempt state not currect",
TaskAttemptState.SUCCEEDED, attempt.getState());
Assert.assertNotNull(attempt.getAssignedContainerID());
//Verify the wrong ctor is not being used. Remove after mrv1 is removed.
ContainerId fakeCid = BuilderUtils.newContainerId(-1, -1, -1, -1);
Assert.assertFalse(attempt.getAssignedContainerID().equals(fakeCid));
}
static class MRAppWithHistory extends MRApp {

View File

@ -34,8 +34,9 @@
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.AMInfo;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.app.MRApp;
@ -46,7 +47,9 @@
import org.apache.hadoop.mapreduce.v2.jobhistory.FileNameIndexUtils;
import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.service.Service;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.junit.Test;
public class TestJobHistoryParsing {
@ -54,6 +57,7 @@ public class TestJobHistoryParsing {
@Test
public void testHistoryParsing() throws Exception {
Configuration conf = new Configuration();
long amStartTimeEst = System.currentTimeMillis();
MRApp app = new MRAppWithHistory(2, 1, true, this.getClass().getName(), true);
app.submit(conf);
Job job = app.getContext().getAllJobs().values().iterator().next();
@ -102,12 +106,30 @@ public void testHistoryParsing() throws Exception {
job.isUber(), jobInfo.getUberized());
int totalTasks = jobInfo.getAllTasks().size();
Assert.assertEquals("total number of tasks is incorrect ", 3, totalTasks);
// Verify aminfo
Assert.assertEquals(1, jobInfo.getAMInfos().size());
Assert.assertEquals("testhost", jobInfo.getAMInfos().get(0)
.getNodeManagerHost());
AMInfo amInfo = jobInfo.getAMInfos().get(0);
Assert.assertEquals(3333, amInfo.getNodeManagerHttpPort());
Assert.assertEquals(1, amInfo.getAppAttemptId().getAttemptId());
Assert.assertEquals(amInfo.getAppAttemptId(), amInfo.getContainerId()
.getApplicationAttemptId());
Assert.assertTrue(amInfo.getStartTime() <= System.currentTimeMillis()
&& amInfo.getStartTime() >= amStartTimeEst);
ContainerId fakeCid = BuilderUtils.newContainerId(-1, -1, -1, -1);
//Assert at taskAttempt level
for (TaskInfo taskInfo : jobInfo.getAllTasks().values()) {
int taskAttemptCount = taskInfo.getAllTaskAttempts().size();
Assert.assertEquals("total number of task attempts ",
1, taskAttemptCount);
TaskAttemptInfo taInfo =
taskInfo.getAllTaskAttempts().values().iterator().next();
Assert.assertNotNull(taInfo.getContainerId());
//Verify the wrong ctor is not being used. Remove after mrv1 is removed.
Assert.assertFalse(taInfo.getContainerId().equals(fakeCid));
}
// Deep compare Job and JobInfo

View File

@ -36,12 +36,24 @@ public interface ApplicationConstants {
// TODO: They say tokens via env isn't good.
public static final String APPLICATION_CLIENT_SECRET_ENV_NAME =
"AppClientTokenEnv";
/**
* The environmental variable for APPLICATION_ATTEMPT_ID. Set in
* ApplicationMaster's environment only.
* The environment variable for CONTAINER_ID. Set in AppMaster environment
* only
*/
public static final String APPLICATION_ATTEMPT_ID_ENV = "APPLICATION_ATTEMPT_ID";
public static final String AM_CONTAINER_ID_ENV = "AM_CONTAINER_ID";
/**
* The environment variable for NM_HTTP_ADDRESS. Set in AppMaster environment
* only
*/
public static final String NM_HTTP_ADDRESS_ENV = "NM_HTTP_ADDRESS";
/**
* The environment variable for APP_SUBMIT_TIME. Set in AppMaster environment
* only
*/
public static final String APP_SUBMIT_TIME_ENV = "APP_SUBMIT_TIME_ENV";
public static final String CONTAINER_TOKEN_FILE_ENV_NAME =
UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION;

View File

@ -126,7 +126,7 @@ public interface ApplicationSubmissionContext {
@Public
@Stable
public void setUser(String user);
/**
* Get the <code>ContainerLaunchContext</code> to describe the
* <code>Container</code> with which the <code>ApplicationMaster</code> is

View File

@ -287,7 +287,7 @@ public boolean init(String[] args) throws ParseException, IOException {
Map<String, String> envs = System.getenv();
appAttemptID = Records.newRecord(ApplicationAttemptId.class);
if (!envs.containsKey(ApplicationConstants.APPLICATION_ATTEMPT_ID_ENV)) {
if (!envs.containsKey(ApplicationConstants.AM_CONTAINER_ID_ENV)) {
if (cliParser.hasOption("app_attempt_id")) {
String appIdStr = cliParser.getOptionValue("app_attempt_id", "");
appAttemptID = ConverterUtils.toApplicationAttemptId(appIdStr);
@ -296,7 +296,8 @@ public boolean init(String[] args) throws ParseException, IOException {
throw new IllegalArgumentException("Application Attempt Id not set in the environment");
}
} else {
appAttemptID = ConverterUtils.toApplicationAttemptId(envs.get(ApplicationConstants.APPLICATION_ATTEMPT_ID_ENV));
ContainerId containerId = ConverterUtils.toContainerId(envs.get(ApplicationConstants.AM_CONTAINER_ID_ENV));
appAttemptID = containerId.getApplicationAttemptId();
}
LOG.info("Application master for app"

View File

@ -20,10 +20,8 @@
import static org.apache.hadoop.yarn.util.StringHelper._split;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.text.NumberFormat;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
@ -33,9 +31,7 @@
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
@ -97,27 +93,8 @@ public static URL getYarnUrlFromURI(URI uri) {
return url;
}
// TODO: Why thread local?
// ^ NumberFormat instances are not threadsafe
private static final ThreadLocal<NumberFormat> appIdFormat =
new ThreadLocal<NumberFormat>() {
@Override
public NumberFormat initialValue() {
NumberFormat fmt = NumberFormat.getInstance();
fmt.setGroupingUsed(false);
fmt.setMinimumIntegerDigits(4);
return fmt;
}
};
public static String toString(ApplicationId appId) {
StringBuilder sb = new StringBuilder();
sb.append(APPLICATION_PREFIX + "_").append(appId.getClusterTimestamp())
.append("_");
sb.append(appIdFormat.get().format(appId.getId()));
return sb.toString();
return appId.toString();
}
public static ApplicationId toApplicationId(RecordFactory recordFactory,
@ -152,11 +129,11 @@ public static String toString(ContainerId cId) {
return cId.toString();
}
public static ContainerId toContainerId(String containerIdStr)
throws IOException {
public static ContainerId toContainerId(String containerIdStr) {
Iterator<String> it = _split(containerIdStr).iterator();
if (!it.next().equals(CONTAINER_PREFIX)) {
throw new IOException("Invalid ContainerId prefix: " + containerIdStr);
throw new IllegalArgumentException("Invalid ContainerId prefix: "
+ containerIdStr);
}
try {
ApplicationAttemptId appAttemptID = toApplicationAttemptId(it);
@ -165,21 +142,22 @@ public static ContainerId toContainerId(String containerIdStr)
containerId.setId(Integer.parseInt(it.next()));
return containerId;
} catch (NumberFormatException n) {
throw new IOException("Invalid ContainerId: " + containerIdStr, n);
throw new IllegalArgumentException("Invalid ContainerId: "
+ containerIdStr, n);
}
}
public static ApplicationAttemptId toApplicationAttemptId(
String applicationAttmeptIdStr) throws IOException {
String applicationAttmeptIdStr) {
Iterator<String> it = _split(applicationAttmeptIdStr).iterator();
if (!it.next().equals(APPLICATION_ATTEMPT_PREFIX)) {
throw new IOException("Invalid AppAttemptId prefix: "
throw new IllegalArgumentException("Invalid AppAttemptId prefix: "
+ applicationAttmeptIdStr);
}
try {
return toApplicationAttemptId(it);
} catch (NumberFormatException n) {
throw new IOException("Invalid AppAttemptId: "
throw new IllegalArgumentException("Invalid AppAttemptId: "
+ applicationAttmeptIdStr, n);
}
}

View File

@ -69,7 +69,7 @@ protected void render(Block html) {
ContainerId containerId;
try {
containerId = ConverterUtils.toContainerId($(CONTAINER_ID));
} catch (IOException e) {
} catch (IllegalArgumentException e) {
div.h1("Invalid containerId " + $(CONTAINER_ID))._();
return;
}

View File

@ -22,8 +22,6 @@
import static org.apache.hadoop.yarn.webapp.view.JQueryUI.ACCORDION;
import static org.apache.hadoop.yarn.webapp.view.JQueryUI.initID;
import java.io.IOException;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@ -66,7 +64,7 @@ protected void render(Block html) {
ContainerId containerID;
try {
containerID = ConverterUtils.toContainerId($(CONTAINER_ID));
} catch (IOException e) {
} catch (IllegalArgumentException e) {
html.p()._("Invalid containerId " + $(CONTAINER_ID))._();
return;
}

View File

@ -206,11 +206,12 @@ public SubmitApplicationResponse submitApplication(
// Safety
submissionContext.setUser(user);
// This needs to be synchronous as the client can query
// immediately following the submission to get the application status.
// So call handle directly and do not send an event.
rmAppManager.handle(new RMAppManagerSubmitEvent(submissionContext));
rmAppManager.handle(new RMAppManagerSubmitEvent(submissionContext, System
.currentTimeMillis()));
LOG.info("Application with id " + applicationId.getId() +
" submitted by user " + user + " with " + submissionContext);

View File

@ -213,7 +213,7 @@ protected synchronized void checkAppNumCompletedLimit() {
@SuppressWarnings("unchecked")
protected synchronized void submitApplication(
ApplicationSubmissionContext submissionContext) {
ApplicationSubmissionContext submissionContext, long submitTime) {
ApplicationId applicationId = submissionContext.getApplicationId();
RMApp application = null;
try {
@ -241,13 +241,13 @@ protected synchronized void submitApplication(
ApplicationStore appStore = rmContext.getApplicationsStore()
.createApplicationStore(submissionContext.getApplicationId(),
submissionContext);
// Create RMApp
application = new RMAppImpl(applicationId, rmContext,
this.conf, submissionContext.getApplicationName(), user,
submissionContext.getQueue(), submissionContext, clientTokenStr,
appStore, this.scheduler,
this.masterService);
this.masterService, submitTime);
if (rmContext.getRMApps().putIfAbsent(applicationId, application) !=
null) {
@ -284,8 +284,9 @@ public void handle(RMAppManagerEvent event) {
case APP_SUBMIT:
{
ApplicationSubmissionContext submissionContext =
((RMAppManagerSubmitEvent)event).getSubmissionContext();
submitApplication(submissionContext);
((RMAppManagerSubmitEvent)event).getSubmissionContext();
long submitTime = ((RMAppManagerSubmitEvent)event).getSubmitTime();
submitApplication(submissionContext, submitTime);
}
break;
default:

View File

@ -23,13 +23,21 @@
public class RMAppManagerSubmitEvent extends RMAppManagerEvent {
private final ApplicationSubmissionContext submissionContext;
private final long submitTime;
public RMAppManagerSubmitEvent(ApplicationSubmissionContext submissionContext) {
super(submissionContext.getApplicationId(), RMAppManagerEventType.APP_SUBMIT);
public RMAppManagerSubmitEvent(
ApplicationSubmissionContext submissionContext, long submitTime) {
super(submissionContext.getApplicationId(),
RMAppManagerEventType.APP_SUBMIT);
this.submissionContext = submissionContext;
this.submitTime = submitTime;
}
public ApplicationSubmissionContext getSubmissionContext() {
return this.submissionContext;
}
public long getSubmitTime() {
return this.submitTime;
}
}

View File

@ -22,8 +22,6 @@
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import javax.crypto.SecretKey;
@ -37,7 +35,6 @@
import org.apache.hadoop.io.Text;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.SecurityInfo;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.StringUtils;
@ -58,7 +55,6 @@
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.security.ApplicationTokenIdentifier;
import org.apache.hadoop.yarn.security.ApplicationTokenSecretManager;
import org.apache.hadoop.yarn.security.ContainerManagerSecurityInfo;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.security.client.ClientToAMSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
@ -83,6 +79,7 @@ public class AMLauncher implements Runnable {
private final ApplicationTokenSecretManager applicationTokenSecretManager;
private final ClientToAMSecretManager clientToAMSecretManager;
private final AMLauncherEventType eventType;
private final RMContext rmContext;
@SuppressWarnings("rawtypes")
private final EventHandler handler;
@ -96,6 +93,7 @@ public AMLauncher(RMContext rmContext, RMAppAttempt application,
this.applicationTokenSecretManager = applicationTokenSecretManager;
this.clientToAMSecretManager = clientToAMSecretManager;
this.eventType = eventType;
this.rmContext = rmContext;
this.handler = rmContext.getDispatcher().getEventHandler();
}
@ -189,9 +187,18 @@ private void setupTokensAndEnv(
throws IOException {
Map<String, String> environment = container.getEnvironment();
// Set the AppAttemptId to be consumable by the AM.
environment.put(ApplicationConstants.APPLICATION_ATTEMPT_ID_ENV,
application.getAppAttemptId().toString());
// Set the AppAttemptId, containerId, NMHTTPAdress, AppSubmitTime to be
// consumable by the AM.
environment.put(ApplicationConstants.AM_CONTAINER_ID_ENV, container
.getContainerId().toString());
environment.put(ApplicationConstants.NM_HTTP_ADDRESS_ENV, application
.getMasterContainer().getNodeHttpAddress());
environment.put(
ApplicationConstants.APP_SUBMIT_TIME_ENV,
String.valueOf(rmContext.getRMApps()
.get(application.getAppAttemptId().getApplicationId())
.getSubmitTime()));
if (UserGroupInformation.isSecurityEnabled()) {
// TODO: Security enabled/disabled info should come from RM.

View File

@ -115,6 +115,12 @@ public interface RMApp extends EventHandler<RMAppEvent> {
*/
long getStartTime();
/**
* the submit time of the application.
* @return the submit time of the application.
*/
long getSubmitTime();
/**
* The tracking url for the application master.
* @return the tracking url for the application master.

View File

@ -84,6 +84,7 @@ public class RMAppImpl implements RMApp {
private final WriteLock writeLock;
private final Map<ApplicationAttemptId, RMAppAttempt> attempts
= new LinkedHashMap<ApplicationAttemptId, RMAppAttempt>();
private final long submitTime;
// Mutable fields
private long startTime;
@ -163,7 +164,8 @@ public RMAppImpl(ApplicationId applicationId, RMContext rmContext,
Configuration config, String name, String user, String queue,
ApplicationSubmissionContext submissionContext, String clientTokenStr,
ApplicationStore appStore,
YarnScheduler scheduler, ApplicationMasterService masterService) {
YarnScheduler scheduler, ApplicationMasterService masterService,
long submitTime) {
this.applicationId = applicationId;
this.name = name;
@ -178,6 +180,7 @@ public RMAppImpl(ApplicationId applicationId, RMContext rmContext,
this.appStore = appStore;
this.scheduler = scheduler;
this.masterService = masterService;
this.submitTime = submitTime;
this.startTime = System.currentTimeMillis();
this.maxRetries = conf.getInt(YarnConfiguration.RM_AM_MAX_RETRIES,
@ -367,6 +370,11 @@ public long getStartTime() {
}
}
@Override
public long getSubmitTime() {
return this.submitTime;
}
@Override
public String getTrackingUrl() {
this.readLock.lock();

View File

@ -42,6 +42,7 @@ public class MockNM {
private final String nodeIdStr;
private final int memory;
private final ResourceTrackerService resourceTracker;
private final int httpPort = 2;
MockNM(String nodeIdStr, int memory, ResourceTrackerService resourceTracker) {
this.nodeIdStr = nodeIdStr;
@ -53,6 +54,10 @@ public NodeId getNodeId() {
return nodeId;
}
public String getHttpAddress() {
return nodeId.getHost() + ":" + String.valueOf(httpPort);
}
public void containerStatus(Container container) throws Exception {
Map<ApplicationId, List<ContainerStatus>> conts =
new HashMap<ApplicationId, List<ContainerStatus>>();
@ -69,7 +74,7 @@ public NodeId registerNode() throws Exception {
RegisterNodeManagerRequest req = Records.newRecord(
RegisterNodeManagerRequest.class);
req.setNodeId(nodeId);
req.setHttpPort(2);
req.setHttpPort(httpPort);
Resource resource = Records.newRecord(Resource.class);
resource.setMemory(memory);
req.setResource(resource);

View File

@ -163,7 +163,7 @@ public void setCompletedAppsMax(int max) {
}
public void submitApplication(
ApplicationSubmissionContext submissionContext) {
super.submitApplication(submissionContext);
super.submitApplication(submissionContext, System.currentTimeMillis());
}
}

View File

@ -32,6 +32,7 @@
import org.apache.hadoop.yarn.api.protocolrecords.StopContainerResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncher;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType;
@ -39,6 +40,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@ -56,6 +58,9 @@ private static final class MyContainerManagerImpl implements
boolean launched = false;
boolean cleanedup = false;
String attemptIdAtContainerManager = null;
String containerIdAtContainerManager = null;
String nmAddressAtContainerManager = null;
long submitTimeAtContainerManager;
@Override
public StartContainerResponse
@ -63,9 +68,20 @@ private static final class MyContainerManagerImpl implements
throws YarnRemoteException {
LOG.info("Container started by MyContainerManager: " + request);
launched = true;
attemptIdAtContainerManager = request.getContainerLaunchContext()
.getEnvironment().get(
ApplicationConstants.APPLICATION_ATTEMPT_ID_ENV);
containerIdAtContainerManager =
request.getContainerLaunchContext().getEnvironment()
.get(ApplicationConstants.AM_CONTAINER_ID_ENV);
ContainerId containerId =
ConverterUtils.toContainerId(containerIdAtContainerManager);
attemptIdAtContainerManager =
containerId.getApplicationAttemptId().toString();
nmAddressAtContainerManager =
request.getContainerLaunchContext().getEnvironment()
.get(ApplicationConstants.NM_HTTP_ADDRESS_ENV);
submitTimeAtContainerManager =
Long.parseLong(request.getContainerLaunchContext().getEnvironment()
.get(ApplicationConstants.APP_SUBMIT_TIME_ENV));
return null;
}
@ -140,6 +156,13 @@ public void testAMLaunchAndCleanup() throws Exception {
ApplicationAttemptId appAttemptId = attempt.getAppAttemptId();
Assert.assertEquals(appAttemptId.toString(),
containerManager.attemptIdAtContainerManager);
Assert.assertEquals(app.getSubmitTime(),
containerManager.submitTimeAtContainerManager);
Assert.assertEquals(app.getRMAppAttempt(appAttemptId)
.getSubmissionContext().getAMContainerSpec().getContainerId()
.toString(), containerManager.containerIdAtContainerManager);
Assert.assertEquals(nm1.getHttpAddress(),
containerManager.nmAddressAtContainerManager);
MockAM am = new MockAM(rm.getRMContext(), rm
.getApplicationMasterService(), appAttemptId);

View File

@ -166,6 +166,11 @@ public long getStartTime() {
throw new UnsupportedOperationException("Not supported yet.");
}
@Override
public long getSubmitTime() {
throw new UnsupportedOperationException("Not supported yet.");
}
@Override
public long getFinishTime() {
throw new UnsupportedOperationException("Not supported yet.");

View File

@ -33,6 +33,7 @@ public class MockRMApp implements RMApp {
String name = MockApps.newAppName();
String queue = MockApps.newQueue();
long start = System.currentTimeMillis() - (int) (Math.random() * DT);
long submit = start - (int) (Math.random() * DT);
long finish = 0;
RMAppState state = RMAppState.NEW;
int failCount = 0;
@ -141,6 +142,11 @@ public long getStartTime() {
return start;
}
@Override
public long getSubmitTime() {
return submit;
}
public void setStartTime(long time) {
this.start = time;
}

View File

@ -21,7 +21,6 @@
import static org.mockito.Mockito.mock;
import java.io.IOException;
import java.util.List;
import junit.framework.Assert;
@ -51,7 +50,6 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.junit.Before;
import org.junit.After;
import org.junit.Test;
@ -61,7 +59,7 @@ public class TestRMAppTransitions {
private RMContext rmContext;
private static int maxRetries = 4;
private static int appId = 1;
private AsyncDispatcher rmDispatcher;
// private AsyncDispatcher rmDispatcher;
// ignore all the RM application attempt events
private static final class TestApplicationAttemptEventDispatcher implements
@ -152,7 +150,7 @@ protected RMApp createNewTestApp() {
conf, name, user,
queue, submissionContext, clientTokenStr,
appStore, scheduler,
masterService);
masterService, System.currentTimeMillis());
testAppStartState(applicationId, user, name, queue, application);
return application;

View File

@ -323,25 +323,29 @@ Hadoop MapReduce Next Generation - Writing YARN Applications
multi-tenancy nature, amongst other issues, it cannot make any assumptions
of things like pre-configured ports that it can listen on.
* When the ApplicationMaster starts up, several parameters are made available
to it via the environment. These include the ContainerId for the
ApplicationMaster container, the application submission time and the HTTP
address of the NodeManager running the container. Ref ApplicationConstants
for parameter names.
* All interactions with the ResourceManager require an ApplicationAttemptId
(there can be multiple attempts per application in case of failures). When
the ApplicationMaster starts up, the ApplicationAttemptId associated with
this particular instance will be set in the environment. There are helper
apis to convert the value obtained from the environment into an
ApplicationAttemptId object.
(there can be multiple attempts per application in case of failures). The
ApplicationAttemptId can be obtained from the ApplicationMaster
containerId. There are helper apis to convert the value obtained from the
environment into objects.
+---+
Map<String, String> envs = System.getenv();
ApplicationAttemptId appAttemptID =
Records.newRecord(ApplicationAttemptId.class);
if (!envs.containsKey(ApplicationConstants.APPLICATION_ATTEMPT_ID_ENV)) {
// app attempt id should always be set in the env by the framework
String containerIdString =
envs.get(ApplicationConstants.AM_CONTAINER_ID_ENV);
if (containerIdString == null) {
// container id should always be set in the env by the framework
throw new IllegalArgumentException(
"ApplicationAttemptId not set in the environment");
}
appAttemptID =
ConverterUtils.toApplicationAttemptId(
envs.get(ApplicationConstants.APPLICATION_ATTEMPT_ID_ENV));
"ContainerId not set in the environment");
}
ContainerId containerId = ConverterUtils.toContainerId(containerIdString);
ApplicationAttemptId appAttemptID = containerId.getApplicationAttemptId();
+---+
* After an ApplicationMaster has initialized itself completely, it needs to