Merge -c 1465188 from trunk to branch-2 to fix MAPREDUCE-5129. Allow tags to JobHistory for deeper analytics. Contributed by Billie Rinaldi.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1465189 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
85c29d073c
commit
b66abebca2
|
@ -118,6 +118,9 @@ Release 2.0.4-alpha - UNRELEASED
|
||||||
|
|
||||||
IMPROVEMENTS
|
IMPROVEMENTS
|
||||||
|
|
||||||
|
MAPREDUCE-5129. Allow tags to JobHistory for deeper analytics. (billie via
|
||||||
|
acmurthy)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
BUG FIXES
|
BUG FIXES
|
||||||
|
|
|
@ -1255,7 +1255,8 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
|
||||||
job.conf.get(MRJobConfig.WORKFLOW_ID, ""),
|
job.conf.get(MRJobConfig.WORKFLOW_ID, ""),
|
||||||
job.conf.get(MRJobConfig.WORKFLOW_NAME, ""),
|
job.conf.get(MRJobConfig.WORKFLOW_NAME, ""),
|
||||||
job.conf.get(MRJobConfig.WORKFLOW_NODE_NAME, ""),
|
job.conf.get(MRJobConfig.WORKFLOW_NODE_NAME, ""),
|
||||||
getWorkflowAdjacencies(job.conf));
|
getWorkflowAdjacencies(job.conf),
|
||||||
|
job.conf.get(MRJobConfig.WORKFLOW_TAGS, ""));
|
||||||
job.eventHandler.handle(new JobHistoryEvent(job.jobId, jse));
|
job.eventHandler.handle(new JobHistoryEvent(job.jobId, jse));
|
||||||
//TODO JH Verify jobACLs, UserName via UGI?
|
//TODO JH Verify jobACLs, UserName via UGI?
|
||||||
|
|
||||||
|
|
|
@ -114,6 +114,7 @@ public class TestJobImpl {
|
||||||
conf.set(MRJobConfig.WORKFLOW_NODE_NAME, "testNodeName");
|
conf.set(MRJobConfig.WORKFLOW_NODE_NAME, "testNodeName");
|
||||||
conf.set(MRJobConfig.WORKFLOW_ADJACENCY_PREFIX_STRING + "key1", "value1");
|
conf.set(MRJobConfig.WORKFLOW_ADJACENCY_PREFIX_STRING + "key1", "value1");
|
||||||
conf.set(MRJobConfig.WORKFLOW_ADJACENCY_PREFIX_STRING + "key2", "value2");
|
conf.set(MRJobConfig.WORKFLOW_ADJACENCY_PREFIX_STRING + "key2", "value2");
|
||||||
|
conf.set(MRJobConfig.WORKFLOW_TAGS, "tag1,tag2");
|
||||||
|
|
||||||
|
|
||||||
AsyncDispatcher dispatcher = new AsyncDispatcher();
|
AsyncDispatcher dispatcher = new AsyncDispatcher();
|
||||||
|
@ -126,7 +127,8 @@ public class TestJobImpl {
|
||||||
commitHandler.start();
|
commitHandler.start();
|
||||||
|
|
||||||
JobSubmittedEventHandler jseHandler = new JobSubmittedEventHandler("testId",
|
JobSubmittedEventHandler jseHandler = new JobSubmittedEventHandler("testId",
|
||||||
"testName", "testNodeName", "\"key2\"=\"value2\" \"key1\"=\"value1\" ");
|
"testName", "testNodeName", "\"key2\"=\"value2\" \"key1\"=\"value1\" ",
|
||||||
|
"tag1,tag2");
|
||||||
dispatcher.register(EventType.class, jseHandler);
|
dispatcher.register(EventType.class, jseHandler);
|
||||||
JobImpl job = createStubbedJob(conf, dispatcher, 0);
|
JobImpl job = createStubbedJob(conf, dispatcher, 0);
|
||||||
job.handle(new JobEvent(job.getID(), JobEventType.JOB_INIT));
|
job.handle(new JobEvent(job.getID(), JobEventType.JOB_INIT));
|
||||||
|
@ -644,14 +646,18 @@ public class TestJobImpl {
|
||||||
|
|
||||||
private String workflowAdjacencies;
|
private String workflowAdjacencies;
|
||||||
|
|
||||||
|
private String workflowTags;
|
||||||
|
|
||||||
private Boolean assertBoolean;
|
private Boolean assertBoolean;
|
||||||
|
|
||||||
public JobSubmittedEventHandler(String workflowId, String workflowName,
|
public JobSubmittedEventHandler(String workflowId, String workflowName,
|
||||||
String workflowNodeName, String workflowAdjacencies) {
|
String workflowNodeName, String workflowAdjacencies,
|
||||||
|
String workflowTags) {
|
||||||
this.workflowId = workflowId;
|
this.workflowId = workflowId;
|
||||||
this.workflowName = workflowName;
|
this.workflowName = workflowName;
|
||||||
this.workflowNodeName = workflowNodeName;
|
this.workflowNodeName = workflowNodeName;
|
||||||
this.workflowAdjacencies = workflowAdjacencies;
|
this.workflowAdjacencies = workflowAdjacencies;
|
||||||
|
this.workflowTags = workflowTags;
|
||||||
assertBoolean = null;
|
assertBoolean = null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -677,6 +683,10 @@ public class TestJobImpl {
|
||||||
setAssertValue(false);
|
setAssertValue(false);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
if (!workflowTags.equals(jsEvent.getWorkflowTags())) {
|
||||||
|
setAssertValue(false);
|
||||||
|
return;
|
||||||
|
}
|
||||||
setAssertValue(true);
|
setAssertValue(true);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -95,7 +95,8 @@
|
||||||
{"name": "workflowId", "type": "string"},
|
{"name": "workflowId", "type": "string"},
|
||||||
{"name": "workflowName", "type": "string"},
|
{"name": "workflowName", "type": "string"},
|
||||||
{"name": "workflowNodeName", "type": "string"},
|
{"name": "workflowNodeName", "type": "string"},
|
||||||
{"name": "workflowAdjacencies", "type": "string"}
|
{"name": "workflowAdjacencies", "type": "string"},
|
||||||
|
{"name": "workflowTags", "type": "string"}
|
||||||
]
|
]
|
||||||
},
|
},
|
||||||
|
|
||||||
|
|
|
@ -664,6 +664,8 @@ public interface MRJobConfig {
|
||||||
public static final String WORKFLOW_ADJACENCY_PREFIX_PATTERN =
|
public static final String WORKFLOW_ADJACENCY_PREFIX_PATTERN =
|
||||||
"^mapreduce\\.workflow\\.adjacency\\..+";
|
"^mapreduce\\.workflow\\.adjacency\\..+";
|
||||||
|
|
||||||
|
public static final String WORKFLOW_TAGS = "mapreduce.workflow.tags";
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The maximum number of application attempts.
|
* The maximum number of application attempts.
|
||||||
* It is a application-specific setting.
|
* It is a application-specific setting.
|
||||||
|
|
|
@ -75,6 +75,31 @@ public class JobSubmittedEvent implements HistoryEvent {
|
||||||
Map<JobACL, AccessControlList> jobACLs, String jobQueueName,
|
Map<JobACL, AccessControlList> jobACLs, String jobQueueName,
|
||||||
String workflowId, String workflowName, String workflowNodeName,
|
String workflowId, String workflowName, String workflowNodeName,
|
||||||
String workflowAdjacencies) {
|
String workflowAdjacencies) {
|
||||||
|
this(id, jobName, userName, submitTime, jobConfPath, jobACLs,
|
||||||
|
jobQueueName, workflowId, workflowName, workflowNodeName,
|
||||||
|
workflowAdjacencies, "");
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create an event to record job submission
|
||||||
|
* @param id The job Id of the job
|
||||||
|
* @param jobName Name of the job
|
||||||
|
* @param userName Name of the user who submitted the job
|
||||||
|
* @param submitTime Time of submission
|
||||||
|
* @param jobConfPath Path of the Job Configuration file
|
||||||
|
* @param jobACLs The configured acls for the job.
|
||||||
|
* @param jobQueueName The job-queue to which this job was submitted to
|
||||||
|
* @param workflowId The Id of the workflow
|
||||||
|
* @param workflowName The name of the workflow
|
||||||
|
* @param workflowNodeName The node name of the workflow
|
||||||
|
* @param workflowAdjacencies The adjacencies of the workflow
|
||||||
|
* @param workflowTags Comma-separated tags for the workflow
|
||||||
|
*/
|
||||||
|
public JobSubmittedEvent(JobID id, String jobName, String userName,
|
||||||
|
long submitTime, String jobConfPath,
|
||||||
|
Map<JobACL, AccessControlList> jobACLs, String jobQueueName,
|
||||||
|
String workflowId, String workflowName, String workflowNodeName,
|
||||||
|
String workflowAdjacencies, String workflowTags) {
|
||||||
datum.jobid = new Utf8(id.toString());
|
datum.jobid = new Utf8(id.toString());
|
||||||
datum.jobName = new Utf8(jobName);
|
datum.jobName = new Utf8(jobName);
|
||||||
datum.userName = new Utf8(userName);
|
datum.userName = new Utf8(userName);
|
||||||
|
@ -101,6 +126,9 @@ public class JobSubmittedEvent implements HistoryEvent {
|
||||||
if (workflowAdjacencies != null) {
|
if (workflowAdjacencies != null) {
|
||||||
datum.workflowAdjacencies = new Utf8(workflowAdjacencies);
|
datum.workflowAdjacencies = new Utf8(workflowAdjacencies);
|
||||||
}
|
}
|
||||||
|
if (workflowTags != null) {
|
||||||
|
datum.workflowTags = new Utf8(workflowTags);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
JobSubmittedEvent() {}
|
JobSubmittedEvent() {}
|
||||||
|
@ -168,6 +196,13 @@ public class JobSubmittedEvent implements HistoryEvent {
|
||||||
}
|
}
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
/** Get the workflow tags */
|
||||||
|
public String getWorkflowTags() {
|
||||||
|
if (datum.workflowTags != null) {
|
||||||
|
return datum.workflowTags.toString();
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
/** Get the event type */
|
/** Get the event type */
|
||||||
public EventType getEventType() { return EventType.JOB_SUBMITTED; }
|
public EventType getEventType() { return EventType.JOB_SUBMITTED; }
|
||||||
|
|
||||||
|
|
|
@ -92,6 +92,10 @@ public class Job20LineHistoryEventEmitter extends HistoryEventEmitter {
|
||||||
if (workflowAdjacencies == null) {
|
if (workflowAdjacencies == null) {
|
||||||
workflowAdjacencies = "";
|
workflowAdjacencies = "";
|
||||||
}
|
}
|
||||||
|
String workflowTags = line.get("WORKFLOW_TAGS");
|
||||||
|
if (workflowTags == null) {
|
||||||
|
workflowTags = "";
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
if (submitTime != null) {
|
if (submitTime != null) {
|
||||||
|
@ -104,7 +108,8 @@ public class Job20LineHistoryEventEmitter extends HistoryEventEmitter {
|
||||||
new HashMap<JobACL, AccessControlList>();
|
new HashMap<JobACL, AccessControlList>();
|
||||||
return new JobSubmittedEvent(jobID, jobName, user,
|
return new JobSubmittedEvent(jobID, jobName, user,
|
||||||
that.originalSubmitTime, jobConf, jobACLs, jobQueueName,
|
that.originalSubmitTime, jobConf, jobACLs, jobQueueName,
|
||||||
workflowId, workflowName, workflowNodeName, workflowAdjacencies);
|
workflowId, workflowName, workflowNodeName, workflowAdjacencies,
|
||||||
|
workflowTags);
|
||||||
}
|
}
|
||||||
|
|
||||||
return null;
|
return null;
|
||||||
|
|
Loading…
Reference in New Issue