MAPREDUCE-3342. Fixed JobHistoryServer to also show the job's queue name. Contributed by Jonathan Eagles.
svn merge -c r1199133 --ignore-ancestry ../../trunk/ git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1199135 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
fcf44394a0
commit
fe6c20b02a
|
@ -36,6 +36,9 @@ Release 0.23.1 - Unreleased
|
|||
MAPREDUCE-3344. o.a.h.mapreduce.Reducer since 0.21 blindly casts to
|
||||
ReduceContext.ValueIterator. (Brock Noland via tomwhite)
|
||||
|
||||
MAPREDUCE-3342. Fixed JobHistoryServer to also show the job's queue
|
||||
name. (Jonathan Eagles via vinodkv)
|
||||
|
||||
Release 0.23.0 - 2011-11-01
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
|
|
@ -400,6 +400,7 @@ public class JobHistoryEventHandler extends AbstractService
|
|||
JobSubmittedEvent jobSubmittedEvent =
|
||||
(JobSubmittedEvent) event.getHistoryEvent();
|
||||
mi.getJobIndexInfo().setSubmitTime(jobSubmittedEvent.getSubmitTime());
|
||||
mi.getJobIndexInfo().setQueueName(jobSubmittedEvent.getJobQueueName());
|
||||
}
|
||||
|
||||
// If this is JobFinishedEvent, close the writer and setup the job-index
|
||||
|
|
|
@ -55,6 +55,7 @@ public interface Job {
|
|||
int getCompletedReduces();
|
||||
boolean isUber();
|
||||
String getUserName();
|
||||
String getQueueName();
|
||||
|
||||
/**
|
||||
* @return a path to where the config file for this job is located.
|
||||
|
|
|
@ -142,6 +142,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
|
|||
private final EventHandler eventHandler;
|
||||
private final MRAppMetrics metrics;
|
||||
private final String userName;
|
||||
private final String queueName;
|
||||
private final long appSubmitTime;
|
||||
|
||||
private boolean lazyTasksCopyNeeded = false;
|
||||
|
@ -375,6 +376,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
|
|||
this.completedTasksFromPreviousRun = completedTasksFromPreviousRun;
|
||||
this.amInfos = amInfos;
|
||||
this.userName = userName;
|
||||
this.queueName = conf.get(MRJobConfig.QUEUE_NAME, "default");
|
||||
this.appSubmitTime = appSubmitTime;
|
||||
this.oldJobId = TypeConverter.fromYarn(jobId);
|
||||
this.newApiCommitter = newApiCommitter;
|
||||
|
@ -766,6 +768,11 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
|
|||
return userName;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getQueueName() {
|
||||
return queueName;
|
||||
}
|
||||
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
* @see org.apache.hadoop.mapreduce.v2.app.job.Job#getConfFile()
|
||||
|
@ -829,7 +836,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
|
|||
job.conf.get(MRJobConfig.USER_NAME, "mapred"),
|
||||
job.appSubmitTime,
|
||||
job.remoteJobConfFile.toString(),
|
||||
job.jobACLs, job.conf.get(MRJobConfig.QUEUE_NAME, "default"));
|
||||
job.jobACLs, job.queueName);
|
||||
job.eventHandler.handle(new JobHistoryEvent(job.jobId, jse));
|
||||
//TODO JH Verify jobACLs, UserName via UGI?
|
||||
|
||||
|
|
|
@ -492,6 +492,11 @@ public class MockJobs extends MockApps {
|
|||
return "mock";
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getQueueName() {
|
||||
return "mockqueue";
|
||||
}
|
||||
|
||||
@Override
|
||||
public Path getConfFile() {
|
||||
throw new UnsupportedOperationException("Not supported yet.");
|
||||
|
|
|
@ -439,6 +439,11 @@ public class TestRuntimeEstimators {
|
|||
throw new UnsupportedOperationException("Not supported yet.");
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getQueueName() {
|
||||
throw new UnsupportedOperationException("Not supported yet.");
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getTotalMaps() {
|
||||
return mapTasks.size();
|
||||
|
|
|
@ -23,6 +23,9 @@ import java.io.UnsupportedEncodingException;
|
|||
import java.net.URLDecoder;
|
||||
import java.net.URLEncoder;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
import org.apache.hadoop.mapreduce.JobID;
|
||||
import org.apache.hadoop.mapreduce.TypeConverter;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
|
||||
|
@ -35,6 +38,10 @@ public class FileNameIndexUtils {
|
|||
static final String DELIMITER = "-";
|
||||
static final String DELIMITER_ESCAPE = "%2D";
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(FileNameIndexUtils.class);
|
||||
|
||||
// Job history file names need to be backwards compatible
|
||||
// Only append new elements to the end of this list
|
||||
private static final int JOB_ID_INDEX = 0;
|
||||
private static final int SUBMIT_TIME_INDEX = 1;
|
||||
private static final int USER_INDEX = 2;
|
||||
|
@ -43,7 +50,7 @@ public class FileNameIndexUtils {
|
|||
private static final int NUM_MAPS_INDEX = 5;
|
||||
private static final int NUM_REDUCES_INDEX = 6;
|
||||
private static final int JOB_STATUS_INDEX = 7;
|
||||
private static final int MAX_INDEX = JOB_STATUS_INDEX;
|
||||
private static final int QUEUE_NAME_INDEX = 8;
|
||||
|
||||
/**
|
||||
* Constructs the job history file name from the JobIndexInfo.
|
||||
|
@ -83,7 +90,11 @@ public class FileNameIndexUtils {
|
|||
|
||||
//JobStatus
|
||||
sb.append(indexInfo.getJobStatus());
|
||||
sb.append(DELIMITER);
|
||||
|
||||
//QueueName
|
||||
sb.append(indexInfo.getQueueName());
|
||||
|
||||
sb.append(JobHistoryUtils.JOB_HISTORY_FILE_EXTENSION);
|
||||
return encodeJobHistoryFileName(sb.toString());
|
||||
}
|
||||
|
@ -100,27 +111,60 @@ public class FileNameIndexUtils {
|
|||
JobIndexInfo indexInfo = new JobIndexInfo();
|
||||
|
||||
String[] jobDetails = fileName.split(DELIMITER);
|
||||
if (jobDetails.length != MAX_INDEX +1) {
|
||||
throw new IOException("Failed to parse file: [" + jhFileName + "]. Expected " + (MAX_INDEX + 1) + "parts.");
|
||||
}
|
||||
|
||||
JobID oldJobId = JobID.forName(decodeJobHistoryFileName(jobDetails[JOB_ID_INDEX]));
|
||||
JobId jobId = TypeConverter.toYarn(oldJobId);
|
||||
indexInfo.setJobId(jobId);
|
||||
//TODO Catch NumberFormatException - Do not fail if there's only a few fields missing.
|
||||
indexInfo.setSubmitTime(Long.parseLong(decodeJobHistoryFileName(jobDetails[SUBMIT_TIME_INDEX])));
|
||||
|
||||
indexInfo.setUser(decodeJobHistoryFileName(jobDetails[USER_INDEX]));
|
||||
|
||||
indexInfo.setJobName(decodeJobHistoryFileName(jobDetails[JOB_NAME_INDEX]));
|
||||
|
||||
indexInfo.setFinishTime(Long.parseLong(decodeJobHistoryFileName(jobDetails[FINISH_TIME_INDEX])));
|
||||
|
||||
indexInfo.setNumMaps(Integer.parseInt(decodeJobHistoryFileName(jobDetails[NUM_MAPS_INDEX])));
|
||||
|
||||
indexInfo.setNumReduces(Integer.parseInt(decodeJobHistoryFileName(jobDetails[NUM_REDUCES_INDEX])));
|
||||
|
||||
indexInfo.setJobStatus(decodeJobHistoryFileName(jobDetails[JOB_STATUS_INDEX]));
|
||||
|
||||
// Do not fail if there are some minor parse errors
|
||||
try {
|
||||
try {
|
||||
indexInfo.setSubmitTime(
|
||||
Long.parseLong(decodeJobHistoryFileName(jobDetails[SUBMIT_TIME_INDEX])));
|
||||
} catch (NumberFormatException e) {
|
||||
LOG.warn("Unable to parse submit time from job history file "
|
||||
+ jhFileName + " : " + e);
|
||||
}
|
||||
|
||||
indexInfo.setUser(
|
||||
decodeJobHistoryFileName(jobDetails[USER_INDEX]));
|
||||
|
||||
indexInfo.setJobName(
|
||||
decodeJobHistoryFileName(jobDetails[JOB_NAME_INDEX]));
|
||||
|
||||
try {
|
||||
indexInfo.setFinishTime(
|
||||
Long.parseLong(decodeJobHistoryFileName(jobDetails[FINISH_TIME_INDEX])));
|
||||
} catch (NumberFormatException e) {
|
||||
LOG.warn("Unable to parse finish time from job history file "
|
||||
+ jhFileName + " : " + e);
|
||||
}
|
||||
|
||||
try {
|
||||
indexInfo.setNumMaps(
|
||||
Integer.parseInt(decodeJobHistoryFileName(jobDetails[NUM_MAPS_INDEX])));
|
||||
} catch (NumberFormatException e) {
|
||||
LOG.warn("Unable to parse num maps from job history file "
|
||||
+ jhFileName + " : " + e);
|
||||
}
|
||||
|
||||
try {
|
||||
indexInfo.setNumReduces(
|
||||
Integer.parseInt(decodeJobHistoryFileName(jobDetails[NUM_REDUCES_INDEX])));
|
||||
} catch (NumberFormatException e) {
|
||||
LOG.warn("Unable to parse num reduces from job history file "
|
||||
+ jhFileName + " : " + e);
|
||||
}
|
||||
|
||||
indexInfo.setJobStatus(
|
||||
decodeJobHistoryFileName(jobDetails[JOB_STATUS_INDEX]));
|
||||
|
||||
indexInfo.setQueueName(
|
||||
decodeJobHistoryFileName(jobDetails[QUEUE_NAME_INDEX]));
|
||||
} catch (IndexOutOfBoundsException e) {
|
||||
LOG.warn("Parsing job history file with partial data encoded into name: "
|
||||
+ jhFileName);
|
||||
}
|
||||
|
||||
return indexInfo;
|
||||
}
|
||||
|
|
|
@ -28,6 +28,7 @@ public class JobIndexInfo {
|
|||
private long submitTime;
|
||||
private long finishTime;
|
||||
private String user;
|
||||
private String queueName;
|
||||
private String jobName;
|
||||
private JobId jobId;
|
||||
private int numMaps;
|
||||
|
@ -67,6 +68,12 @@ public class JobIndexInfo {
|
|||
public void setUser(String user) {
|
||||
this.user = user;
|
||||
}
|
||||
public String getQueueName() {
|
||||
return queueName;
|
||||
}
|
||||
public void setQueueName(String queueName) {
|
||||
this.queueName = queueName;
|
||||
}
|
||||
public String getJobName() {
|
||||
return jobName;
|
||||
}
|
||||
|
|
|
@ -29,6 +29,16 @@ import org.junit.Test;
|
|||
|
||||
public class TestFileNameIndexUtils {
|
||||
|
||||
private static final String OLD_JOB_HISTORY_FILE_FORMATTER = "%s"
|
||||
+ FileNameIndexUtils.DELIMITER + "%s"
|
||||
+ FileNameIndexUtils.DELIMITER + "%s"
|
||||
+ FileNameIndexUtils.DELIMITER + "%s"
|
||||
+ FileNameIndexUtils.DELIMITER + "%s"
|
||||
+ FileNameIndexUtils.DELIMITER + "%s"
|
||||
+ FileNameIndexUtils.DELIMITER + "%s"
|
||||
+ FileNameIndexUtils.DELIMITER + "%s"
|
||||
+ JobHistoryUtils.JOB_HISTORY_FILE_EXTENSION;
|
||||
|
||||
private static final String JOB_HISTORY_FILE_FORMATTER = "%s"
|
||||
+ FileNameIndexUtils.DELIMITER + "%s"
|
||||
+ FileNameIndexUtils.DELIMITER + "%s"
|
||||
|
@ -37,6 +47,7 @@ public class TestFileNameIndexUtils {
|
|||
+ FileNameIndexUtils.DELIMITER + "%s"
|
||||
+ FileNameIndexUtils.DELIMITER + "%s"
|
||||
+ FileNameIndexUtils.DELIMITER + "%s"
|
||||
+ FileNameIndexUtils.DELIMITER + "%s"
|
||||
+ JobHistoryUtils.JOB_HISTORY_FILE_EXTENSION;
|
||||
|
||||
private static final String JOB_ID = "job_1317928501754_0001";
|
||||
|
@ -55,9 +66,48 @@ public class TestFileNameIndexUtils {
|
|||
private static final String NUM_MAPS = "1";
|
||||
private static final String NUM_REDUCES = "1";
|
||||
private static final String JOB_STATUS = "SUCCEEDED";
|
||||
private static final String QUEUE_NAME = "default";
|
||||
|
||||
@Test
|
||||
public void testUserNamePercentEncoding() throws IOException{
|
||||
public void testEncodingDecodingEquivalence() throws IOException {
|
||||
JobIndexInfo info = new JobIndexInfo();
|
||||
JobID oldJobId = JobID.forName(JOB_ID);
|
||||
JobId jobId = TypeConverter.toYarn(oldJobId);
|
||||
info.setJobId(jobId);
|
||||
info.setSubmitTime(Long.parseLong(SUBMIT_TIME));
|
||||
info.setUser(USER_NAME);
|
||||
info.setJobName(JOB_NAME);
|
||||
info.setFinishTime(Long.parseLong(FINISH_TIME));
|
||||
info.setNumMaps(Integer.parseInt(NUM_MAPS));
|
||||
info.setNumReduces(Integer.parseInt(NUM_REDUCES));
|
||||
info.setJobStatus(JOB_STATUS);
|
||||
info.setQueueName(QUEUE_NAME);
|
||||
|
||||
String jobHistoryFile = FileNameIndexUtils.getDoneFileName(info);
|
||||
JobIndexInfo parsedInfo = FileNameIndexUtils.getIndexInfo(jobHistoryFile);
|
||||
|
||||
Assert.assertEquals("Job id different after encoding and decoding",
|
||||
info.getJobId(), parsedInfo.getJobId());
|
||||
Assert.assertEquals("Submit time different after encoding and decoding",
|
||||
info.getSubmitTime(), parsedInfo.getSubmitTime());
|
||||
Assert.assertEquals("User different after encoding and decoding",
|
||||
info.getUser(), parsedInfo.getUser());
|
||||
Assert.assertEquals("Job name different after encoding and decoding",
|
||||
info.getJobName(), parsedInfo.getJobName());
|
||||
Assert.assertEquals("Finish time different after encoding and decoding",
|
||||
info.getFinishTime(), parsedInfo.getFinishTime());
|
||||
Assert.assertEquals("Num maps different after encoding and decoding",
|
||||
info.getNumMaps(), parsedInfo.getNumMaps());
|
||||
Assert.assertEquals("Num reduces different after encoding and decoding",
|
||||
info.getNumReduces(), parsedInfo.getNumReduces());
|
||||
Assert.assertEquals("Job status different after encoding and decoding",
|
||||
info.getJobStatus(), parsedInfo.getJobStatus());
|
||||
Assert.assertEquals("Queue name different after encoding and decoding",
|
||||
info.getQueueName(), parsedInfo.getQueueName());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUserNamePercentEncoding() throws IOException {
|
||||
JobIndexInfo info = new JobIndexInfo();
|
||||
JobID oldJobId = JobID.forName(JOB_ID);
|
||||
JobId jobId = TypeConverter.toYarn(oldJobId);
|
||||
|
@ -69,6 +119,7 @@ public class TestFileNameIndexUtils {
|
|||
info.setNumMaps(Integer.parseInt(NUM_MAPS));
|
||||
info.setNumReduces(Integer.parseInt(NUM_REDUCES));
|
||||
info.setJobStatus(JOB_STATUS);
|
||||
info.setQueueName(QUEUE_NAME);
|
||||
|
||||
String jobHistoryFile = FileNameIndexUtils.getDoneFileName(info);
|
||||
Assert.assertTrue("User name not encoded correctly into job history file",
|
||||
|
@ -85,7 +136,8 @@ public class TestFileNameIndexUtils {
|
|||
FINISH_TIME,
|
||||
NUM_MAPS,
|
||||
NUM_REDUCES,
|
||||
JOB_STATUS);
|
||||
JOB_STATUS,
|
||||
QUEUE_NAME);
|
||||
|
||||
JobIndexInfo info = FileNameIndexUtils.getIndexInfo(jobHistoryFile);
|
||||
Assert.assertEquals("User name doesn't match",
|
||||
|
@ -105,6 +157,7 @@ public class TestFileNameIndexUtils {
|
|||
info.setNumMaps(Integer.parseInt(NUM_MAPS));
|
||||
info.setNumReduces(Integer.parseInt(NUM_REDUCES));
|
||||
info.setJobStatus(JOB_STATUS);
|
||||
info.setQueueName(QUEUE_NAME);
|
||||
|
||||
String jobHistoryFile = FileNameIndexUtils.getDoneFileName(info);
|
||||
Assert.assertTrue("Job name not encoded correctly into job history file",
|
||||
|
@ -121,10 +174,52 @@ public class TestFileNameIndexUtils {
|
|||
FINISH_TIME,
|
||||
NUM_MAPS,
|
||||
NUM_REDUCES,
|
||||
JOB_STATUS);
|
||||
JOB_STATUS,
|
||||
QUEUE_NAME);
|
||||
|
||||
JobIndexInfo info = FileNameIndexUtils.getIndexInfo(jobHistoryFile);
|
||||
Assert.assertEquals("Job name doesn't match",
|
||||
JOB_NAME_WITH_DELIMITER, info.getJobName());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testJobHistoryFileNameBackwardsCompatible() throws IOException {
|
||||
JobID oldJobId = JobID.forName(JOB_ID);
|
||||
JobId jobId = TypeConverter.toYarn(oldJobId);
|
||||
|
||||
long submitTime = Long.parseLong(SUBMIT_TIME);
|
||||
long finishTime = Long.parseLong(FINISH_TIME);
|
||||
int numMaps = Integer.parseInt(NUM_MAPS);
|
||||
int numReduces = Integer.parseInt(NUM_REDUCES);
|
||||
|
||||
String jobHistoryFile = String.format(OLD_JOB_HISTORY_FILE_FORMATTER,
|
||||
JOB_ID,
|
||||
SUBMIT_TIME,
|
||||
USER_NAME,
|
||||
JOB_NAME,
|
||||
FINISH_TIME,
|
||||
NUM_MAPS,
|
||||
NUM_REDUCES,
|
||||
JOB_STATUS);
|
||||
|
||||
JobIndexInfo info = FileNameIndexUtils.getIndexInfo(jobHistoryFile);
|
||||
Assert.assertEquals("Job id incorrect after decoding old history file",
|
||||
jobId, info.getJobId());
|
||||
Assert.assertEquals("Submit time incorrect after decoding old history file",
|
||||
submitTime, info.getSubmitTime());
|
||||
Assert.assertEquals("User incorrect after decoding old history file",
|
||||
USER_NAME, info.getUser());
|
||||
Assert.assertEquals("Job name incorrect after decoding old history file",
|
||||
JOB_NAME, info.getJobName());
|
||||
Assert.assertEquals("Finish time incorrect after decoding old history file",
|
||||
finishTime, info.getFinishTime());
|
||||
Assert.assertEquals("Num maps incorrect after decoding old history file",
|
||||
numMaps, info.getNumMaps());
|
||||
Assert.assertEquals("Num reduces incorrect after decoding old history file",
|
||||
numReduces, info.getNumReduces());
|
||||
Assert.assertEquals("Job status incorrect after decoding old history file",
|
||||
JOB_STATUS, info.getJobStatus());
|
||||
Assert.assertNull("Queue name incorrect after decoding old history file",
|
||||
info.getQueueName());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -283,6 +283,11 @@ public class CompletedJob implements org.apache.hadoop.mapreduce.v2.app.job.Job
|
|||
return jobInfo.getJobname();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getQueueName() {
|
||||
return jobInfo.getJobQueueName();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getTotalMaps() {
|
||||
return (int) jobInfo.getTotalMaps();
|
||||
|
|
|
@ -64,6 +64,11 @@ public class PartialJob implements org.apache.hadoop.mapreduce.v2.app.job.Job {
|
|||
return jobIndexInfo.getJobName();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getQueueName() {
|
||||
return jobIndexInfo.getQueueName();
|
||||
}
|
||||
|
||||
@Override
|
||||
public JobState getState() {
|
||||
JobState js = null;
|
||||
|
|
|
@ -104,6 +104,7 @@ public class HsJobBlock extends HtmlBlock {
|
|||
ResponseInfo infoBlock = info("Job Overview").
|
||||
_("Job Name:", job.getName()).
|
||||
_("User Name:", job.getUserName()).
|
||||
_("Queue:", job.getQueueName()).
|
||||
_("State:", job.getState()).
|
||||
_("Uberized:", job.isUber()).
|
||||
_("Started:", new Date(startTime)).
|
||||
|
|
|
@ -60,6 +60,7 @@ public class HsJobsBlock extends HtmlBlock {
|
|||
th(".id", "Job ID").
|
||||
th(".name", "Name").
|
||||
th("User").
|
||||
th("Queue").
|
||||
th(".state", "State").
|
||||
th("Maps Total").
|
||||
th("Maps Completed").
|
||||
|
@ -83,6 +84,7 @@ public class HsJobsBlock extends HtmlBlock {
|
|||
td().a(url("job", jobID), jobID)._().
|
||||
td(job.getName().toString()).
|
||||
td(job.getUserName()).
|
||||
td(job.getQueueName()).
|
||||
td(job.getState().toString()).
|
||||
td(mapsTotal).
|
||||
td(mapsCompleted).
|
||||
|
@ -97,6 +99,7 @@ public class HsJobsBlock extends HtmlBlock {
|
|||
th().input("search_init").$type(InputType.text).$name("start_time").$value("Job ID")._()._().
|
||||
th().input("search_init").$type(InputType.text).$name("start_time").$value("Name")._()._().
|
||||
th().input("search_init").$type(InputType.text).$name("start_time").$value("User")._()._().
|
||||
th().input("search_init").$type(InputType.text).$name("start_time").$value("Queue")._()._().
|
||||
th().input("search_init").$type(InputType.text).$name("start_time").$value("State")._()._().
|
||||
th().input("search_init").$type(InputType.text).$name("start_time").$value("Maps Total")._()._().
|
||||
th().input("search_init").$type(InputType.text).$name("start_time").$value("Maps Completed")._()._().
|
||||
|
|
Loading…
Reference in New Issue