MAPREDUCE-3342. Fixed JobHistoryServer to also show the job's queue name. Contributed by Jonathan Eagles.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1199133 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2011-11-08 07:28:56 +00:00
parent 6749d1395f
commit 9fe9f42c8f
13 changed files with 204 additions and 22 deletions

View File

@ -86,6 +86,9 @@ Release 0.23.1 - Unreleased
MAPREDUCE-3344. o.a.h.mapreduce.Reducer since 0.21 blindly casts to
ReduceContext.ValueIterator. (Brock Noland via tomwhite)
MAPREDUCE-3342. Fixed JobHistoryServer to also show the job's queue
name. (Jonathan Eagles via vinodkv)
Release 0.23.0 - 2011-11-01
INCOMPATIBLE CHANGES

View File

@ -400,6 +400,7 @@ public class JobHistoryEventHandler extends AbstractService
JobSubmittedEvent jobSubmittedEvent =
(JobSubmittedEvent) event.getHistoryEvent();
mi.getJobIndexInfo().setSubmitTime(jobSubmittedEvent.getSubmitTime());
mi.getJobIndexInfo().setQueueName(jobSubmittedEvent.getJobQueueName());
}
// If this is JobFinishedEvent, close the writer and setup the job-index

View File

@ -55,6 +55,7 @@ public interface Job {
int getCompletedReduces();
boolean isUber();
String getUserName();
String getQueueName();
/**
* @return a path to where the config file for this job is located.

View File

@ -142,6 +142,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
private final EventHandler eventHandler;
private final MRAppMetrics metrics;
private final String userName;
private final String queueName;
private final long appSubmitTime;
private boolean lazyTasksCopyNeeded = false;
@ -375,6 +376,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
this.completedTasksFromPreviousRun = completedTasksFromPreviousRun;
this.amInfos = amInfos;
this.userName = userName;
this.queueName = conf.get(MRJobConfig.QUEUE_NAME, "default");
this.appSubmitTime = appSubmitTime;
this.oldJobId = TypeConverter.fromYarn(jobId);
this.newApiCommitter = newApiCommitter;
@ -766,6 +768,11 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
return userName;
}
@Override
public String getQueueName() {
return queueName;
}
/*
* (non-Javadoc)
* @see org.apache.hadoop.mapreduce.v2.app.job.Job#getConfFile()
@ -829,7 +836,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
job.conf.get(MRJobConfig.USER_NAME, "mapred"),
job.appSubmitTime,
job.remoteJobConfFile.toString(),
job.jobACLs, job.conf.get(MRJobConfig.QUEUE_NAME, "default"));
job.jobACLs, job.queueName);
job.eventHandler.handle(new JobHistoryEvent(job.jobId, jse));
//TODO JH Verify jobACLs, UserName via UGI?

View File

@ -492,6 +492,11 @@ public class MockJobs extends MockApps {
return "mock";
}
@Override
public String getQueueName() {
return "mockqueue";
}
@Override
public Path getConfFile() {
throw new UnsupportedOperationException("Not supported yet.");

View File

@ -439,6 +439,11 @@ public class TestRuntimeEstimators {
throw new UnsupportedOperationException("Not supported yet.");
}
@Override
public String getQueueName() {
throw new UnsupportedOperationException("Not supported yet.");
}
@Override
public int getTotalMaps() {
return mapTasks.size();

View File

@ -23,6 +23,9 @@ import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.net.URLEncoder;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
@ -35,6 +38,10 @@ public class FileNameIndexUtils {
static final String DELIMITER = "-";
static final String DELIMITER_ESCAPE = "%2D";
private static final Log LOG = LogFactory.getLog(FileNameIndexUtils.class);
// Job history file names need to be backwards compatible
// Only append new elements to the end of this list
private static final int JOB_ID_INDEX = 0;
private static final int SUBMIT_TIME_INDEX = 1;
private static final int USER_INDEX = 2;
@ -43,7 +50,7 @@ public class FileNameIndexUtils {
private static final int NUM_MAPS_INDEX = 5;
private static final int NUM_REDUCES_INDEX = 6;
private static final int JOB_STATUS_INDEX = 7;
private static final int MAX_INDEX = JOB_STATUS_INDEX;
private static final int QUEUE_NAME_INDEX = 8;
/**
* Constructs the job history file name from the JobIndexInfo.
@ -83,7 +90,11 @@ public class FileNameIndexUtils {
//JobStatus
sb.append(indexInfo.getJobStatus());
sb.append(DELIMITER);
//QueueName
sb.append(indexInfo.getQueueName());
sb.append(JobHistoryUtils.JOB_HISTORY_FILE_EXTENSION);
return encodeJobHistoryFileName(sb.toString());
}
@ -100,27 +111,60 @@ public class FileNameIndexUtils {
JobIndexInfo indexInfo = new JobIndexInfo();
String[] jobDetails = fileName.split(DELIMITER);
if (jobDetails.length != MAX_INDEX +1) {
throw new IOException("Failed to parse file: [" + jhFileName + "]. Expected " + (MAX_INDEX + 1) + "parts.");
}
JobID oldJobId = JobID.forName(decodeJobHistoryFileName(jobDetails[JOB_ID_INDEX]));
JobId jobId = TypeConverter.toYarn(oldJobId);
indexInfo.setJobId(jobId);
//TODO Catch NumberFormatException - Do not fail if there's only a few fields missing.
indexInfo.setSubmitTime(Long.parseLong(decodeJobHistoryFileName(jobDetails[SUBMIT_TIME_INDEX])));
indexInfo.setUser(decodeJobHistoryFileName(jobDetails[USER_INDEX]));
indexInfo.setJobName(decodeJobHistoryFileName(jobDetails[JOB_NAME_INDEX]));
indexInfo.setFinishTime(Long.parseLong(decodeJobHistoryFileName(jobDetails[FINISH_TIME_INDEX])));
indexInfo.setNumMaps(Integer.parseInt(decodeJobHistoryFileName(jobDetails[NUM_MAPS_INDEX])));
indexInfo.setNumReduces(Integer.parseInt(decodeJobHistoryFileName(jobDetails[NUM_REDUCES_INDEX])));
indexInfo.setJobStatus(decodeJobHistoryFileName(jobDetails[JOB_STATUS_INDEX]));
// Do not fail if there are some minor parse errors
try {
try {
indexInfo.setSubmitTime(
Long.parseLong(decodeJobHistoryFileName(jobDetails[SUBMIT_TIME_INDEX])));
} catch (NumberFormatException e) {
LOG.warn("Unable to parse submit time from job history file "
+ jhFileName + " : " + e);
}
indexInfo.setUser(
decodeJobHistoryFileName(jobDetails[USER_INDEX]));
indexInfo.setJobName(
decodeJobHistoryFileName(jobDetails[JOB_NAME_INDEX]));
try {
indexInfo.setFinishTime(
Long.parseLong(decodeJobHistoryFileName(jobDetails[FINISH_TIME_INDEX])));
} catch (NumberFormatException e) {
LOG.warn("Unable to parse finish time from job history file "
+ jhFileName + " : " + e);
}
try {
indexInfo.setNumMaps(
Integer.parseInt(decodeJobHistoryFileName(jobDetails[NUM_MAPS_INDEX])));
} catch (NumberFormatException e) {
LOG.warn("Unable to parse num maps from job history file "
+ jhFileName + " : " + e);
}
try {
indexInfo.setNumReduces(
Integer.parseInt(decodeJobHistoryFileName(jobDetails[NUM_REDUCES_INDEX])));
} catch (NumberFormatException e) {
LOG.warn("Unable to parse num reduces from job history file "
+ jhFileName + " : " + e);
}
indexInfo.setJobStatus(
decodeJobHistoryFileName(jobDetails[JOB_STATUS_INDEX]));
indexInfo.setQueueName(
decodeJobHistoryFileName(jobDetails[QUEUE_NAME_INDEX]));
} catch (IndexOutOfBoundsException e) {
LOG.warn("Parsing job history file with partial data encoded into name: "
+ jhFileName);
}
return indexInfo;
}

View File

@ -28,6 +28,7 @@ public class JobIndexInfo {
private long submitTime;
private long finishTime;
private String user;
private String queueName;
private String jobName;
private JobId jobId;
private int numMaps;
@ -67,6 +68,12 @@ public class JobIndexInfo {
public void setUser(String user) {
this.user = user;
}
public String getQueueName() {
return queueName;
}
public void setQueueName(String queueName) {
this.queueName = queueName;
}
public String getJobName() {
return jobName;
}

View File

@ -29,6 +29,16 @@ import org.junit.Test;
public class TestFileNameIndexUtils {
private static final String OLD_JOB_HISTORY_FILE_FORMATTER = "%s"
+ FileNameIndexUtils.DELIMITER + "%s"
+ FileNameIndexUtils.DELIMITER + "%s"
+ FileNameIndexUtils.DELIMITER + "%s"
+ FileNameIndexUtils.DELIMITER + "%s"
+ FileNameIndexUtils.DELIMITER + "%s"
+ FileNameIndexUtils.DELIMITER + "%s"
+ FileNameIndexUtils.DELIMITER + "%s"
+ JobHistoryUtils.JOB_HISTORY_FILE_EXTENSION;
private static final String JOB_HISTORY_FILE_FORMATTER = "%s"
+ FileNameIndexUtils.DELIMITER + "%s"
+ FileNameIndexUtils.DELIMITER + "%s"
@ -37,6 +47,7 @@ public class TestFileNameIndexUtils {
+ FileNameIndexUtils.DELIMITER + "%s"
+ FileNameIndexUtils.DELIMITER + "%s"
+ FileNameIndexUtils.DELIMITER + "%s"
+ FileNameIndexUtils.DELIMITER + "%s"
+ JobHistoryUtils.JOB_HISTORY_FILE_EXTENSION;
private static final String JOB_ID = "job_1317928501754_0001";
@ -55,9 +66,48 @@ public class TestFileNameIndexUtils {
private static final String NUM_MAPS = "1";
private static final String NUM_REDUCES = "1";
private static final String JOB_STATUS = "SUCCEEDED";
private static final String QUEUE_NAME = "default";
@Test
public void testUserNamePercentEncoding() throws IOException{
public void testEncodingDecodingEquivalence() throws IOException {
JobIndexInfo info = new JobIndexInfo();
JobID oldJobId = JobID.forName(JOB_ID);
JobId jobId = TypeConverter.toYarn(oldJobId);
info.setJobId(jobId);
info.setSubmitTime(Long.parseLong(SUBMIT_TIME));
info.setUser(USER_NAME);
info.setJobName(JOB_NAME);
info.setFinishTime(Long.parseLong(FINISH_TIME));
info.setNumMaps(Integer.parseInt(NUM_MAPS));
info.setNumReduces(Integer.parseInt(NUM_REDUCES));
info.setJobStatus(JOB_STATUS);
info.setQueueName(QUEUE_NAME);
String jobHistoryFile = FileNameIndexUtils.getDoneFileName(info);
JobIndexInfo parsedInfo = FileNameIndexUtils.getIndexInfo(jobHistoryFile);
Assert.assertEquals("Job id different after encoding and decoding",
info.getJobId(), parsedInfo.getJobId());
Assert.assertEquals("Submit time different after encoding and decoding",
info.getSubmitTime(), parsedInfo.getSubmitTime());
Assert.assertEquals("User different after encoding and decoding",
info.getUser(), parsedInfo.getUser());
Assert.assertEquals("Job name different after encoding and decoding",
info.getJobName(), parsedInfo.getJobName());
Assert.assertEquals("Finish time different after encoding and decoding",
info.getFinishTime(), parsedInfo.getFinishTime());
Assert.assertEquals("Num maps different after encoding and decoding",
info.getNumMaps(), parsedInfo.getNumMaps());
Assert.assertEquals("Num reduces different after encoding and decoding",
info.getNumReduces(), parsedInfo.getNumReduces());
Assert.assertEquals("Job status different after encoding and decoding",
info.getJobStatus(), parsedInfo.getJobStatus());
Assert.assertEquals("Queue name different after encoding and decoding",
info.getQueueName(), parsedInfo.getQueueName());
}
@Test
public void testUserNamePercentEncoding() throws IOException {
JobIndexInfo info = new JobIndexInfo();
JobID oldJobId = JobID.forName(JOB_ID);
JobId jobId = TypeConverter.toYarn(oldJobId);
@ -69,6 +119,7 @@ public class TestFileNameIndexUtils {
info.setNumMaps(Integer.parseInt(NUM_MAPS));
info.setNumReduces(Integer.parseInt(NUM_REDUCES));
info.setJobStatus(JOB_STATUS);
info.setQueueName(QUEUE_NAME);
String jobHistoryFile = FileNameIndexUtils.getDoneFileName(info);
Assert.assertTrue("User name not encoded correctly into job history file",
@ -85,7 +136,8 @@ public class TestFileNameIndexUtils {
FINISH_TIME,
NUM_MAPS,
NUM_REDUCES,
JOB_STATUS);
JOB_STATUS,
QUEUE_NAME);
JobIndexInfo info = FileNameIndexUtils.getIndexInfo(jobHistoryFile);
Assert.assertEquals("User name doesn't match",
@ -105,6 +157,7 @@ public class TestFileNameIndexUtils {
info.setNumMaps(Integer.parseInt(NUM_MAPS));
info.setNumReduces(Integer.parseInt(NUM_REDUCES));
info.setJobStatus(JOB_STATUS);
info.setQueueName(QUEUE_NAME);
String jobHistoryFile = FileNameIndexUtils.getDoneFileName(info);
Assert.assertTrue("Job name not encoded correctly into job history file",
@ -121,10 +174,52 @@ public class TestFileNameIndexUtils {
FINISH_TIME,
NUM_MAPS,
NUM_REDUCES,
JOB_STATUS);
JOB_STATUS,
QUEUE_NAME);
JobIndexInfo info = FileNameIndexUtils.getIndexInfo(jobHistoryFile);
Assert.assertEquals("Job name doesn't match",
JOB_NAME_WITH_DELIMITER, info.getJobName());
}
@Test
public void testJobHistoryFileNameBackwardsCompatible() throws IOException {
JobID oldJobId = JobID.forName(JOB_ID);
JobId jobId = TypeConverter.toYarn(oldJobId);
long submitTime = Long.parseLong(SUBMIT_TIME);
long finishTime = Long.parseLong(FINISH_TIME);
int numMaps = Integer.parseInt(NUM_MAPS);
int numReduces = Integer.parseInt(NUM_REDUCES);
String jobHistoryFile = String.format(OLD_JOB_HISTORY_FILE_FORMATTER,
JOB_ID,
SUBMIT_TIME,
USER_NAME,
JOB_NAME,
FINISH_TIME,
NUM_MAPS,
NUM_REDUCES,
JOB_STATUS);
JobIndexInfo info = FileNameIndexUtils.getIndexInfo(jobHistoryFile);
Assert.assertEquals("Job id incorrect after decoding old history file",
jobId, info.getJobId());
Assert.assertEquals("Submit time incorrect after decoding old history file",
submitTime, info.getSubmitTime());
Assert.assertEquals("User incorrect after decoding old history file",
USER_NAME, info.getUser());
Assert.assertEquals("Job name incorrect after decoding old history file",
JOB_NAME, info.getJobName());
Assert.assertEquals("Finish time incorrect after decoding old history file",
finishTime, info.getFinishTime());
Assert.assertEquals("Num maps incorrect after decoding old history file",
numMaps, info.getNumMaps());
Assert.assertEquals("Num reduces incorrect after decoding old history file",
numReduces, info.getNumReduces());
Assert.assertEquals("Job status incorrect after decoding old history file",
JOB_STATUS, info.getJobStatus());
Assert.assertNull("Queue name incorrect after decoding old history file",
info.getQueueName());
}
}

View File

@ -283,6 +283,11 @@ public class CompletedJob implements org.apache.hadoop.mapreduce.v2.app.job.Job
return jobInfo.getJobname();
}
@Override
public String getQueueName() {
return jobInfo.getJobQueueName();
}
@Override
public int getTotalMaps() {
return (int) jobInfo.getTotalMaps();

View File

@ -64,6 +64,11 @@ public class PartialJob implements org.apache.hadoop.mapreduce.v2.app.job.Job {
return jobIndexInfo.getJobName();
}
@Override
public String getQueueName() {
return jobIndexInfo.getQueueName();
}
@Override
public JobState getState() {
JobState js = null;

View File

@ -104,6 +104,7 @@ public class HsJobBlock extends HtmlBlock {
ResponseInfo infoBlock = info("Job Overview").
_("Job Name:", job.getName()).
_("User Name:", job.getUserName()).
_("Queue:", job.getQueueName()).
_("State:", job.getState()).
_("Uberized:", job.isUber()).
_("Started:", new Date(startTime)).

View File

@ -60,6 +60,7 @@ public class HsJobsBlock extends HtmlBlock {
th(".id", "Job ID").
th(".name", "Name").
th("User").
th("Queue").
th(".state", "State").
th("Maps Total").
th("Maps Completed").
@ -83,6 +84,7 @@ public class HsJobsBlock extends HtmlBlock {
td().a(url("job", jobID), jobID)._().
td(job.getName().toString()).
td(job.getUserName()).
td(job.getQueueName()).
td(job.getState().toString()).
td(mapsTotal).
td(mapsCompleted).
@ -97,6 +99,7 @@ public class HsJobsBlock extends HtmlBlock {
th().input("search_init").$type(InputType.text).$name("start_time").$value("Job ID")._()._().
th().input("search_init").$type(InputType.text).$name("start_time").$value("Name")._()._().
th().input("search_init").$type(InputType.text).$name("start_time").$value("User")._()._().
th().input("search_init").$type(InputType.text).$name("start_time").$value("Queue")._()._().
th().input("search_init").$type(InputType.text).$name("start_time").$value("State")._()._().
th().input("search_init").$type(InputType.text).$name("start_time").$value("Maps Total")._()._().
th().input("search_init").$type(InputType.text).$name("start_time").$value("Maps Completed")._()._().