MAPREDUCE-6688. Store job configurations in Timeline Service v2 (Varun Saxena via sjlee)

This commit is contained in:
Sangjin Lee 2016-05-03 09:19:36 -07:00
parent 089caf49fe
commit 000a4d8e13
6 changed files with 181 additions and 19 deletions

View File

@ -1074,7 +1074,16 @@ public class JobHistoryEventHandler extends AbstractService
entity.setId(jobId.toString());
return entity;
}
private org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity
createJobEntity(JobId jobId) {
org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity =
new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity();
entity.setId(jobId.toString());
entity.setType(MAPREDUCE_JOB_ENTITY_TYPE);
return entity;
}
// create ApplicationEntity with job finished Metrics from HistoryEvent
private org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity
createAppEntityWithJobMetrics(HistoryEvent event, JobId jobId) {
@ -1133,6 +1142,46 @@ public class JobHistoryEventHandler extends AbstractService
return entity;
}
private void publishConfigsOnJobSubmittedEvent(JobSubmittedEvent event,
JobId jobId) {
if (event.getJobConf() == null) {
return;
}
// Publish job configurations both as job and app entity.
// Configs are split into multiple entities if they exceed 100kb in size.
org.apache.hadoop.yarn.api.records.timelineservice.
TimelineEntity jobEntityForConfigs = createJobEntity(jobId);
ApplicationEntity appEntityForConfigs = new ApplicationEntity();
String appId = jobId.getAppId().toString();
appEntityForConfigs.setId(appId);
try {
int configSize = 0;
for (Map.Entry<String, String> entry : event.getJobConf()) {
int size = entry.getKey().length() + entry.getValue().length();
configSize += size;
if (configSize > JobHistoryEventUtils.ATS_CONFIG_PUBLISH_SIZE_BYTES) {
if (jobEntityForConfigs.getConfigs().size() > 0) {
timelineClient.putEntities(jobEntityForConfigs);
timelineClient.putEntities(appEntityForConfigs);
jobEntityForConfigs = createJobEntity(jobId);
appEntityForConfigs = new ApplicationEntity();
appEntityForConfigs.setId(appId);
}
configSize = size;
}
jobEntityForConfigs.addConfig(entry.getKey(), entry.getValue());
appEntityForConfigs.addConfig(entry.getKey(), entry.getValue());
}
if (configSize > 0) {
timelineClient.putEntities(jobEntityForConfigs);
timelineClient.putEntities(appEntityForConfigs);
}
} catch (IOException | YarnException e) {
LOG.error("Exception while publishing configs on JOB_SUBMITTED Event " +
" for the job : " + jobId, e);
}
}
private void processEventForNewTimelineService(HistoryEvent event,
JobId jobId, long timestamp) {
org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity tEntity =
@ -1252,8 +1301,12 @@ public class JobHistoryEventHandler extends AbstractService
} catch (IOException | YarnException e) {
LOG.error("Failed to process Event " + event.getEventType()
+ " for the job : " + jobId, e);
return;
}
if (event.getEventType() == EventType.JOB_SUBMITTED) {
// Publish configs after main job submitted event has been posted.
publishConfigsOnJobSubmittedEvent((JobSubmittedEvent)event, jobId);
}
}
private void setSummarySlotSeconds(JobSummary summary, Counters allCounters) {

View File

@ -1445,7 +1445,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
job.conf.get(MRJobConfig.WORKFLOW_NAME, ""),
job.conf.get(MRJobConfig.WORKFLOW_NODE_NAME, ""),
getWorkflowAdjacencies(job.conf),
job.conf.get(MRJobConfig.WORKFLOW_TAGS, ""));
job.conf.get(MRJobConfig.WORKFLOW_TAGS, ""), job.conf);
job.eventHandler.handle(new JobHistoryEvent(job.jobId, jse));
//TODO JH Verify jobACLs, UserName via UGI?

View File

@ -26,6 +26,7 @@ import java.util.Set;
import org.apache.avro.util.Utf8;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.JobACL;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.security.authorize.AccessControlList;
@ -41,6 +42,7 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
@InterfaceStability.Unstable
public class JobSubmittedEvent implements HistoryEvent {
private JobSubmitted datum = new JobSubmitted();
private JobConf jobConf = null;
/**
* Create an event to record job submission
@ -83,6 +85,31 @@ public class JobSubmittedEvent implements HistoryEvent {
workflowAdjacencies, "");
}
/**
* Create an event to record job submission.
* @param id The job Id of the job
* @param jobName Name of the job
* @param userName Name of the user who submitted the job
* @param submitTime Time of submission
* @param jobConfPath Path of the Job Configuration file
* @param jobACLs The configured acls for the job.
* @param jobQueueName The job-queue to which this job was submitted to
* @param workflowId The Id of the workflow
* @param workflowName The name of the workflow
* @param workflowNodeName The node name of the workflow
* @param workflowAdjacencies The adjacencies of the workflow
* @param workflowTags Comma-separated tags for the workflow
*/
public JobSubmittedEvent(JobID id, String jobName, String userName,
long submitTime, String jobConfPath,
Map<JobACL, AccessControlList> jobACLs, String jobQueueName,
String workflowId, String workflowName, String workflowNodeName,
String workflowAdjacencies, String workflowTags) {
this(id, jobName, userName, submitTime, jobConfPath, jobACLs,
jobQueueName, workflowId, workflowName, workflowNodeName,
workflowAdjacencies, workflowTags, null);
}
/**
* Create an event to record job submission
* @param id The job Id of the job
@ -97,12 +124,13 @@ public class JobSubmittedEvent implements HistoryEvent {
* @param workflowNodeName The node name of the workflow
* @param workflowAdjacencies The adjacencies of the workflow
* @param workflowTags Comma-separated tags for the workflow
* @param conf Job configuration
*/
public JobSubmittedEvent(JobID id, String jobName, String userName,
long submitTime, String jobConfPath,
Map<JobACL, AccessControlList> jobACLs, String jobQueueName,
String workflowId, String workflowName, String workflowNodeName,
String workflowAdjacencies, String workflowTags) {
String workflowAdjacencies, String workflowTags, JobConf conf) {
datum.setJobid(new Utf8(id.toString()));
datum.setJobName(new Utf8(jobName));
datum.setUserName(new Utf8(userName));
@ -132,6 +160,7 @@ public class JobSubmittedEvent implements HistoryEvent {
if (workflowTags != null) {
datum.setWorkflowTags(new Utf8(workflowTags));
}
jobConf = conf;
}
JobSubmittedEvent() {}
@ -208,7 +237,11 @@ public class JobSubmittedEvent implements HistoryEvent {
}
/** Get the event type */
public EventType getEventType() { return EventType.JOB_SUBMITTED; }
public JobConf getJobConf() {
return jobConf;
}
@Override
public TimelineEvent toTimelineEvent() {
TimelineEvent tEvent = new TimelineEvent();
@ -234,5 +267,4 @@ public class JobSubmittedEvent implements HistoryEvent {
public Set<TimelineMetric> getTimelineMetrics() {
return null;
}
}

View File

@ -36,6 +36,9 @@ public final class JobHistoryEventUtils {
private JobHistoryEventUtils() {
}
// Number of bytes of config which can be published in one shot to ATSv2.
public static final int ATS_CONFIG_PUBLISH_SIZE_BYTES = 10 * 1024;
public static JsonNode countersToJSON(Counters counters) {
ObjectMapper mapper = new ObjectMapper();
ArrayNode nodes = mapper.createArrayNode();

View File

@ -26,7 +26,9 @@ import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import org.apache.commons.io.FileUtils;
import org.apache.commons.logging.Log;
@ -55,6 +57,8 @@ import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
import org.junit.Assert;
import org.junit.Test;
import com.google.common.collect.Sets;
public class TestMRTimelineEventHandling {
private static final String TIMELINE_AUX_SERVICE_NAME = "timeline_collector";
@ -191,8 +195,17 @@ public class TestMRTimelineEventHandling {
Path inDir = new Path("input");
Path outDir = new Path("output");
LOG.info("Run 1st job which should be successful.");
JobConf successConf = new JobConf(conf);
successConf.set("dummy_conf1",
UtilsForTests.createConfigValue(51 * 1024));
successConf.set("dummy_conf2",
UtilsForTests.createConfigValue(51 * 1024));
successConf.set("huge_dummy_conf1",
UtilsForTests.createConfigValue(101 * 1024));
successConf.set("huge_dummy_conf2",
UtilsForTests.createConfigValue(101 * 1024));
RunningJob job =
UtilsForTests.runJobSucceed(new JobConf(conf), inDir, outDir);
UtilsForTests.runJobSucceed(successConf, inDir, outDir);
Assert.assertEquals(JobStatus.SUCCEEDED,
job.getJobStatus().getState().getValue());
@ -270,7 +283,11 @@ public class TestMRTimelineEventHandling {
Assert.assertTrue("jobEventFilePath: " + jobEventFilePath +
" does not exist.",
jobEventFile.exists());
verifyMetricsWhenEvent(jobEventFile, EventType.JOB_FINISHED.name());
verifyEntity(jobEventFile, EventType.JOB_FINISHED.name(),
true, false, null);
Set<String> cfgsToCheck = Sets.newHashSet("dummy_conf1", "dummy_conf2",
"huge_dummy_conf1", "huge_dummy_conf2");
verifyEntity(jobEventFile, null, false, true, cfgsToCheck);
// for this test, we expect MR job metrics are published in YARN_APPLICATION
String outputAppDir = basePath + "/YARN_APPLICATION/";
@ -290,7 +307,8 @@ public class TestMRTimelineEventHandling {
"appEventFilePath: " + appEventFilePath +
" does not exist.",
appEventFile.exists());
verifyMetricsWhenEvent(appEventFile, null);
verifyEntity(appEventFile, null, true, false, null);
verifyEntity(appEventFile, null, false, true, cfgsToCheck);
// check for task event file
String outputDirTask = basePath + "/MAPREDUCE_TASK/";
@ -307,7 +325,8 @@ public class TestMRTimelineEventHandling {
Assert.assertTrue("taskEventFileName: " + taskEventFilePath +
" does not exist.",
taskEventFile.exists());
verifyMetricsWhenEvent(taskEventFile, EventType.TASK_FINISHED.name());
verifyEntity(taskEventFile, EventType.TASK_FINISHED.name(),
true, false, null);
// check for task attempt event file
String outputDirTaskAttempt = basePath + "/MAPREDUCE_TASK_ATTEMPT/";
@ -324,17 +343,30 @@ public class TestMRTimelineEventHandling {
File taskAttemptEventFile = new File(taskAttemptEventFilePath);
Assert.assertTrue("taskAttemptEventFileName: " + taskAttemptEventFilePath +
" does not exist.", taskAttemptEventFile.exists());
verifyMetricsWhenEvent(taskAttemptEventFile,
EventType.MAP_ATTEMPT_FINISHED.name());
verifyEntity(taskAttemptEventFile, EventType.MAP_ATTEMPT_FINISHED.name(),
true, false, null);
}
private void verifyMetricsWhenEvent(File entityFile, String eventId)
/**
* Verifies entity by reading the entity file written via FS impl.
* @param entityFile File to be read.
* @param eventId Event to be checked.
* @param chkMetrics If event is not null, this flag determines if metrics
* exist when the event is encountered. If event is null, we merely check
* if metrics exist in the entity file.
* @param chkCfg If event is not null, this flag determines if configs
* exist when the event is encountered. If event is null, we merely check
* if configs exist in the entity file.
* @param cfgsToVerify a set of configs which should exist in the entity file.
* @throws IOException
*/
private void verifyEntity(File entityFile, String eventId,
boolean chkMetrics, boolean chkCfg, Set<String> cfgsToVerify)
throws IOException {
BufferedReader reader = null;
String strLine;
try {
reader = new BufferedReader(new FileReader(entityFile));
boolean jobMetricsFoundForAppEntity = false;
while ((strLine = reader.readLine()) != null) {
if (strLine.trim().length() > 0) {
org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity =
@ -344,23 +376,57 @@ public class TestMRTimelineEventHandling {
if (eventId == null) {
// Job metrics are published without any events for
// ApplicationEntity. There is also possibility that some other
// ApplicationEntity is published without events, hence loop all
if (entity.getEvents().size() == 0) {
jobMetricsFoundForAppEntity = entity.getMetrics().size() > 0;
if (jobMetricsFoundForAppEntity) {
// ApplicationEntity is published without events, hence loop till
// its found. Same applies to configs.
if (chkMetrics && entity.getMetrics().size() > 0) {
return;
}
if (chkCfg && entity.getConfigs().size() > 0) {
if (cfgsToVerify == null) {
return;
} else {
// Have configs to verify. Keep on removing configs from the set
// of configs to verify as they are found. When the all the
// entities have been looped through, we will check if the set
// is empty or not(indicating if all configs have been found or
// not).
for (Iterator<String> itr =
cfgsToVerify.iterator(); itr.hasNext();) {
String config = itr.next();
if (entity.getConfigs().containsKey(config)) {
itr.remove();
}
}
// All the required configs have been verified, so return.
if (cfgsToVerify.isEmpty()) {
return;
}
}
}
} else {
for (TimelineEvent event : entity.getEvents()) {
if (event.getId().equals(eventId)) {
assertTrue(entity.getMetrics().size() > 0);
if (chkMetrics) {
assertTrue(entity.getMetrics().size() > 0);
}
if (chkCfg) {
assertTrue(entity.getConfigs().size() > 0);
if (cfgsToVerify != null) {
for (String cfg : cfgsToVerify) {
assertTrue(entity.getConfigs().containsKey(cfg));
}
}
}
return;
}
}
}
}
}
if (cfgsToVerify != null) {
assertTrue(cfgsToVerify.isEmpty());
return;
}
fail("Expected event : " + eventId + " not found in the file "
+ entityFile);
} finally {

View File

@ -156,6 +156,14 @@ public class UtilsForTests {
return buf.toString();
}
public static String createConfigValue(int msgSize) {
StringBuilder sb = new StringBuilder(msgSize);
for (int i=0; i<msgSize; i++) {
sb.append('a');
}
return sb.toString();
}
public static String safeGetCanonicalPath(File f) {
try {
String s = f.getCanonicalPath();