MAPREDUCE-3144. Augmented JobHistory with the information needed for serving aggregated logs. Contributed by Siddharth Seth.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1185976 13f79535-47bb-0310-9956-ffa450edef68
Vinod Kumar Vavilapalli 2011-10-19 05:21:18 +00:00
parent 3e3abcb769
commit 13e4562924
45 changed files with 688 additions and 197 deletions

View File

@ -407,6 +407,9 @@ Release 0.23.0 - Unreleased
from the NodeManager and set MALLOC_ARENA_MAX for all daemons and
containers. (Chris Riccomini via acmurthy)
MAPREDUCE-3144. Augmented JobHistory with the information needed for
serving aggregated logs. (Siddharth Seth via vinodkv)
OPTIMIZATIONS
MAPREDUCE-2026. Make JobTracker.getJobCounters() and

View File

@ -275,7 +275,7 @@ public class JobHistoryEventHandler extends AbstractService
* @param jobId the jobId.
* @throws IOException
*/
protected void setupEventWriter(JobId jobId, JobSubmittedEvent jse)
protected void setupEventWriter(JobId jobId)
throws IOException {
if (stagingDirPath == null) {
LOG.error("Log Directory is null, returning");
@ -285,9 +285,6 @@ public class JobHistoryEventHandler extends AbstractService
MetaInfo oldFi = fileMap.get(jobId);
Configuration conf = getConfig();
long submitTime = oldFi == null ? jse.getSubmitTime() : oldFi
.getJobIndexInfo().getSubmitTime();
// TODO Ideally this should be written out to the job dir
// (.staging/jobid/files - RecoveryService will need to be patched)
Path historyFile = JobHistoryUtils.getStagingJobHistoryFile(
@ -301,6 +298,8 @@ public class JobHistoryEventHandler extends AbstractService
String jobName = context.getJob(jobId).getName();
EventWriter writer = (oldFi == null) ? null : oldFi.writer;
Path logDirConfPath =
JobHistoryUtils.getStagingConfFile(stagingDirPath, jobId, startCount);
if (writer == null) {
try {
FSDataOutputStream out = stagingDirFS.create(historyFile, true);
@ -312,31 +311,28 @@ public class JobHistoryEventHandler extends AbstractService
+ "[" + jobName + "]");
throw ioe;
}
}
Path logDirConfPath = null;
if (conf != null) {
// TODO Ideally this should be written out to the job dir
// (.staging/jobid/files - RecoveryService will need to be patched)
logDirConfPath = JobHistoryUtils.getStagingConfFile(stagingDirPath, jobId,
startCount);
FSDataOutputStream jobFileOut = null;
try {
if (logDirConfPath != null) {
jobFileOut = stagingDirFS.create(logDirConfPath, true);
conf.writeXml(jobFileOut);
jobFileOut.close();
//Write out conf only if the writer isn't already setup.
if (conf != null) {
// TODO Ideally this should be written out to the job dir
// (.staging/jobid/files - RecoveryService will need to be patched)
FSDataOutputStream jobFileOut = null;
try {
if (logDirConfPath != null) {
jobFileOut = stagingDirFS.create(logDirConfPath, true);
conf.writeXml(jobFileOut);
jobFileOut.close();
}
} catch (IOException e) {
LOG.info("Failed to write the job configuration file", e);
throw e;
}
} catch (IOException e) {
LOG.info("Failed to write the job configuration file", e);
throw e;
}
}
MetaInfo fi = new MetaInfo(historyFile, logDirConfPath, writer, submitTime,
MetaInfo fi = new MetaInfo(historyFile, logDirConfPath, writer,
user, jobName, jobId);
fi.getJobSummary().setJobId(jobId);
fi.getJobSummary().setJobSubmitTime(submitTime);
fileMap.put(jobId, fi);
}
@ -368,11 +364,9 @@ public class JobHistoryEventHandler extends AbstractService
synchronized (lock) {
// If this is JobSubmitted Event, setup the writer
if (event.getHistoryEvent().getEventType() == EventType.JOB_SUBMITTED) {
if (event.getHistoryEvent().getEventType() == EventType.AM_STARTED) {
try {
JobSubmittedEvent jobSubmittedEvent =
(JobSubmittedEvent) event.getHistoryEvent();
setupEventWriter(event.getJobID(), jobSubmittedEvent);
setupEventWriter(event.getJobID());
} catch (IOException ioe) {
LOG.error("Error JobHistoryEventHandler in handleEvent: " + event,
ioe);
@ -396,6 +390,12 @@ public class JobHistoryEventHandler extends AbstractService
throw new YarnException(e);
}
if (event.getHistoryEvent().getEventType() == EventType.JOB_SUBMITTED) {
JobSubmittedEvent jobSubmittedEvent =
(JobSubmittedEvent) event.getHistoryEvent();
mi.getJobIndexInfo().setSubmitTime(jobSubmittedEvent.getSubmitTime());
}
// If this is JobFinishedEvent, close the writer and setup the job-index
if (event.getHistoryEvent().getEventType() == EventType.JOB_FINISHED) {
try {
@ -436,6 +436,7 @@ public class JobHistoryEventHandler extends AbstractService
JobSubmittedEvent jse = (JobSubmittedEvent) event;
summary.setUser(jse.getUserName());
summary.setQueue(jse.getJobQueueName());
summary.setJobSubmitTime(jse.getSubmitTime());
break;
case JOB_INITED:
JobInitedEvent jie = (JobInitedEvent) event;
@ -588,12 +589,12 @@ public class JobHistoryEventHandler extends AbstractService
JobIndexInfo jobIndexInfo;
JobSummary jobSummary;
MetaInfo(Path historyFile, Path conf, EventWriter writer, long submitTime,
MetaInfo(Path historyFile, Path conf, EventWriter writer,
String user, String jobName, JobId jobId) {
this.historyFile = historyFile;
this.confFile = conf;
this.writer = writer;
this.jobIndexInfo = new JobIndexInfo(submitTime, -1, user, jobName, jobId, -1, -1, null);
this.jobIndexInfo = new JobIndexInfo(-1, -1, user, jobName, jobId, -1, -1, null);
this.jobSummary = new JobSummary();
}
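
A minimal sketch of the new submit-time flow, using stand-in types rather than the real MR classes: the writer is now created when AM_STARTED arrives, with the index info's submit time left at -1 until the JOB_SUBMITTED event patches it in.

// Hedged sketch only; IndexInfo stands in for JobIndexInfo and the map for
// JobHistoryEventHandler's fileMap.
import java.util.HashMap;
import java.util.Map;

class SubmitTimeFlowSketch {
  enum EventType { AM_STARTED, JOB_SUBMITTED }

  static class IndexInfo { long submitTime = -1; } // -1 until JOB_SUBMITTED

  static final Map<String, IndexInfo> fileMap = new HashMap<String, IndexInfo>();

  static void handle(String jobId, EventType type, long submitTime) {
    switch (type) {
    case AM_STARTED:
      fileMap.put(jobId, new IndexInfo()); // writer setup needs no JobSubmittedEvent
      break;
    case JOB_SUBMITTED:
      fileMap.get(jobId).submitTime = submitTime; // submit time arrives here
      break;
    }
  }

  public static void main(String[] args) {
    handle("job_1", EventType.AM_STARTED, -1L);
    handle("job_1", EventType.JOB_SUBMITTED, 1318990878000L);
    System.out.println(fileMap.get("job_1").submitTime); // 1318990878000
  }
}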

View File

@ -22,7 +22,10 @@ import java.io.File;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.net.InetSocketAddress;
import java.security.PrivilegedExceptionAction;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@ -40,6 +43,8 @@ import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.AMInfo;
import org.apache.hadoop.mapreduce.jobhistory.AMStartedEvent;
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
@ -72,6 +77,7 @@ import org.apache.hadoop.mapreduce.v2.app.speculate.SpeculatorEvent;
import org.apache.hadoop.mapreduce.v2.app.taskclean.TaskCleaner;
import org.apache.hadoop.mapreduce.v2.app.taskclean.TaskCleanerImpl;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
@ -82,6 +88,7 @@ import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
@ -115,14 +122,20 @@ public class MRAppMaster extends CompositeService {
private static final Log LOG = LogFactory.getLog(MRAppMaster.class);
private Clock clock;
private final long startTime = System.currentTimeMillis();
private final long startTime;
private final long appSubmitTime;
private String appName;
private final ApplicationAttemptId appAttemptID;
private final ContainerId containerID;
private final String nmHost;
private final int nmHttpPort;
protected final MRAppMetrics metrics;
private Set<TaskId> completedTasksFromPreviousRun;
private List<AMInfo> amInfos;
private AppContext context;
private Dispatcher dispatcher;
private ClientService clientService;
private Recovery recoveryServ;
private ContainerAllocator containerAllocator;
private ContainerLauncher containerLauncher;
private TaskCleaner taskCleaner;
@ -131,19 +144,29 @@ public class MRAppMaster extends CompositeService {
private JobTokenSecretManager jobTokenSecretManager =
new JobTokenSecretManager();
private JobEventDispatcher jobEventDispatcher;
private boolean inRecovery = false;
private Job job;
private Credentials fsTokens = new Credentials(); // Filled during init
private UserGroupInformation currentUser; // Will be setup during init
public MRAppMaster(ApplicationAttemptId applicationAttemptId) {
this(applicationAttemptId, new SystemClock());
public MRAppMaster(ApplicationAttemptId applicationAttemptId,
ContainerId containerId, String nmHost, int nmHttpPort, long appSubmitTime) {
this(applicationAttemptId, containerId, nmHost, nmHttpPort,
new SystemClock(), appSubmitTime);
}
public MRAppMaster(ApplicationAttemptId applicationAttemptId, Clock clock) {
public MRAppMaster(ApplicationAttemptId applicationAttemptId,
ContainerId containerId, String nmHost, int nmHttpPort, Clock clock,
long appSubmitTime) {
super(MRAppMaster.class.getName());
this.clock = clock;
this.startTime = clock.getTime();
this.appSubmitTime = appSubmitTime;
this.appAttemptID = applicationAttemptId;
this.containerID = containerId;
this.nmHost = nmHost;
this.nmHttpPort = nmHttpPort;
this.metrics = MRAppMetrics.create();
LOG.info("Created MRAppMaster for application " + applicationAttemptId);
}
@ -162,11 +185,11 @@ public class MRAppMaster extends CompositeService {
if (conf.getBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, false)
&& appAttemptID.getAttemptId() > 1) {
LOG.info("Recovery is enabled. Will try to recover from previous life.");
Recovery recoveryServ = new RecoveryService(appAttemptID, clock);
recoveryServ = new RecoveryService(appAttemptID, clock);
addIfService(recoveryServ);
dispatcher = recoveryServ.getDispatcher();
clock = recoveryServ.getClock();
completedTasksFromPreviousRun = recoveryServ.getCompletedTasks();
inRecovery = true;
} else {
dispatcher = new AsyncDispatcher();
addIfService(dispatcher);
@ -327,7 +350,8 @@ public class MRAppMaster extends CompositeService {
// create single job
Job newJob = new JobImpl(appAttemptID, conf, dispatcher.getEventHandler(),
taskAttemptListener, jobTokenSecretManager, fsTokens, clock,
completedTasksFromPreviousRun, metrics, currentUser.getUserName());
completedTasksFromPreviousRun, metrics, currentUser.getUserName(),
appSubmitTime, amInfos);
((RunningAppContext) context).jobs.put(newJob.getID(), newJob);
dispatcher.register(JobFinishEvent.Type.class,
@ -463,6 +487,10 @@ public class MRAppMaster extends CompositeService {
return completedTasksFromPreviousRun;
}
public List<AMInfo> getAllAMInfos() {
return amInfos;
}
public ContainerAllocator getContainerAllocator() {
return containerAllocator;
}
@ -617,11 +645,33 @@ public class MRAppMaster extends CompositeService {
@Override
public void start() {
///////////////////// Create the job itself.
// Pull completedTasks etc from recovery
if (inRecovery) {
completedTasksFromPreviousRun = recoveryServ.getCompletedTasks();
amInfos = recoveryServ.getAMInfos();
}
// / Create the AMInfo for the current AppMaster
if (amInfos == null) {
amInfos = new LinkedList<AMInfo>();
}
AMInfo amInfo =
new AMInfo(appAttemptID, startTime, containerID, nmHost, nmHttpPort);
amInfos.add(amInfo);
// /////////////////// Create the job itself.
job = createJob(getConfig());
// End of creating the job.
// Send out an AM-started event for this AM and all previous AMs.
for (AMInfo info : amInfos) {
dispatcher.getEventHandler().handle(
new JobHistoryEvent(job.getID(), new AMStartedEvent(info
.getAppAttemptId(), info.getStartTime(), info.getContainerId(),
info.getNodeManagerHost(), info.getNodeManagerHttpPort())));
}
// metrics system init is really init & start.
// It's more test friendly to put it here.
DefaultMetricsSystem.initialize("MRAppMaster");
@ -723,17 +773,39 @@ public class MRAppMaster extends CompositeService {
public static void main(String[] args) {
try {
String applicationAttemptIdStr = System
.getenv(ApplicationConstants.APPLICATION_ATTEMPT_ID_ENV);
if (applicationAttemptIdStr == null) {
String msg = ApplicationConstants.APPLICATION_ATTEMPT_ID_ENV
+ " is null";
String containerIdStr =
System.getenv(ApplicationConstants.AM_CONTAINER_ID_ENV);
String nodeHttpAddressStr =
System.getenv(ApplicationConstants.NM_HTTP_ADDRESS_ENV);
String appSubmitTimeStr =
System.getenv(ApplicationConstants.APP_SUBMIT_TIME_ENV);
if (containerIdStr == null) {
String msg = ApplicationConstants.AM_CONTAINER_ID_ENV + " is null";
LOG.error(msg);
throw new IOException(msg);
}
ApplicationAttemptId applicationAttemptId = ConverterUtils
.toApplicationAttemptId(applicationAttemptIdStr);
MRAppMaster appMaster = new MRAppMaster(applicationAttemptId);
if (nodeHttpAddressStr == null) {
String msg = ApplicationConstants.NM_HTTP_ADDRESS_ENV + " is null";
LOG.error(msg);
throw new IOException(msg);
}
if (appSubmitTimeStr == null) {
String msg = ApplicationConstants.APP_SUBMIT_TIME_ENV + " is null";
LOG.error(msg);
throw new IOException(msg);
}
ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
ApplicationAttemptId applicationAttemptId =
containerId.getApplicationAttemptId();
InetSocketAddress nodeHttpInetAddr =
NetUtils.createSocketAddr(nodeHttpAddressStr);
long appSubmitTime = Long.parseLong(appSubmitTimeStr);
MRAppMaster appMaster =
new MRAppMaster(applicationAttemptId, containerId,
nodeHttpInetAddr.getHostName(), nodeHttpInetAddr.getPort(),
appSubmitTime);
Runtime.getRuntime().addShutdownHook(
new CompositeServiceShutdownHook(appMaster));
YarnConfiguration conf = new YarnConfiguration(new JobConf());
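
The derivation that replaces the old APPLICATION_ATTEMPT_ID variable is worth spelling out, since the container id string embeds the attempt id. A hedged illustration using the id format from the tests below:

// container_<clusterTimestamp>_<appId>_<attemptId>_<containerId>
ContainerId cid =
    ConverterUtils.toContainerId("container_1317529182569_0004_000001_1");
ApplicationAttemptId attemptId = cid.getApplicationAttemptId();
// attemptId corresponds to appattempt_1317529182569_0004_000001, so the AM
// needs only AM_CONTAINER_ID in its environment to know which attempt it is.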

View File

@ -23,6 +23,7 @@ import java.util.Map;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.JobACL;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.AMInfo;
import org.apache.hadoop.mapreduce.v2.api.records.Counters;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
@ -68,5 +69,10 @@ public interface Job {
TaskAttemptCompletionEvent[]
getTaskAttemptCompletionEvents(int fromEventId, int maxEvents);
/**
* @return information for MR AppMasters (previously failed and current)
*/
List<AMInfo> getAMInfos();
boolean checkAccess(UserGroupInformation callerUGI, JobACL jobOperation);
}
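
Since the point of this change is serving aggregated logs, a typical consumer of the new method would walk the AM list to build per-attempt log links. A hedged sketch (the URL layout is illustrative and not part of this patch):

for (AMInfo amInfo : job.getAMInfos()) {
  // One entry per AM attempt, oldest first; the container id is what the
  // log-aggregation layer needs to locate that AM's logs.
  String logsLink = "http://" + amInfo.getNodeManagerHost() + ":"
      + amInfo.getNodeManagerHttpPort() + "/node/containerlogs/"
      + amInfo.getContainerId();
  System.out.println("AM attempt "
      + amInfo.getAppAttemptId().getAttemptId() + " -> " + logsLink);
}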

View File

@ -51,6 +51,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.jobhistory.JobFinishedEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.AMInfo;
import org.apache.hadoop.mapreduce.jobhistory.JobInfoChangeEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobInitedEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobSubmittedEvent;
@ -136,6 +137,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
private final String username;
private final Map<JobACL, AccessControlList> jobACLs;
private final Set<TaskId> completedTasksFromPreviousRun;
private final List<AMInfo> amInfos;
private final Lock readLock;
private final Lock writeLock;
private final JobId jobId;
@ -148,6 +150,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
private final EventHandler eventHandler;
private final MRAppMetrics metrics;
private final String userName;
private final long appSubmitTime;
private boolean lazyTasksCopyNeeded = false;
private volatile Map<TaskId, Task> tasks = new LinkedHashMap<TaskId, Task>();
@ -354,7 +357,6 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
private int failedReduceTaskCount = 0;
private int killedMapTaskCount = 0;
private int killedReduceTaskCount = 0;
private long submitTime;
private long startTime;
private long finishTime;
private float setupProgress;
@ -370,7 +372,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
JobTokenSecretManager jobTokenSecretManager,
Credentials fsTokenCredentials, Clock clock,
Set<TaskId> completedTasksFromPreviousRun, MRAppMetrics metrics,
String userName) {
String userName, long appSubmitTime, List<AMInfo> amInfos) {
this.applicationAttemptId = applicationAttemptId;
this.jobId = recordFactory.newRecordInstance(JobId.class);
this.jobName = conf.get(JobContext.JOB_NAME, "<missing job name>");
@ -378,7 +380,9 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
this.metrics = metrics;
this.clock = clock;
this.completedTasksFromPreviousRun = completedTasksFromPreviousRun;
this.amInfos = amInfos;
this.userName = userName;
this.appSubmitTime = appSubmitTime;
ApplicationId applicationId = applicationAttemptId.getApplicationId();
jobId.setAppId(applicationId);
jobId.setId(applicationId.getId());
@ -806,6 +810,11 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
public Map<JobACL, AccessControlList> getJobACLs() {
return Collections.unmodifiableMap(jobACLs);
}
@Override
public List<AMInfo> getAMInfos() {
return amInfos;
}
public static class InitTransition
implements MultipleArcTransition<JobImpl, JobEvent, JobState> {
@ -819,7 +828,6 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
*/
@Override
public JobState transition(JobImpl job, JobEvent event) {
job.submitTime = job.clock.getTime();
job.metrics.submittedJob(job);
job.metrics.preparingJob(job);
try {
@ -830,7 +838,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
JobSubmittedEvent jse = new JobSubmittedEvent(job.oldJobId,
job.conf.get(MRJobConfig.JOB_NAME, "test"),
job.conf.get(MRJobConfig.USER_NAME, "mapred"),
job.submitTime,
job.appSubmitTime,
job.remoteJobConfFile.toString(),
job.jobACLs, job.conf.get(MRJobConfig.QUEUE_NAME, "default"));
job.eventHandler.handle(new JobHistoryEvent(job.jobId, jse));
@ -1152,7 +1160,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
job.isUber()); //Will transition to state running. Currently in INITED
job.eventHandler.handle(new JobHistoryEvent(job.jobId, jie));
JobInfoChangeEvent jice = new JobInfoChangeEvent(job.oldJobId,
job.submitTime, job.startTime);
job.appSubmitTime, job.startTime);
job.eventHandler.handle(new JobHistoryEvent(job.jobId, jice));
job.metrics.runningJob(job);

View File

@ -894,15 +894,20 @@ public abstract class TaskAttemptImpl implements
return jce;
}
private static TaskAttemptUnsuccessfulCompletionEvent createTaskAttemptUnsuccessfulCompletionEvent(
TaskAttemptImpl taskAttempt, TaskAttemptState attemptState) {
TaskAttemptUnsuccessfulCompletionEvent tauce = new TaskAttemptUnsuccessfulCompletionEvent(
TypeConverter.fromYarn(taskAttempt.attemptId),
TypeConverter.fromYarn(taskAttempt.attemptId.getTaskId().getTaskType()),
attemptState.toString(), taskAttempt.finishTime,
taskAttempt.nodeHostName == null ? "UNKNOWN" : taskAttempt.nodeHostName,
StringUtils.join(LINE_SEPARATOR, taskAttempt.getDiagnostics()),
taskAttempt.getProgressSplitBlock().burst());
private static
TaskAttemptUnsuccessfulCompletionEvent
createTaskAttemptUnsuccessfulCompletionEvent(TaskAttemptImpl taskAttempt,
TaskAttemptState attemptState) {
TaskAttemptUnsuccessfulCompletionEvent tauce =
new TaskAttemptUnsuccessfulCompletionEvent(
TypeConverter.fromYarn(taskAttempt.attemptId),
TypeConverter.fromYarn(taskAttempt.attemptId.getTaskId()
.getTaskType()), attemptState.toString(),
taskAttempt.finishTime,
taskAttempt.containerMgrAddress == null ? "UNKNOWN"
: taskAttempt.containerMgrAddress, StringUtils.join(
LINE_SEPARATOR, taskAttempt.getDiagnostics()), taskAttempt
.getProgressSplitBlock().burst());
return tauce;
}
@ -1120,11 +1125,15 @@ public abstract class TaskAttemptImpl implements
, 1);
taskAttempt.eventHandler.handle(jce);
LOG.info("TaskAttempt: [" + taskAttempt.attemptId
+ "] using containerId: [" + taskAttempt.containerID + " on NM: ["
+ taskAttempt.containerMgrAddress + "]");
TaskAttemptStartedEvent tase =
new TaskAttemptStartedEvent(TypeConverter.fromYarn(taskAttempt.attemptId),
TypeConverter.fromYarn(taskAttempt.attemptId.getTaskId().getTaskType()),
taskAttempt.launchTime,
nodeHttpInetAddr.getHostName(), nodeHttpInetAddr.getPort(), taskAttempt.shufflePort);
nodeHttpInetAddr.getHostName(), nodeHttpInetAddr.getPort(),
taskAttempt.shufflePort, taskAttempt.containerID);
taskAttempt.eventHandler.handle
(new JobHistoryEvent(taskAttempt.attemptId.getTaskId().getJobId(), tase));
taskAttempt.eventHandler.handle
@ -1236,7 +1245,8 @@ public abstract class TaskAttemptImpl implements
TypeConverter.fromYarn(attemptId.getTaskId().getTaskType()),
state.toString(),
this.reportedStatus.mapFinishTime,
finishTime, this.nodeHostName == null ? "UNKNOWN" : this.nodeHostName,
finishTime, this.containerMgrAddress == null ? "UNKNOWN"
: this.containerMgrAddress,
this.reportedStatus.stateString,
TypeConverter.fromYarn(getCounters()),
getProgressSplitBlock().burst());
@ -1249,7 +1259,8 @@ public abstract class TaskAttemptImpl implements
state.toString(),
this.reportedStatus.shuffleFinishTime,
this.reportedStatus.sortFinishTime,
finishTime, this.nodeHostName == null ? "UNKNOWN" : this.nodeHostName,
finishTime, this.containerMgrAddress == null ? "UNKNOWN"
: this.containerMgrAddress,
this.reportedStatus.stateString,
TypeConverter.fromYarn(getCounters()),
getProgressSplitBlock().burst());

View File

@ -18,8 +18,10 @@
package org.apache.hadoop.mapreduce.v2.app.recover;
import java.util.List;
import java.util.Set;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.AMInfo;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.event.Dispatcher;
@ -31,4 +33,6 @@ public interface Recovery {
Clock getClock();
Set<TaskId> getCompletedTasks();
List<AMInfo> getAMInfos();
}

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.mapreduce.v2.app.recover;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -34,6 +35,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.AMInfo;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
import org.apache.hadoop.mapreduce.v2.api.records.Phase;
@ -148,6 +150,14 @@ public class RecoveryService extends CompositeService implements Recovery {
public Set<TaskId> getCompletedTasks() {
return completedTasks.keySet();
}
@Override
public List<AMInfo> getAMInfos() {
if (jobInfo == null || jobInfo.getAMInfos() == null) {
return new LinkedList<AMInfo>();
}
return new LinkedList<AMInfo>(jobInfo.getAMInfos());
}
private void parse() throws IOException {
// TODO: parse history file based on startCount
@ -351,15 +361,16 @@ public class RecoveryService extends CompositeService implements Recovery {
private void sendAssignedEvent(TaskAttemptId yarnAttemptID,
TaskAttemptInfo attemptInfo) {
LOG.info("Sending assigned event to " + yarnAttemptID);
ContainerId cId = recordFactory
.newRecordInstance(ContainerId.class);
ContainerId cId = attemptInfo.getContainerId();
Container container = recordFactory
.newRecordInstance(Container.class);
container.setId(cId);
container.setNodeId(recordFactory
.newRecordInstance(NodeId.class));
// NodeId can be obtained from TaskAttemptInfo.hostname - but this will
// eventually contain rack info.
container.setContainerToken(null);
container.setNodeHttpAddress(attemptInfo.getHostname() + ":" +
container.setNodeHttpAddress(attemptInfo.getTrackerName() + ":" +
attemptInfo.getHttpPort());
actualHandler.handle(new TaskAttemptContainerAssignedEvent(yarnAttemptID,
container));
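
The net effect across AM restarts, as a hedged sketch mirroring the new block in MRAppMaster.start() above: a second AM recovers the first attempt's AMInfo from the previous history file and appends its own, so Job.getAMInfos() lists every attempt in order.

List<AMInfo> amInfos = recoveryServ.getAMInfos(); // attempt 1, parsed from .jhist
amInfos.add(new AMInfo(appAttemptID, startTime, containerID,
    nmHost, nmHttpPort));                          // attempt 2, i.e. this AM
// amInfos.get(0).getAppAttemptId().getAttemptId() == 1
// amInfos.get(1).getAppAttemptId().getAttemptId() == 2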

View File

@ -77,6 +77,7 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.service.Service;
import org.apache.hadoop.yarn.state.StateMachine;
import org.apache.hadoop.yarn.state.StateMachineFactory;
import org.apache.hadoop.yarn.util.BuilderUtils;
/**
@ -118,10 +119,20 @@ public class MRApp extends MRAppMaster {
applicationAttemptId.setAttemptId(startCount);
return applicationAttemptId;
}
private static ContainerId getContainerId(ApplicationId applicationId,
int startCount) {
ApplicationAttemptId appAttemptId =
getApplicationAttemptId(applicationId, startCount);
ContainerId containerId =
BuilderUtils.newContainerId(appAttemptId, startCount);
return containerId;
}
public MRApp(int maps, int reduces, boolean autoComplete, String testName,
boolean cleanOnStart, int startCount) {
super(getApplicationAttemptId(applicationId, startCount));
super(getApplicationAttemptId(applicationId, startCount), getContainerId(
applicationId, startCount), "testhost", 3333, System.currentTimeMillis());
this.testWorkDir = new File("target", testName);
testAbsPath = new Path(testWorkDir.getAbsolutePath());
LOG.info("PathUsed: " + testAbsPath);
@ -405,10 +416,10 @@ public class MRApp extends MRAppMaster {
public TestJob(Configuration conf, ApplicationId applicationId,
EventHandler eventHandler, TaskAttemptListener taskAttemptListener,
Clock clock, String user) {
super(getApplicationAttemptId(applicationId, getStartCount()),
conf, eventHandler, taskAttemptListener,
new JobTokenSecretManager(), new Credentials(), clock,
getCompletedTaskFromPreviousRun(), metrics, user);
super(getApplicationAttemptId(applicationId, getStartCount()), conf,
eventHandler, taskAttemptListener, new JobTokenSecretManager(),
new Credentials(), clock, getCompletedTaskFromPreviousRun(), metrics,
user, System.currentTimeMillis(), getAllAMInfos());
// This "this leak" is okay because the retained pointer is in an
// instance variable.

View File

@ -33,6 +33,7 @@ import org.apache.hadoop.mapreduce.FileSystemCounter;
import org.apache.hadoop.mapreduce.JobACL;
import org.apache.hadoop.mapreduce.JobCounter;
import org.apache.hadoop.mapreduce.TaskCounter;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.AMInfo;
import org.apache.hadoop.mapreduce.v2.api.records.Counters;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
@ -488,6 +489,11 @@ public class MockJobs extends MockApps {
public Map<JobACL, AccessControlList> getJobACLs() {
return Collections.<JobACL, AccessControlList>emptyMap();
}
@Override
public List<AMInfo> getAMInfos() {
throw new UnsupportedOperationException("Not supported yet.");
}
};
}
}

View File

@ -27,6 +27,7 @@ import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.junit.Test;
@ -36,11 +37,15 @@ public class TestMRAppMaster {
public void testMRAppMasterForDifferentUser() throws IOException,
InterruptedException {
String applicationAttemptIdStr = "appattempt_1317529182569_0004_000001";
String containerIdStr = "container_1317529182569_0004_000001_1";
String stagingDir = "/tmp/staging";
String userName = "TestAppMasterUser";
ApplicationAttemptId applicationAttemptId = ConverterUtils
.toApplicationAttemptId(applicationAttemptIdStr);
MRAppMasterTest appMaster = new MRAppMasterTest(applicationAttemptId);
ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
MRAppMasterTest appMaster =
new MRAppMasterTest(applicationAttemptId, containerId, "host", -1,
System.currentTimeMillis());
YarnConfiguration conf = new YarnConfiguration();
conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
MRAppMaster.initAndStartAppMaster(appMaster, conf, userName);
@ -54,8 +59,9 @@ class MRAppMasterTest extends MRAppMaster {
Path stagingDirPath;
private Configuration conf;
public MRAppMasterTest(ApplicationAttemptId applicationAttemptId) {
super(applicationAttemptId);
public MRAppMasterTest(ApplicationAttemptId applicationAttemptId,
ContainerId containerId, String host, int port, long submitTime) {
super(applicationAttemptId, containerId, host, port, submitTime);
}
@Override

View File

@ -340,8 +340,8 @@ public class TestRMContainerAllocator {
public FakeJob(ApplicationAttemptId appAttemptID, Configuration conf,
int numMaps, int numReduces) {
super(appAttemptID, conf, null, null, null, null, null, null, null,
null);
super(appAttemptID, conf, null, null, null, null, null, null, null, null,
System.currentTimeMillis(), null);
this.jobId = MRBuilderUtils
.newJobId(appAttemptID.getApplicationId(), 0);
this.numMaps = numMaps;

View File

@ -28,6 +28,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.AMInfo;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
@ -46,6 +47,7 @@ public class TestRecovery {
@Test
public void testCrashed() throws Exception {
int runCount = 0;
long am1StartTimeEst = System.currentTimeMillis();
MRApp app = new MRAppWithHistory(2, 1, false, this.getClass().getName(), true, ++runCount);
Configuration conf = new Configuration();
conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
@ -126,9 +128,10 @@ public class TestRecovery {
//stop the app
app.stop();
//rerun
//in rerun the 1st map will be recovered from previous run
long am2StartTimeEst = System.currentTimeMillis();
app = new MRAppWithHistory(2, 1, false, this.getClass().getName(), false, ++runCount);
conf = new Configuration();
conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
@ -178,8 +181,27 @@ public class TestRecovery {
task1StartTime, mapTask1.getReport().getStartTime());
Assert.assertEquals("Task Finish time not correct",
task1FinishTime, mapTask1.getReport().getFinishTime());
Assert.assertEquals(2, job.getAMInfos().size());
int attemptNum = 1;
// Verify AMInfo
for (AMInfo amInfo : job.getAMInfos()) {
Assert.assertEquals(attemptNum++, amInfo.getAppAttemptId()
.getAttemptId());
Assert.assertEquals(amInfo.getAppAttemptId(), amInfo.getContainerId()
.getApplicationAttemptId());
Assert.assertEquals("testhost", amInfo.getNodeManagerHost());
Assert.assertEquals(3333, amInfo.getNodeManagerHttpPort());
}
long am1StartTimeReal = job.getAMInfos().get(0).getStartTime();
long am2StartTimeReal = job.getAMInfos().get(1).getStartTime();
Assert.assertTrue(am1StartTimeReal >= am1StartTimeEst
&& am1StartTimeReal <= am2StartTimeEst);
Assert.assertTrue(am2StartTimeReal >= am2StartTimeEst
&& am2StartTimeReal <= System.currentTimeMillis());
// TODO Add verification of additional data from jobHistory - whatever was
// available in the failed attempt should be available here
}
class MRAppWithHistory extends MRApp {
public MRAppWithHistory(int maps, int reduces, boolean autoComplete,
String testName, boolean cleanOnStart, int startCount) {

View File

@ -33,6 +33,7 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.JobACL;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.AMInfo;
import org.apache.hadoop.mapreduce.v2.api.records.Counters;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
@ -473,6 +474,11 @@ public class TestRuntimeEstimators {
public Map<JobACL, AccessControlList> getJobACLs() {
throw new UnsupportedOperationException("Not supported yet.");
}
@Override
public List<AMInfo> getAMInfos() {
throw new UnsupportedOperationException("Not supported yet.");
}
}
/*

View File

@ -41,6 +41,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.junit.Test;
@ -81,7 +82,9 @@ import org.junit.Test;
private class TestMRApp extends MRAppMaster {
public TestMRApp(ApplicationAttemptId applicationAttemptId) {
super(applicationAttemptId);
super(applicationAttemptId, BuilderUtils.newContainerId(
applicationAttemptId, 1), "testhost", 3333, System
.currentTimeMillis());
}
@Override

View File

@ -69,6 +69,16 @@
]
},
{"type": "record", "name": "AMStarted",
"fields": [
{"name": "applicationAttemptId", "type": "string"},
{"name": "startTime", "type": "long"},
{"name": "containerId", "type": "string"},
{"name": "nodeManagerHost", "type": "string"},
{"name": "nodeManagerHttpPort", "type": "int"}
]
},
{"type": "record", "name": "JobSubmitted",
"fields": [
{"name": "jobid", "type": "string"},
@ -174,7 +184,8 @@
{"name": "startTime", "type": "long"},
{"name": "trackerName", "type": "string"},
{"name": "httpPort", "type": "int"},
{"name": "shufflePort", "type": "int"}
{"name": "shufflePort", "type": "int"},
{"name": "containerId", "type": "string"}
]
},
@ -260,7 +271,8 @@
"CLEANUP_ATTEMPT_STARTED",
"CLEANUP_ATTEMPT_FINISHED",
"CLEANUP_ATTEMPT_FAILED",
"CLEANUP_ATTEMPT_KILLED"
"CLEANUP_ATTEMPT_KILLED",
"AM_STARTED"
]
},
@ -272,6 +284,7 @@
"JobFinished",
"JobInfoChange",
"JobInited",
"AMStarted",
"JobPriorityChange",
"JobStatusChanged",
"JobSubmitted",

View File

@ -0,0 +1,114 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.jobhistory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.avro.util.Utf8;
/**
* Event to record the start of an MR AppMaster.
*
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class AMStartedEvent implements HistoryEvent {
private AMStarted datum = new AMStarted();
/**
* Create an event to record the start of an MR AppMaster
*
* @param appAttemptId
* the application attempt id.
* @param startTime
* the start time of the AM.
* @param containerId
* the containerId of the AM.
* @param nodeManagerHost
* the node on which the AM is running.
* @param nodeManagerHttpPort
* the httpPort for the node running the AM.
*/
public AMStartedEvent(ApplicationAttemptId appAttemptId, long startTime,
ContainerId containerId, String nodeManagerHost, int nodeManagerHttpPort) {
datum.applicationAttemptId = new Utf8(appAttemptId.toString());
datum.startTime = startTime;
datum.containerId = new Utf8(containerId.toString());
datum.nodeManagerHost = new Utf8(nodeManagerHost);
datum.nodeManagerHttpPort = nodeManagerHttpPort;
}
AMStartedEvent() {
}
public Object getDatum() {
return datum;
}
public void setDatum(Object datum) {
this.datum = (AMStarted) datum;
}
/**
* @return the ApplicationAttemptId
*/
public ApplicationAttemptId getAppAttemptId() {
return ConverterUtils.toApplicationAttemptId(datum.applicationAttemptId
.toString());
}
/**
* @return the start time for the MRAppMaster
*/
public long getStartTime() {
return datum.startTime;
}
/**
* @return the ContainerId for the MRAppMaster.
*/
public ContainerId getContainerId() {
return ConverterUtils.toContainerId(datum.containerId.toString());
}
/**
* @return the node manager host.
*/
public String getNodeManagerHost() {
return datum.nodeManagerHost.toString();
}
/**
* @return the http port of the node manager running the AM.
*/
public int getNodeManagerHttpPort() {
return datum.nodeManagerHttpPort;
}
/** Get the event type */
@Override
public EventType getEventType() {
return EventType.AM_STARTED;
}
}
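
A hedged usage sketch of the new event, mirroring the loop added to MRAppMaster.start() above; amInfo, eventHandler and jobId are assumed from the surrounding AM context:

AMStartedEvent amStarted = new AMStartedEvent(
    amInfo.getAppAttemptId(),        // attempt this AM belongs to
    amInfo.getStartTime(),           // clock time the AM came up
    amInfo.getContainerId(),         // the AM's own container
    amInfo.getNodeManagerHost(),     // where its logs can be found
    amInfo.getNodeManagerHttpPort());
eventHandler.handle(new JobHistoryEvent(jobId, amStarted));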

View File

@ -33,7 +33,6 @@ import org.apache.hadoop.mapreduce.Counters;
import org.apache.avro.Schema;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.JsonDecoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.specific.SpecificDatumReader;
@ -146,8 +145,10 @@ public class EventReader implements Closeable {
result = new TaskAttemptUnsuccessfulCompletionEvent(); break;
case CLEANUP_ATTEMPT_KILLED:
result = new TaskAttemptUnsuccessfulCompletionEvent(); break;
case AM_STARTED:
result = new AMStartedEvent(); break;
default:
throw new RuntimeException("unexpected event type!");
throw new RuntimeException("unexpected event type: " + wrapper.type);
}
result.setDatum(wrapper.event);
return result;

View File

@ -20,6 +20,8 @@ package org.apache.hadoop.mapreduce.jobhistory;
import java.io.IOException;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.classification.InterfaceAudience;
@ -37,6 +39,8 @@ import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapred.TaskStatus;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ContainerId;
/**
* Default Parser for the JobHistory files. Typical usage is
@ -174,6 +178,9 @@ public class JobHistoryParser {
case CLEANUP_ATTEMPT_FINISHED:
handleTaskAttemptFinishedEvent((TaskAttemptFinishedEvent) event);
break;
case AM_STARTED:
handleAMStartedEvent((AMStartedEvent) event);
break;
default:
break;
}
@ -241,6 +248,7 @@ public class JobHistoryParser {
attemptInfo.trackerName = event.getTrackerName();
attemptInfo.taskType = event.getTaskType();
attemptInfo.shufflePort = event.getShufflePort();
attemptInfo.containerId = event.getContainerId();
taskInfo.attemptsMap.put(attemptId, attemptInfo);
}
@ -305,6 +313,20 @@ public class JobHistoryParser {
info.totalReduces = event.getTotalReduces();
info.uberized = event.getUberized();
}
private void handleAMStartedEvent(AMStartedEvent event) {
AMInfo amInfo = new AMInfo();
amInfo.appAttemptId = event.getAppAttemptId();
amInfo.startTime = event.getStartTime();
amInfo.containerId = event.getContainerId();
amInfo.nodeManagerHost = event.getNodeManagerHost();
amInfo.nodeManagerHttpPort = event.getNodeManagerHttpPort();
if (info.amInfos == null) {
info.amInfos = new LinkedList<AMInfo>();
}
info.amInfos.add(amInfo);
info.latestAmInfo = amInfo;
}
private void handleJobInfoChangeEvent(JobInfoChangeEvent event) {
info.submitTime = event.getSubmitTime();
@ -348,6 +370,8 @@ public class JobHistoryParser {
Map<JobACL, AccessControlList> jobACLs;
Map<TaskID, TaskInfo> tasksMap;
List<AMInfo> amInfos;
AMInfo latestAmInfo;
boolean uberized;
/** Create a job info object where job information will be stored
@ -377,7 +401,9 @@ public class JobHistoryParser {
System.out.println("REDUCE_COUNTERS:" + reduceCounters.toString());
System.out.println("TOTAL_COUNTERS: " + totalCounters.toString());
System.out.println("UBERIZED: " + uberized);
for (AMInfo amInfo : amInfos) {
amInfo.printAll();
}
for (TaskInfo ti: tasksMap.values()) {
ti.printAll();
}
@ -427,6 +453,10 @@ public class JobHistoryParser {
public Map<JobACL, AccessControlList> getJobACLs() { return jobACLs; }
/** @return the uberized status of this job */
public boolean getUberized() { return uberized; }
/** @return the list of AMInfos, one per AppMaster attempt */
public List<AMInfo> getAMInfos() { return amInfos; }
/** @return the AMInfo for the newest AppMaster */
public AMInfo getLatestAMInfo() { return latestAmInfo; }
}
/**
@ -509,6 +539,7 @@ public class JobHistoryParser {
int httpPort;
int shufflePort;
String hostname;
ContainerId containerId;
/** Create a Task Attempt Info which will store attempt level information
* on a history parse.
@ -534,6 +565,7 @@ public class JobHistoryParser {
System.out.println("TRACKER_NAME:" + trackerName);
System.out.println("HTTP_PORT:" + httpPort);
System.out.println("SHUFFLE_PORT:" + shufflePort);
System.out.println("CONTIANER_ID:" + containerId);
if (counters != null) {
System.out.println("COUNTERS:" + counters.toString());
}
@ -569,5 +601,74 @@ public class JobHistoryParser {
public int getHttpPort() { return httpPort; }
/** @return the Shuffle port for the tracker */
public int getShufflePort() { return shufflePort; }
/** @return the ContainerId for the task attempt */
public ContainerId getContainerId() { return containerId; }
}
/**
* Stores AM information
*/
public static class AMInfo {
ApplicationAttemptId appAttemptId;
long startTime;
ContainerId containerId;
String nodeManagerHost;
int nodeManagerHttpPort;
/**
* Create an AMInfo which will store AM-level information on a history
* parse.
*/
public AMInfo() {
startTime = -1;
nodeManagerHost = "";
nodeManagerHttpPort = -1;
}
public AMInfo(ApplicationAttemptId appAttemptId, long startTime,
ContainerId containerId, String nodeManagerHost, int nodeManagerHttpPort) {
this.appAttemptId = appAttemptId;
this.startTime = startTime;
this.containerId = containerId;
this.nodeManagerHost = nodeManagerHost;
this.nodeManagerHttpPort = nodeManagerHttpPort;
}
/**
* Print all the information about this AM.
*/
public void printAll() {
System.out.println("APPLICATION_ATTEMPT_ID:" + appAttemptId.toString());
System.out.println("START_TIME: " + startTime);
System.out.println("CONTAINER_ID: " + containerId.toString());
System.out.println("NODE_MANAGER_HOST: " + nodeManagerHost);
System.out.println("NODE_MANAGER_HTTP_PORT: " + nodeManagerHttpPort);
}
/** @return the ApplicationAttemptId */
public ApplicationAttemptId getAppAttemptId() {
return appAttemptId;
}
/** @return the start time of the AM */
public long getStartTime() {
return startTime;
}
/** @return the container id for the AM */
public ContainerId getContainerId() {
return containerId;
}
/** @return the host name for the node manager on which the AM is running */
public String getNodeManagerHost() {
return nodeManagerHost;
}
/** @return the http port for the node manager running the AM */
public int getNodeManagerHttpPort() {
return nodeManagerHttpPort;
}
}
}
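
Reading the new information back out is then a matter of parsing the file and walking getAMInfos(). A hedged sketch; the path is illustrative, and the (FileSystem, String) constructor is the parser's existing API:

JobHistoryParser parser = new JobHistoryParser(
    FileSystem.getLocal(new Configuration()),
    "/tmp/history/job_1317529182569_0004.jhist"); // illustrative path
JobHistoryParser.JobInfo jobInfo = parser.parse();
for (JobHistoryParser.AMInfo amInfo : jobInfo.getAMInfos()) {
  System.out.println("attempt " + amInfo.getAppAttemptId().getAttemptId()
      + " ran in " + amInfo.getContainerId() + " on "
      + amInfo.getNodeManagerHost() + ":" + amInfo.getNodeManagerHttpPort());
}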

View File

@ -18,8 +18,6 @@
package org.apache.hadoop.mapreduce.jobhistory;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.mapreduce.JobID;

View File

@ -18,13 +18,13 @@
package org.apache.hadoop.mapreduce.jobhistory;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.avro.util.Utf8;
@ -45,10 +45,11 @@ public class TaskAttemptStartedEvent implements HistoryEvent {
* @param trackerName Name of the Task Tracker where attempt is running
* @param httpPort The port number of the tracker
* @param shufflePort The shuffle port number of the container
* @param containerId The containerId for the task attempt.
*/
public TaskAttemptStartedEvent( TaskAttemptID attemptId,
TaskType taskType, long startTime, String trackerName,
int httpPort, int shufflePort) {
int httpPort, int shufflePort, ContainerId containerId) {
datum.attemptId = new Utf8(attemptId.toString());
datum.taskid = new Utf8(attemptId.getTaskID().toString());
datum.startTime = startTime;
@ -56,6 +57,15 @@ public class TaskAttemptStartedEvent implements HistoryEvent {
datum.trackerName = new Utf8(trackerName);
datum.httpPort = httpPort;
datum.shufflePort = shufflePort;
datum.containerId = new Utf8(containerId.toString());
}
// TODO Remove after MrV1 is removed.
// Using a dummy containerId to prevent jobHistory parse failures.
public TaskAttemptStartedEvent(TaskAttemptID attemptId, TaskType taskType,
long startTime, String trackerName, int httpPort, int shufflePort) {
this(attemptId, taskType, startTime, trackerName, httpPort, shufflePort,
ConverterUtils.toContainerId("container_-1_-1_-1_-1"));
}
TaskAttemptStartedEvent() {}
@ -91,5 +101,8 @@ public class TaskAttemptStartedEvent implements HistoryEvent {
? EventType.MAP_ATTEMPT_STARTED
: EventType.REDUCE_ATTEMPT_STARTED;
}
/** Get the ContainerId */
public ContainerId getContainerId() {
return ConverterUtils.toContainerId(datum.containerId.toString());
}
}
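
The compatibility constructor matters for mixed code paths: MRv1 callers that never had container ids keep producing parseable history. A hedged illustration (attemptId and startTime assumed from the caller):

// Old 6-arg call sites still compile; the event silently carries the
// sentinel container id.
TaskAttemptStartedEvent oldStyle = new TaskAttemptStartedEvent(
    attemptId, TaskType.MAP, startTime, "tracker:50060", 50060, 8080);
// oldStyle.getContainerId() equals
// ConverterUtils.toContainerId("container_-1_-1_-1_-1"), the same sentinel
// BuilderUtils.newContainerId(-1, -1, -1, -1) builds in the tests below,
// so consumers can recognize and ignore it.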

View File

@ -36,6 +36,7 @@ import org.apache.hadoop.mapreduce.JobACL;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.AMInfo;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
import org.apache.hadoop.mapreduce.v2.api.records.Counters;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
@ -337,4 +338,9 @@ public class CompletedJob implements org.apache.hadoop.mapreduce.v2.app.job.Job
public Path getConfFile() {
return confFile;
}
@Override
public List<AMInfo> getAMInfos() {
return jobInfo.getAMInfos();
}
}

View File

@ -29,8 +29,6 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptReport;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
@ -85,35 +83,17 @@ public class CompletedTaskAttempt implements TaskAttempt {
@Override
public ContainerId getAssignedContainerID() {
//TODO ContainerId needs to be part of some historyEvent to be able to
//render the log directory.
ContainerId containerId =
RecordFactoryProvider.getRecordFactory(null).newRecordInstance(
ContainerId.class);
containerId.setId(-1);
ApplicationAttemptId applicationAttemptId =
RecordFactoryProvider.getRecordFactory(null).newRecordInstance(
ApplicationAttemptId.class);
applicationAttemptId.setAttemptId(-1);
ApplicationId applicationId =
RecordFactoryProvider.getRecordFactory(null).newRecordInstance(
ApplicationId.class);
applicationId.setClusterTimestamp(-1);
applicationId.setId(-1);
applicationAttemptId.setApplicationId(applicationId);
containerId.setApplicationAttemptId(applicationAttemptId);
return containerId;
return attemptInfo.getContainerId();
}
@Override
public String getAssignedContainerMgrAddress() {
// TODO Verify this is correct.
return attemptInfo.getTrackerName();
return attemptInfo.getHostname();
}
@Override
public String getNodeHttpAddress() {
return attemptInfo.getHostname() + ":" + attemptInfo.getHttpPort();
return attemptInfo.getTrackerName() + ":" + attemptInfo.getHttpPort();
}
@Override

View File

@ -23,6 +23,7 @@ import java.util.Map;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.JobACL;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.AMInfo;
import org.apache.hadoop.mapreduce.v2.api.records.Counters;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
@ -159,4 +160,9 @@ public class PartialJob implements org.apache.hadoop.mapreduce.v2.app.job.Job {
throw new IllegalStateException("Not implemented yet");
}
@Override
public List<AMInfo> getAMInfos() {
return null;
}
}

View File

@ -41,8 +41,10 @@ import org.apache.hadoop.mapreduce.v2.app.MRApp;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.service.Service;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.junit.Test;
public class TestJobHistoryEvents {
@ -159,6 +161,10 @@ public class TestJobHistoryEvents {
private void verifyAttempt(TaskAttempt attempt) {
Assert.assertEquals("TaskAttempt state not currect",
TaskAttemptState.SUCCEEDED, attempt.getState());
Assert.assertNotNull(attempt.getAssignedContainerID());
//Verify the wrong ctor is not being used. Remove after mrv1 is removed.
ContainerId fakeCid = BuilderUtils.newContainerId(-1, -1, -1, -1);
Assert.assertFalse(attempt.getAssignedContainerID().equals(fakeCid));
}
static class MRAppWithHistory extends MRApp {

View File

@ -34,8 +34,9 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.AMInfo;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.app.MRApp;
@ -46,7 +47,9 @@ import org.apache.hadoop.mapreduce.v2.hs.TestJobHistoryEvents.MRAppWithHistory;
import org.apache.hadoop.mapreduce.v2.jobhistory.FileNameIndexUtils;
import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.service.Service;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.junit.Test;
public class TestJobHistoryParsing {
@ -54,6 +57,7 @@ public class TestJobHistoryParsing {
@Test
public void testHistoryParsing() throws Exception {
Configuration conf = new Configuration();
long amStartTimeEst = System.currentTimeMillis();
MRApp app = new MRAppWithHistory(2, 1, true, this.getClass().getName(), true);
app.submit(conf);
Job job = app.getContext().getAllJobs().values().iterator().next();
@ -102,12 +106,30 @@ public class TestJobHistoryParsing {
job.isUber(), jobInfo.getUberized());
int totalTasks = jobInfo.getAllTasks().size();
Assert.assertEquals("total number of tasks is incorrect ", 3, totalTasks);
// Verify aminfo
Assert.assertEquals(1, jobInfo.getAMInfos().size());
Assert.assertEquals("testhost", jobInfo.getAMInfos().get(0)
.getNodeManagerHost());
AMInfo amInfo = jobInfo.getAMInfos().get(0);
Assert.assertEquals(3333, amInfo.getNodeManagerHttpPort());
Assert.assertEquals(1, amInfo.getAppAttemptId().getAttemptId());
Assert.assertEquals(amInfo.getAppAttemptId(), amInfo.getContainerId()
.getApplicationAttemptId());
Assert.assertTrue(amInfo.getStartTime() <= System.currentTimeMillis()
&& amInfo.getStartTime() >= amStartTimeEst);
ContainerId fakeCid = BuilderUtils.newContainerId(-1, -1, -1, -1);
//Assert at taskAttempt level
for (TaskInfo taskInfo : jobInfo.getAllTasks().values()) {
int taskAttemptCount = taskInfo.getAllTaskAttempts().size();
Assert.assertEquals("total number of task attempts ",
1, taskAttemptCount);
TaskAttemptInfo taInfo =
taskInfo.getAllTaskAttempts().values().iterator().next();
Assert.assertNotNull(taInfo.getContainerId());
//Verify the wrong ctor is not being used. Remove after mrv1 is removed.
Assert.assertFalse(taInfo.getContainerId().equals(fakeCid));
}
// Deep compare Job and JobInfo

View File

@ -36,12 +36,24 @@ public interface ApplicationConstants {
// TODO: They say tokens via env isn't good.
public static final String APPLICATION_CLIENT_SECRET_ENV_NAME =
"AppClientTokenEnv";
/**
* The environmental variable for APPLICATION_ATTEMPT_ID. Set in
* ApplicationMaster's environment only.
* The environment variable for CONTAINER_ID. Set in AppMaster environment
* only
*/
public static final String APPLICATION_ATTEMPT_ID_ENV = "APPLICATION_ATTEMPT_ID";
public static final String AM_CONTAINER_ID_ENV = "AM_CONTAINER_ID";
/**
* The environment variable for NM_HTTP_ADDRESS. Set in AppMaster environment
* only
*/
public static final String NM_HTTP_ADDRESS_ENV = "NM_HTTP_ADDRESS";
/**
* The environment variable for APP_SUBMIT_TIME. Set in AppMaster environment
* only
*/
public static final String APP_SUBMIT_TIME_ENV = "APP_SUBMIT_TIME_ENV";
public static final String CONTAINER_TOKEN_FILE_ENV_NAME =
UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION;
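
The producer side of this contract is the launcher that builds the AM container's environment. That code is not part of this diff, so the following is only a hedged sketch of what it is expected to set:

Map<String, String> env = new HashMap<String, String>();
env.put(ApplicationConstants.AM_CONTAINER_ID_ENV, amContainerId.toString());
env.put(ApplicationConstants.NM_HTTP_ADDRESS_ENV, nmHost + ":" + nmHttpPort);
env.put(ApplicationConstants.APP_SUBMIT_TIME_ENV,
    String.valueOf(submissionTime));
// The map then goes onto the AM's ContainerLaunchContext.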

View File

@ -126,7 +126,7 @@ public interface ApplicationSubmissionContext {
@Public
@Stable
public void setUser(String user);
/**
* Get the <code>ContainerLaunchContext</code> to describe the
* <code>Container</code> with which the <code>ApplicationMaster</code> is

View File

@ -287,7 +287,7 @@ public class ApplicationMaster {
Map<String, String> envs = System.getenv();
appAttemptID = Records.newRecord(ApplicationAttemptId.class);
if (!envs.containsKey(ApplicationConstants.APPLICATION_ATTEMPT_ID_ENV)) {
if (!envs.containsKey(ApplicationConstants.AM_CONTAINER_ID_ENV)) {
if (cliParser.hasOption("app_attempt_id")) {
String appIdStr = cliParser.getOptionValue("app_attempt_id", "");
appAttemptID = ConverterUtils.toApplicationAttemptId(appIdStr);
@ -296,7 +296,8 @@ public class ApplicationMaster {
throw new IllegalArgumentException("Application Attempt Id not set in the environment");
}
} else {
appAttemptID = ConverterUtils.toApplicationAttemptId(envs.get(ApplicationConstants.APPLICATION_ATTEMPT_ID_ENV));
ContainerId containerId = ConverterUtils.toContainerId(envs.get(ApplicationConstants.AM_CONTAINER_ID_ENV));
appAttemptID = containerId.getApplicationAttemptId();
}
LOG.info("Application master for app"

View File

@ -20,10 +20,8 @@ package org.apache.hadoop.yarn.util;
import static org.apache.hadoop.yarn.util.StringHelper._split;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.text.NumberFormat;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
@ -33,9 +31,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
@ -97,27 +93,8 @@ public class ConverterUtils {
return url;
}
// TODO: Why thread local?
// ^ NumberFormat instances are not threadsafe
private static final ThreadLocal<NumberFormat> appIdFormat =
new ThreadLocal<NumberFormat>() {
@Override
public NumberFormat initialValue() {
NumberFormat fmt = NumberFormat.getInstance();
fmt.setGroupingUsed(false);
fmt.setMinimumIntegerDigits(4);
return fmt;
}
};
public static String toString(ApplicationId appId) {
StringBuilder sb = new StringBuilder();
sb.append(APPLICATION_PREFIX + "_").append(appId.getClusterTimestamp())
.append("_");
sb.append(appIdFormat.get().format(appId.getId()));
return sb.toString();
return appId.toString();
}
public static ApplicationId toApplicationId(RecordFactory recordFactory,
@ -152,11 +129,11 @@ public class ConverterUtils {
return cId.toString();
}
public static ContainerId toContainerId(String containerIdStr)
throws IOException {
public static ContainerId toContainerId(String containerIdStr) {
Iterator<String> it = _split(containerIdStr).iterator();
if (!it.next().equals(CONTAINER_PREFIX)) {
throw new IOException("Invalid ContainerId prefix: " + containerIdStr);
throw new IllegalArgumentException("Invalid ContainerId prefix: "
+ containerIdStr);
}
try {
ApplicationAttemptId appAttemptID = toApplicationAttemptId(it);
@ -165,21 +142,22 @@ public class ConverterUtils {
containerId.setId(Integer.parseInt(it.next()));
return containerId;
} catch (NumberFormatException n) {
throw new IOException("Invalid ContainerId: " + containerIdStr, n);
throw new IllegalArgumentException("Invalid ContainerId: "
+ containerIdStr, n);
}
}
public static ApplicationAttemptId toApplicationAttemptId(
String applicationAttemptIdStr) throws IOException {
String applicationAttemptIdStr) {
Iterator<String> it = _split(applicationAttemptIdStr).iterator();
if (!it.next().equals(APPLICATION_ATTEMPT_PREFIX)) {
throw new IOException("Invalid AppAttemptId prefix: "
throw new IllegalArgumentException("Invalid AppAttemptId prefix: "
+ applicationAttemptIdStr);
}
try {
return toApplicationAttemptId(it);
} catch (NumberFormatException n) {
throw new IOException("Invalid AppAttemptId: "
throw new IllegalArgumentException("Invalid AppAttemptId: "
+ applicationAttemptIdStr, n);
}
}
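
With the checked IOException gone, ID parsing failures now surface as unchecked IllegalArgumentException, and an ApplicationAttemptId is derived from a ContainerId rather than parsed from its own environment value. A minimal caller-side sketch of the new contract (the ID literal below is an illustrative value, not taken from this patch):

import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.util.ConverterUtils;

public class ConverterUtilsUsage {
  public static void main(String[] args) {
    try {
      // No checked exception to declare any more; malformed input fails fast.
      ContainerId containerId =
          ConverterUtils.toContainerId("container_1318916734703_0001_01_000001");
      // The attempt ID is now recovered from the container ID itself.
      ApplicationAttemptId attemptId = containerId.getApplicationAttemptId();
      System.out.println(attemptId);
    } catch (IllegalArgumentException e) {
      System.err.println("Malformed ContainerId string: " + e.getMessage());
    }
  }
}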

View File

@ -69,7 +69,7 @@ public class ContainerLogsPage extends NMView {
ContainerId containerId;
try {
containerId = ConverterUtils.toContainerId($(CONTAINER_ID));
} catch (IOException e) {
} catch (IllegalArgumentException e) {
div.h1("Invalid containerId " + $(CONTAINER_ID))._();
return;
}

View File

@ -22,8 +22,6 @@ import static org.apache.hadoop.yarn.util.StringHelper.ujoin;
import static org.apache.hadoop.yarn.webapp.view.JQueryUI.ACCORDION;
import static org.apache.hadoop.yarn.webapp.view.JQueryUI.initID;
import java.io.IOException;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@ -66,7 +64,7 @@ public class ContainerPage extends NMView implements NMWebParams {
ContainerId containerID;
try {
containerID = ConverterUtils.toContainerId($(CONTAINER_ID));
} catch (IOException e) {
} catch (IllegalArgumentException e) {
html.p()._("Invalid containerId " + $(CONTAINER_ID))._();
return;
}

View File

@ -206,11 +206,12 @@ public class ClientRMService extends AbstractService implements
// Safety
submissionContext.setUser(user);
// This needs to be synchronous as the client can query
// immediately following the submission to get the application status.
// So call handle directly and do not send an event.
rmAppManager.handle(new RMAppManagerSubmitEvent(submissionContext));
rmAppManager.handle(new RMAppManagerSubmitEvent(submissionContext, System
.currentTimeMillis()));
LOG.info("Application with id " + applicationId.getId() +
" submitted by user " + user + " with " + submissionContext);

View File

@ -213,7 +213,7 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent> {
@SuppressWarnings("unchecked")
protected synchronized void submitApplication(
ApplicationSubmissionContext submissionContext) {
ApplicationSubmissionContext submissionContext, long submitTime) {
ApplicationId applicationId = submissionContext.getApplicationId();
RMApp application = null;
try {
@ -241,13 +241,13 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent> {
ApplicationStore appStore = rmContext.getApplicationsStore()
.createApplicationStore(submissionContext.getApplicationId(),
submissionContext);
// Create RMApp
application = new RMAppImpl(applicationId, rmContext,
this.conf, submissionContext.getApplicationName(), user,
submissionContext.getQueue(), submissionContext, clientTokenStr,
appStore, this.scheduler,
this.masterService);
this.masterService, submitTime);
if (rmContext.getRMApps().putIfAbsent(applicationId, application) !=
null) {
@ -284,8 +284,9 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent> {
case APP_SUBMIT:
{
ApplicationSubmissionContext submissionContext =
((RMAppManagerSubmitEvent)event).getSubmissionContext();
submitApplication(submissionContext);
((RMAppManagerSubmitEvent)event).getSubmissionContext();
long submitTime = ((RMAppManagerSubmitEvent)event).getSubmitTime();
submitApplication(submissionContext, submitTime);
}
break;
default:

View File

@ -23,13 +23,21 @@ import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
public class RMAppManagerSubmitEvent extends RMAppManagerEvent {
private final ApplicationSubmissionContext submissionContext;
private final long submitTime;
public RMAppManagerSubmitEvent(ApplicationSubmissionContext submissionContext) {
super(submissionContext.getApplicationId(), RMAppManagerEventType.APP_SUBMIT);
public RMAppManagerSubmitEvent(
ApplicationSubmissionContext submissionContext, long submitTime) {
super(submissionContext.getApplicationId(),
RMAppManagerEventType.APP_SUBMIT);
this.submissionContext = submissionContext;
this.submitTime = submitTime;
}
public ApplicationSubmissionContext getSubmissionContext() {
return this.submissionContext;
}
public long getSubmitTime() {
return this.submitTime;
}
}

View File

@ -22,8 +22,6 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import javax.crypto.SecretKey;
@ -37,7 +35,6 @@ import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.SecurityInfo;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.StringUtils;
@ -58,7 +55,6 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.security.ApplicationTokenIdentifier;
import org.apache.hadoop.yarn.security.ApplicationTokenSecretManager;
import org.apache.hadoop.yarn.security.ContainerManagerSecurityInfo;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.security.client.ClientToAMSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
@ -83,6 +79,7 @@ public class AMLauncher implements Runnable {
private final ApplicationTokenSecretManager applicationTokenSecretManager;
private final ClientToAMSecretManager clientToAMSecretManager;
private final AMLauncherEventType eventType;
private final RMContext rmContext;
@SuppressWarnings("rawtypes")
private final EventHandler handler;
@ -96,6 +93,7 @@ public class AMLauncher implements Runnable {
this.applicationTokenSecretManager = applicationTokenSecretManager;
this.clientToAMSecretManager = clientToAMSecretManager;
this.eventType = eventType;
this.rmContext = rmContext;
this.handler = rmContext.getDispatcher().getEventHandler();
}
@ -189,9 +187,18 @@ public class AMLauncher implements Runnable {
throws IOException {
Map<String, String> environment = container.getEnvironment();
// Set the AppAttemptId to be consumable by the AM.
environment.put(ApplicationConstants.APPLICATION_ATTEMPT_ID_ENV,
application.getAppAttemptId().toString());
// Set the AppAttemptId, ContainerId, NM HTTP address and AppSubmitTime to be
// consumable by the AM.
environment.put(ApplicationConstants.AM_CONTAINER_ID_ENV, container
.getContainerId().toString());
environment.put(ApplicationConstants.NM_HTTP_ADDRESS_ENV, application
.getMasterContainer().getNodeHttpAddress());
environment.put(
ApplicationConstants.APP_SUBMIT_TIME_ENV,
String.valueOf(rmContext.getRMApps()
.get(application.getAppAttemptId().getApplicationId())
.getSubmitTime()));
if (UserGroupInformation.isSecurityEnabled()) {
// TODO: Security enabled/disabled info should come from RM.

View File

@ -115,6 +115,12 @@ public interface RMApp extends EventHandler<RMAppEvent> {
*/
long getStartTime();
/**
* The submit time of the application.
* @return the submit time of the application.
*/
long getSubmitTime();
/**
* The tracking url for the application master.
* @return the tracking url for the application master.

View File

@ -84,6 +84,7 @@ public class RMAppImpl implements RMApp {
private final WriteLock writeLock;
private final Map<ApplicationAttemptId, RMAppAttempt> attempts
= new LinkedHashMap<ApplicationAttemptId, RMAppAttempt>();
private final long submitTime;
// Mutable fields
private long startTime;
@ -163,7 +164,8 @@ public class RMAppImpl implements RMApp {
Configuration config, String name, String user, String queue,
ApplicationSubmissionContext submissionContext, String clientTokenStr,
ApplicationStore appStore,
YarnScheduler scheduler, ApplicationMasterService masterService) {
YarnScheduler scheduler, ApplicationMasterService masterService,
long submitTime) {
this.applicationId = applicationId;
this.name = name;
@ -178,6 +180,7 @@ public class RMAppImpl implements RMApp {
this.appStore = appStore;
this.scheduler = scheduler;
this.masterService = masterService;
this.submitTime = submitTime;
this.startTime = System.currentTimeMillis();
this.maxRetries = conf.getInt(YarnConfiguration.RM_AM_MAX_RETRIES,
@ -367,6 +370,11 @@ public class RMAppImpl implements RMApp {
}
}
@Override
public long getSubmitTime() {
return this.submitTime;
}
@Override
public String getTrackingUrl() {
this.readLock.lock();

View File

@ -42,6 +42,7 @@ public class MockNM {
private final String nodeIdStr;
private final int memory;
private final ResourceTrackerService resourceTracker;
private final int httpPort = 2;
MockNM(String nodeIdStr, int memory, ResourceTrackerService resourceTracker) {
this.nodeIdStr = nodeIdStr;
@ -53,6 +54,10 @@ public class MockNM {
return nodeId;
}
public String getHttpAddress() {
return nodeId.getHost() + ":" + String.valueOf(httpPort);
}
public void containerStatus(Container container) throws Exception {
Map<ApplicationId, List<ContainerStatus>> conts =
new HashMap<ApplicationId, List<ContainerStatus>>();
@ -69,7 +74,7 @@ public class MockNM {
RegisterNodeManagerRequest req = Records.newRecord(
RegisterNodeManagerRequest.class);
req.setNodeId(nodeId);
req.setHttpPort(2);
req.setHttpPort(httpPort);
Resource resource = Records.newRecord(Resource.class);
resource.setMemory(memory);
req.setResource(resource);

View File

@ -163,7 +163,7 @@ public class TestAppManager{
}
public void submitApplication(
ApplicationSubmissionContext submissionContext) {
super.submitApplication(submissionContext);
super.submitApplication(submissionContext, System.currentTimeMillis());
}
}

View File

@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StopContainerResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncher;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType;
@ -39,6 +40,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMaste
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@ -56,6 +58,9 @@ public class TestApplicationMasterLauncher {
boolean launched = false;
boolean cleanedup = false;
String attemptIdAtContainerManager = null;
String containerIdAtContainerManager = null;
String nmAddressAtContainerManager = null;
long submitTimeAtContainerManager;
@Override
public StartContainerResponse
@ -63,9 +68,20 @@ public class TestApplicationMasterLauncher {
throws YarnRemoteException {
LOG.info("Container started by MyContainerManager: " + request);
launched = true;
attemptIdAtContainerManager = request.getContainerLaunchContext()
.getEnvironment().get(
ApplicationConstants.APPLICATION_ATTEMPT_ID_ENV);
containerIdAtContainerManager =
request.getContainerLaunchContext().getEnvironment()
.get(ApplicationConstants.AM_CONTAINER_ID_ENV);
ContainerId containerId =
ConverterUtils.toContainerId(containerIdAtContainerManager);
attemptIdAtContainerManager =
containerId.getApplicationAttemptId().toString();
nmAddressAtContainerManager =
request.getContainerLaunchContext().getEnvironment()
.get(ApplicationConstants.NM_HTTP_ADDRESS_ENV);
submitTimeAtContainerManager =
Long.parseLong(request.getContainerLaunchContext().getEnvironment()
.get(ApplicationConstants.APP_SUBMIT_TIME_ENV));
return null;
}
@ -140,6 +156,13 @@ public class TestApplicationMasterLauncher {
ApplicationAttemptId appAttemptId = attempt.getAppAttemptId();
Assert.assertEquals(appAttemptId.toString(),
containerManager.attemptIdAtContainerManager);
Assert.assertEquals(app.getSubmitTime(),
containerManager.submitTimeAtContainerManager);
Assert.assertEquals(app.getRMAppAttempt(appAttemptId)
.getSubmissionContext().getAMContainerSpec().getContainerId()
.toString(), containerManager.containerIdAtContainerManager);
Assert.assertEquals(nm1.getHttpAddress(),
containerManager.nmAddressAtContainerManager);
MockAM am = new MockAM(rm.getRMContext(), rm
.getApplicationMasterService(), appAttemptId);

View File

@ -166,6 +166,11 @@ public abstract class MockAsm extends MockApps {
throw new UnsupportedOperationException("Not supported yet.");
}
@Override
public long getSubmitTime() {
throw new UnsupportedOperationException("Not supported yet.");
}
@Override
public long getFinishTime() {
throw new UnsupportedOperationException("Not supported yet.");

View File

@ -33,6 +33,7 @@ public class MockRMApp implements RMApp {
String name = MockApps.newAppName();
String queue = MockApps.newQueue();
long start = System.currentTimeMillis() - (int) (Math.random() * DT);
long submit = start - (int) (Math.random() * DT);
long finish = 0;
RMAppState state = RMAppState.NEW;
int failCount = 0;
@ -141,6 +142,11 @@ public class MockRMApp implements RMApp {
return start;
}
@Override
public long getSubmitTime() {
return submit;
}
public void setStartTime(long time) {
this.start = time;
}

View File

@ -21,7 +21,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
import static org.mockito.Mockito.mock;
import java.io.IOException;
import java.util.List;
import junit.framework.Assert;
@ -51,7 +50,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAlloca
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.junit.Before;
import org.junit.After;
import org.junit.Test;
@ -61,7 +59,7 @@ public class TestRMAppTransitions {
private RMContext rmContext;
private static int maxRetries = 4;
private static int appId = 1;
private AsyncDispatcher rmDispatcher;
// private AsyncDispatcher rmDispatcher;
// ignore all the RM application attempt events
private static final class TestApplicationAttemptEventDispatcher implements
@ -152,7 +150,7 @@ public class TestRMAppTransitions {
conf, name, user,
queue, submissionContext, clientTokenStr,
appStore, scheduler,
masterService);
masterService, System.currentTimeMillis());
testAppStartState(applicationId, user, name, queue, application);
return application;

View File

@ -323,25 +323,29 @@ Hadoop MapReduce Next Generation - Writing YARN Applications
multi-tenancy nature, amongst other issues, it cannot make any assumptions
about things like pre-configured ports that it can listen on.
* When the ApplicationMaster starts up, several parameters are made available
to it via the environment. These include the ContainerId for the
ApplicationMaster container, the application submission time, and the HTTP
address of the NodeManager running the container (a short sketch of reading
these follows the code block below). See ApplicationConstants for the
parameter names.
* All interactions with the ResourceManager require an ApplicationAttemptId
(there can be multiple attempts per application in case of failures). When
the ApplicationMaster starts up, the ApplicationAttemptId associated with
this particular instance will be set in the environment. There are helper
apis to convert the value obtained from the environment into an
ApplicationAttemptId object.
(there can be multiple attempts per application in case of failures). The
ApplicationAttemptId can be obtained from the ApplicationMaster's
ContainerId. There are helper APIs to convert the values obtained from the
environment into ContainerId and ApplicationAttemptId objects.
+---+
Map<String, String> envs = System.getenv();
ApplicationAttemptId appAttemptID =
Records.newRecord(ApplicationAttemptId.class);
if (!envs.containsKey(ApplicationConstants.APPLICATION_ATTEMPT_ID_ENV)) {
// app attempt id should always be set in the env by the framework
String containerIdString =
envs.get(ApplicationConstants.AM_CONTAINER_ID_ENV);
if (containerIdString == null) {
// container id should always be set in the env by the framework
throw new IllegalArgumentException(
"ApplicationAttemptId not set in the environment");
}
appAttemptID =
ConverterUtils.toApplicationAttemptId(
envs.get(ApplicationConstants.APPLICATION_ATTEMPT_ID_ENV));
"ContainerId not set in the environment");
}
ContainerId containerId = ConverterUtils.toContainerId(containerIdString);
ApplicationAttemptId appAttemptID = containerId.getApplicationAttemptId();
+---+
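
The submission time and NodeManager HTTP address mentioned earlier can be read the same way; a short sketch, reusing the envs map from the example above:

+---+
String nmHttpAddress =
    envs.get(ApplicationConstants.NM_HTTP_ADDRESS_ENV);
long appSubmitTime = Long.parseLong(
    envs.get(ApplicationConstants.APP_SUBMIT_TIME_ENV));
+---+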
* After an ApplicationMaster has initialized itself completely, it needs to