MAPREDUCE-6652. Add configuration property to prevent JHS from loading jobs with a task count greater than X (haibochen via rkanter)

This commit is contained in:
Robert Kanter 2016-07-15 13:30:03 -07:00
parent c48e9d608c
commit 0881ed3fc3
7 changed files with 634 additions and 10 deletions

View File

@ -253,4 +253,11 @@ public class JHAdminConfig {
MR_HISTORY_PREFIX + "jhist.format"; MR_HISTORY_PREFIX + "jhist.format";
public static final String DEFAULT_MR_HS_JHIST_FORMAT = public static final String DEFAULT_MR_HS_JHIST_FORMAT =
"binary"; "binary";
/**
* The maximum number of tasks for a job to be loaded in Job History Server.
*/
public static final String MR_HS_LOADED_JOBS_TASKS_MAX =
MR_HISTORY_PREFIX + "loadedjob.tasks.max";
public static final int DEFAULT_MR_HS_LOADED_JOBS_TASKS_MAX = -1;
} }

View File

@ -1923,4 +1923,13 @@
<value>SAMEORIGIN</value> <value>SAMEORIGIN</value>
</property> </property>
<property>
<description>
The maximum number of tasks that a job can have so that the Job History
Server will fully parse its associated job history file and load it into
memory. A value of -1 (default) will allow all jobs to be loaded.
</description>
<name>mapreduce.jobhistory.loadedjob.tasks.max</name>
<value>-1</value>
</property>
</configuration> </configuration>

View File

@ -458,16 +458,24 @@ public class HistoryFileManager extends AbstractService {
/** /**
* Parse a job from the JobHistoryFile, if the underlying file is not going * Parse a job from the JobHistoryFile, if the underlying file is not going
* to be deleted. * to be deleted and the number of tasks associated with the job is not
* greater than maxTasksForLoadedJob.
* *
* @return the Job or null if the underlying file was deleted. * @return null if the underlying job history file was deleted, or
* an {@link UnparsedJob} object representing a partially parsed job
* if the job tasks exceeds the configured maximum, or
* a {@link CompletedJob} representing a fully parsed job.
* @throws IOException * @throws IOException
* if there is an error trying to read the file. * if there is an error trying to read the file if parsed.
*/ */
public synchronized Job loadJob() throws IOException { public synchronized Job loadJob() throws IOException {
if(isOversized()) {
return new UnparsedJob(maxTasksForLoadedJob, jobIndexInfo, this);
} else {
return new CompletedJob(conf, jobIndexInfo.getJobId(), historyFile, return new CompletedJob(conf, jobIndexInfo.getJobId(), historyFile,
false, jobIndexInfo.getUser(), this, aclsMgr); false, jobIndexInfo.getUser(), this, aclsMgr);
} }
}
/** /**
* Return the history file. * Return the history file.
@ -504,6 +512,12 @@ public class HistoryFileManager extends AbstractService {
jobConf.addResource(fc.open(confFile), confFile.toString()); jobConf.addResource(fc.open(confFile), confFile.toString());
return jobConf; return jobConf;
} }
private boolean isOversized() {
final int totalTasks = jobIndexInfo.getNumReduces() +
jobIndexInfo.getNumMaps();
return (maxTasksForLoadedJob > 0) && (totalTasks > maxTasksForLoadedJob);
}
} }
private SerialNumberIndex serialNumberIndex = null; private SerialNumberIndex serialNumberIndex = null;
@ -537,6 +551,11 @@ public class HistoryFileManager extends AbstractService {
protected ThreadPoolExecutor moveToDoneExecutor = null; protected ThreadPoolExecutor moveToDoneExecutor = null;
private long maxHistoryAge = 0; private long maxHistoryAge = 0;
/**
* The maximum number of tasks allowed for a job to be loaded.
*/
private int maxTasksForLoadedJob = -1;
public HistoryFileManager() { public HistoryFileManager() {
super(HistoryFileManager.class.getName()); super(HistoryFileManager.class.getName());
} }
@ -555,6 +574,10 @@ public class HistoryFileManager extends AbstractService {
JHAdminConfig.DEFAULT_MR_HISTORY_MAX_START_WAIT_TIME); JHAdminConfig.DEFAULT_MR_HISTORY_MAX_START_WAIT_TIME);
createHistoryDirs(SystemClock.getInstance(), 10 * 1000, maxFSWaitTime); createHistoryDirs(SystemClock.getInstance(), 10 * 1000, maxFSWaitTime);
maxTasksForLoadedJob = conf.getInt(
JHAdminConfig.MR_HS_LOADED_JOBS_TASKS_MAX,
JHAdminConfig.DEFAULT_MR_HS_LOADED_JOBS_TASKS_MAX);
this.aclsMgr = new JobACLsManager(conf); this.aclsMgr = new JobACLsManager(conf);
maxHistoryAge = conf.getLong(JHAdminConfig.MR_HISTORY_MAX_AGE_MS, maxHistoryAge = conf.getLong(JHAdminConfig.MR_HISTORY_MAX_AGE_MS,

View File

@ -0,0 +1,211 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2.hs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.TaskCompletionEvent;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.JobACL;
import org.apache.hadoop.mapreduce.v2.api.records.*;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.util.Records;
import java.io.IOException;
import java.util.*;
/**
* A job that has too many tasks associated with it, of which we do not parse
* its job history file, to prevent the Job History Server from hanging on
* parsing the file. It is meant to be used only by JHS to indicate if the
* history file of a job is fully parsed or not.
*/
public class UnparsedJob implements org.apache.hadoop.mapreduce.v2.app.job.Job {
private final JobIndexInfo jobIndexInfo;
private final int maxTasksAllowed;
private JobReport jobReport;
private final HistoryFileManager.HistoryFileInfo jhfInfo;
public UnparsedJob(int maxTasksAllowed, JobIndexInfo jobIndexInfo,
HistoryFileManager.HistoryFileInfo jhfInfo) throws IOException {
this.jobIndexInfo = jobIndexInfo;
this.jhfInfo = jhfInfo;
this.maxTasksAllowed = maxTasksAllowed;
}
public int getMaxTasksAllowed() {
return maxTasksAllowed;
}
@Override
public JobId getID() {
return jobIndexInfo.getJobId();
}
@Override
public String getName() {
return jobIndexInfo.getJobName();
}
@Override
public JobState getState() {
return JobState.valueOf(jobIndexInfo.getJobStatus());
}
@Override
public synchronized JobReport getReport() {
if(jobReport == null) {
jobReport = constructJobReport();
}
return jobReport;
}
public JobReport constructJobReport() {
JobReport report = Records.newRecord(JobReport.class);
report.setJobId(getID());
report.setJobState(getState());
report.setSubmitTime(jobIndexInfo.getSubmitTime());
report.setStartTime(jobIndexInfo.getJobStartTime());
report.setFinishTime(jobIndexInfo.getFinishTime());
report.setJobName(jobIndexInfo.getJobName());
report.setUser(jobIndexInfo.getUser());
report.setJobFile(getConfFile().toString());
report.setHistoryFile(jhfInfo.getHistoryFile().toString());
return report;
}
@Override
public Counters getAllCounters() {
return new Counters();
}
@Override
public Map<TaskId, Task> getTasks() {
return new HashMap<>();
}
@Override
public Map<TaskId, Task> getTasks(TaskType taskType) {
return new HashMap<>();
}
@Override
public Task getTask(TaskId taskID) {
return null;
}
@Override
public List<String> getDiagnostics() {
return new ArrayList<>();
}
@Override
public int getTotalMaps() {
return jobIndexInfo.getNumMaps();
}
@Override
public int getTotalReduces() {
return jobIndexInfo.getNumReduces();
}
@Override
public int getCompletedMaps() {
return -1;
}
@Override
public int getCompletedReduces() {
return -1;
}
@Override
public float getProgress() {
return 1.0f;
}
@Override
public boolean isUber() {
return false;
}
@Override
public String getUserName() {
return jobIndexInfo.getUser();
}
@Override
public String getQueueName() {
return jobIndexInfo.getQueueName();
}
@Override
public Path getConfFile() {
return jhfInfo.getConfFile();
}
@Override
public Configuration loadConfFile() throws IOException {
return jhfInfo.loadConfFile();
}
@Override
public Map<JobACL, AccessControlList> getJobACLs() {
return new HashMap<>();
}
@Override
public TaskAttemptCompletionEvent[] getTaskAttemptCompletionEvents(
int fromEventId, int maxEvents) {
return new TaskAttemptCompletionEvent[0];
}
@Override
public TaskCompletionEvent[] getMapAttemptCompletionEvents(
int startIndex, int maxEvents) {
return new TaskCompletionEvent[0];
}
@Override
public List<AMInfo> getAMInfos() {
return new ArrayList<>();
}
@Override
public boolean checkAccess(UserGroupInformation callerUGI,
JobACL jobOperation) {
return true;
}
@Override
public void setQueueName(String queueName) {
throw new UnsupportedOperationException("Can't set job's " +
"queue name in history");
}
@Override
public void setJobPriority(Priority priority) {
throw new UnsupportedOperationException(
"Can't set job's priority in history");
}
}

View File

@ -33,8 +33,10 @@ import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.app.AppContext; import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.job.Job; import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.webapp.dao.ConfEntryInfo; import org.apache.hadoop.mapreduce.v2.app.webapp.dao.ConfEntryInfo;
import org.apache.hadoop.mapreduce.v2.hs.UnparsedJob;
import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.AMAttemptInfo; import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.AMAttemptInfo;
import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.JobInfo; import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.JobInfo;
import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
import org.apache.hadoop.mapreduce.v2.util.MRApps; import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.mapreduce.v2.util.MRApps.TaskAttemptStateUI; import org.apache.hadoop.mapreduce.v2.util.MRApps.TaskAttemptStateUI;
import org.apache.hadoop.mapreduce.v2.util.MRWebAppUtil; import org.apache.hadoop.mapreduce.v2.util.MRWebAppUtil;
@ -73,8 +75,18 @@ public class HsJobBlock extends HtmlBlock {
JobId jobID = MRApps.toJobID(jid); JobId jobID = MRApps.toJobID(jid);
Job j = appContext.getJob(jobID); Job j = appContext.getJob(jobID);
if (j == null) { if (j == null) {
html. html.p()._("Sorry, ", jid, " not found.")._();
p()._("Sorry, ", jid, " not found.")._(); return;
}
if(j instanceof UnparsedJob) {
final int taskCount = j.getTotalMaps() + j.getTotalReduces();
UnparsedJob oversizedJob = (UnparsedJob) j;
html.p()._("The job has a total of " + taskCount + " tasks. ")
._("Any job larger than " + oversizedJob.getMaxTasksAllowed() +
" will not be loaded.")._();
html.p()._("You can either use the CLI tool: 'mapred job -history'"
+ " to view large jobs or adjust the property " +
JHAdminConfig.MR_HS_LOADED_JOBS_TASKS_MAX + ".")._();
return; return;
} }
List<AMInfo> amInfos = j.getAMInfos(); List<AMInfo> amInfos = j.getAMInfos();

View File

@ -25,6 +25,7 @@ import java.io.FileNotFoundException;
import java.util.UUID; import java.util.UUID;
import java.util.List; import java.util.List;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.junit.Assert; import org.junit.Assert;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
@ -37,11 +38,9 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.mapreduce.JobID; import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.TypeConverter; import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.hs.HistoryFileManager.HistoryFileInfo; import org.apache.hadoop.mapreduce.v2.hs.HistoryFileManager.HistoryFileInfo;
import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig; import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo; import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo;
import org.apache.hadoop.test.CoreTestDriver;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.Clock;
@ -86,6 +85,10 @@ public class TestHistoryFileManager {
@After @After
public void cleanTest() throws Exception { public void cleanTest() throws Exception {
new File(coreSitePath).delete(); new File(coreSitePath).delete();
dfsCluster.getFileSystem().setSafeMode(
HdfsConstants.SafeModeAction.SAFEMODE_LEAVE);
dfsCluster2.getFileSystem().setSafeMode(
HdfsConstants.SafeModeAction.SAFEMODE_LEAVE);
} }
private String getDoneDirNameForTest() { private String getDoneDirNameForTest() {
@ -247,6 +250,97 @@ public class TestHistoryFileManager {
Assert.assertFalse(info.didMoveFail()); Assert.assertFalse(info.didMoveFail());
} }
@Test
public void testHistoryFileInfoLoadOversizedJobShouldReturnUnParsedJob()
throws Exception {
HistoryFileManagerTest hmTest = new HistoryFileManagerTest();
int allowedMaximumTasks = 5;
Configuration conf = dfsCluster.getConfiguration(0);
conf.setInt(JHAdminConfig.MR_HS_LOADED_JOBS_TASKS_MAX, allowedMaximumTasks);
hmTest.init(conf);
// set up a job of which the number of tasks is greater than maximum allowed
String jobId = "job_1410889000000_123456";
JobIndexInfo jobIndexInfo = new JobIndexInfo();
jobIndexInfo.setJobId(TypeConverter.toYarn(JobID.forName(jobId)));
jobIndexInfo.setNumMaps(allowedMaximumTasks);
jobIndexInfo.setNumReduces(allowedMaximumTasks);
HistoryFileInfo info = hmTest.getHistoryFileInfo(null, null, null,
jobIndexInfo, false);
Job job = info.loadJob();
Assert.assertTrue("Should return an instance of UnparsedJob to indicate" +
" the job history file is not parsed", job instanceof UnparsedJob);
}
@Test
public void testHistoryFileInfoLoadNormalSizedJobShouldReturnCompletedJob()
throws Exception {
HistoryFileManagerTest hmTest = new HistoryFileManagerTest();
final int numOfTasks = 100;
Configuration conf = dfsCluster.getConfiguration(0);
conf.setInt(JHAdminConfig.MR_HS_LOADED_JOBS_TASKS_MAX,
numOfTasks + numOfTasks + 1);
hmTest.init(conf);
// set up a job of which the number of tasks is smaller than the maximum
// allowed, and therefore will be fully loaded.
final String jobId = "job_1416424547277_0002";
JobIndexInfo jobIndexInfo = new JobIndexInfo();
jobIndexInfo.setJobId(TypeConverter.toYarn(JobID.forName(jobId)));
jobIndexInfo.setNumMaps(numOfTasks);
jobIndexInfo.setNumReduces(numOfTasks);
final String historyFile = getClass().getClassLoader().getResource(
"job_2.0.3-alpha-FAILED.jhist").getFile();
final Path historyFilePath = FileSystem.getLocal(conf).makeQualified(
new Path(historyFile));
HistoryFileInfo info = hmTest.getHistoryFileInfo(historyFilePath, null,
null, jobIndexInfo, false);
Job job = info.loadJob();
Assert.assertTrue("Should return an instance of CompletedJob as " +
"a result of parsing the job history file of the job",
job instanceof CompletedJob);
}
@Test
public void testHistoryFileInfoShouldReturnCompletedJobIfMaxNotConfiged()
throws Exception {
HistoryFileManagerTest hmTest = new HistoryFileManagerTest();
Configuration conf = dfsCluster.getConfiguration(0);
conf.setInt(JHAdminConfig.MR_HS_LOADED_JOBS_TASKS_MAX, -1);
hmTest.init(conf);
final String jobId = "job_1416424547277_0002";
JobIndexInfo jobIndexInfo = new JobIndexInfo();
jobIndexInfo.setJobId(TypeConverter.toYarn(JobID.forName(jobId)));
jobIndexInfo.setNumMaps(100);
jobIndexInfo.setNumReduces(100);
final String historyFile = getClass().getClassLoader().getResource(
"job_2.0.3-alpha-FAILED.jhist").getFile();
final Path historyFilePath = FileSystem.getLocal(conf).makeQualified(
new Path(historyFile));
HistoryFileInfo info = hmTest.getHistoryFileInfo(historyFilePath, null,
null, jobIndexInfo, false);
Job job = info.loadJob();
Assert.assertTrue("Should return an instance of CompletedJob as " +
"a result of parsing the job history file of the job",
job instanceof CompletedJob);
}
static class HistoryFileManagerTest extends HistoryFileManager { static class HistoryFileManagerTest extends HistoryFileManager {
public HistoryFileManagerTest() { public HistoryFileManagerTest() {
super(); super();

View File

@ -0,0 +1,268 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2.hs.webapp;
import org.apache.commons.io.output.ByteArrayOutputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.v2.api.records.*;
import org.apache.hadoop.mapreduce.v2.api.records.impl.pb.JobIdPBImpl;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.webapp.AMParams;
import org.apache.hadoop.mapreduce.v2.hs.CompletedJob;
import org.apache.hadoop.mapreduce.v2.hs.HistoryFileManager;
import org.apache.hadoop.mapreduce.v2.hs.HistoryFileManager.HistoryFileInfo;
import org.apache.hadoop.mapreduce.v2.hs.JobHistory;
import org.apache.hadoop.mapreduce.v2.hs.UnparsedJob;
import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.util.StringHelper;
import org.apache.hadoop.yarn.webapp.ResponseInfo;
import org.apache.hadoop.yarn.webapp.SubView;
import org.apache.hadoop.yarn.webapp.view.BlockForTest;
import org.apache.hadoop.yarn.webapp.view.HtmlBlock;
import org.apache.hadoop.yarn.webapp.view.HtmlBlockForTest;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
/**
* Test the HsJobBlock generated for oversized jobs in JHS.
*/
public class TestHsJobBlock {
@Test
public void testHsJobBlockForOversizeJobShouldDisplayWarningMessage() {
int maxAllowedTaskNum = 100;
Configuration config = new Configuration();
config.setInt(JHAdminConfig.MR_HS_LOADED_JOBS_TASKS_MAX, maxAllowedTaskNum);
JobHistory jobHistory =
new JobHistoryStubWithAllOversizeJobs(maxAllowedTaskNum);
jobHistory.init(config);
HsJobBlock jobBlock = new HsJobBlock(jobHistory) {
// override this so that job block can fetch a job id.
@Override
public Map<String, String> moreParams() {
Map<String, String> map = new HashMap<>();
map.put(AMParams.JOB_ID, "job_0000_0001");
return map;
}
};
// set up the test block to render HsJobBLock to
OutputStream outputStream = new ByteArrayOutputStream();
HtmlBlock.Block block = createBlockToCreateTo(outputStream);
jobBlock.render(block);
block.getWriter().flush();
String out = outputStream.toString();
Assert.assertTrue("Should display warning message for jobs that have too " +
"many tasks", out.contains("Any job larger than " + maxAllowedTaskNum +
" will not be loaded"));
}
@Test
public void testHsJobBlockForNormalSizeJobShouldNotDisplayWarningMessage() {
Configuration config = new Configuration();
config.setInt(JHAdminConfig.MR_HS_LOADED_JOBS_TASKS_MAX, -1);
JobHistory jobHistory = new JobHitoryStubWithAllNormalSizeJobs();
jobHistory.init(config);
HsJobBlock jobBlock = new HsJobBlock(jobHistory) {
// override this so that the job block can fetch a job id.
@Override
public Map<String, String> moreParams() {
Map<String, String> map = new HashMap<>();
map.put(AMParams.JOB_ID, "job_0000_0001");
return map;
}
// override this to avoid view context lookup in render()
@Override
public ResponseInfo info(String about) {
return new ResponseInfo().about(about);
}
// override this to avoid view context lookup in render()
@Override
public String url(String... parts) {
return StringHelper.ujoin("", parts);
}
};
// set up the test block to render HsJobBLock to
OutputStream outputStream = new ByteArrayOutputStream();
HtmlBlock.Block block = createBlockToCreateTo(outputStream);
jobBlock.render(block);
block.getWriter().flush();
String out = outputStream.toString();
Assert.assertTrue("Should display job overview for the job.",
out.contains("ApplicationMaster"));
}
private static HtmlBlock.Block createBlockToCreateTo(
OutputStream outputStream) {
PrintWriter printWriter = new PrintWriter(outputStream);
HtmlBlock html = new HtmlBlockForTest();
return new BlockForTest(html, printWriter, 10, false) {
@Override
protected void subView(Class<? extends SubView> cls) {
}
};
};
/**
* A JobHistory stub that treat all jobs as oversized and therefore will
* not parse their job history files but return a UnparseJob instance.
*/
static class JobHistoryStubWithAllOversizeJobs extends JobHistory {
private final int maxAllowedTaskNum;
public JobHistoryStubWithAllOversizeJobs(int maxAllowedTaskNum) {
this.maxAllowedTaskNum = maxAllowedTaskNum;
}
@Override
protected HistoryFileManager createHistoryFileManager() {
HistoryFileManager historyFileManager;
try {
HistoryFileInfo historyFileInfo =
createUnparsedJobHistoryFileInfo(maxAllowedTaskNum);
historyFileManager = mock(HistoryFileManager.class);
when(historyFileManager.getFileInfo(any(JobId.class))).thenReturn(
historyFileInfo);
} catch (IOException ex) {
// this should never happen
historyFileManager = super.createHistoryFileManager();
}
return historyFileManager;
}
private static HistoryFileInfo createUnparsedJobHistoryFileInfo(
int maxAllowedTaskNum) throws IOException {
HistoryFileInfo fileInfo = mock(HistoryFileInfo.class);
// create an instance of UnparsedJob for a large job
UnparsedJob unparsedJob = mock(UnparsedJob.class);
when(unparsedJob.getMaxTasksAllowed()).thenReturn(maxAllowedTaskNum);
when(unparsedJob.getTotalMaps()).thenReturn(maxAllowedTaskNum);
when(unparsedJob.getTotalReduces()).thenReturn(maxAllowedTaskNum);
when(fileInfo.loadJob()).thenReturn(unparsedJob);
return fileInfo;
}
}
/**
* A JobHistory stub that treats all jobs as normal size and therefore will
* return a CompletedJob on HistoryFileInfo.loadJob().
*/
static class JobHitoryStubWithAllNormalSizeJobs extends JobHistory {
@Override
public HistoryFileManager createHistoryFileManager() {
HistoryFileManager historyFileManager;
try {
HistoryFileInfo historyFileInfo = createParsedJobHistoryFileInfo();
historyFileManager = mock(HistoryFileManager.class);
when(historyFileManager.getFileInfo(any(JobId.class))).thenReturn(
historyFileInfo);
} catch (IOException ex) {
// this should never happen
historyFileManager = super.createHistoryFileManager();
}
return historyFileManager;
}
private static HistoryFileInfo createParsedJobHistoryFileInfo()
throws IOException {
HistoryFileInfo fileInfo = mock(HistoryFileInfo.class);
CompletedJob job = createFakeCompletedJob();
when(fileInfo.loadJob()).thenReturn(job);
return fileInfo;
}
private static CompletedJob createFakeCompletedJob() {
CompletedJob job = mock(CompletedJob.class);
when(job.getTotalMaps()).thenReturn(0);
when(job.getCompletedMaps()).thenReturn(0);
when(job.getTotalReduces()).thenReturn(0);
when(job.getCompletedReduces()).thenReturn(0);
JobId jobId = createFakeJobId();
when(job.getID()).thenReturn(jobId);
JobReport jobReport = mock(JobReport.class);
when(jobReport.getSubmitTime()).thenReturn(-1L);
when(jobReport.getStartTime()).thenReturn(-1L);
when(jobReport.getFinishTime()).thenReturn(-1L);
when(job.getReport()).thenReturn(jobReport);
when(job.getAMInfos()).thenReturn(new ArrayList<AMInfo>());
when(job.getDiagnostics()).thenReturn(new ArrayList<String>());
when(job.getName()).thenReturn("fake completed job");
when(job.getQueueName()).thenReturn("default");
when(job.getUserName()).thenReturn("junit");
when(job.getState()).thenReturn(JobState.ERROR);
when(job.getAllCounters()).thenReturn(new Counters());
when(job.getTasks()).thenReturn(new HashMap<TaskId, Task>());
return job;
}
private static JobId createFakeJobId() {
JobId jobId = new JobIdPBImpl();
jobId.setId(0);
ApplicationId appId = mock(ApplicationId.class);
when(appId.getClusterTimestamp()).thenReturn(0L);
when(appId.getId()).thenReturn(0);
jobId.setAppId(appId);
return jobId;
}
}
}