From 233ffaf3f7b6dae2e26e6ba3220bd3eba9caed84 Mon Sep 17 00:00:00 2001
From: Robert Kanter <rkanter@apache.org>
Date: Fri, 15 Jul 2016 13:37:04 -0700
Subject: [PATCH] MAPREDUCE-6652. Add configuration property to prevent JHS
 from loading jobs with a task count greater than X (haibochen via rkanter)

---
 .../v2/jobhistory/JHAdminConfig.java          |   7 +
 .../src/main/resources/mapred-default.xml     |   9 +
 .../mapreduce/v2/hs/HistoryFileManager.java   |  35 ++-
 .../hadoop/mapreduce/v2/hs/UnparsedJob.java   | 211 ++++++++++++++
 .../mapreduce/v2/hs/webapp/HsJobBlock.java    |  16 +-
 .../v2/hs/TestHistoryFileManager.java         |  98 ++++++-
 .../v2/hs/webapp/TestHsJobBlock.java          | 268 ++++++++++++++++++
 7 files changed, 634 insertions(+), 10 deletions(-)
 create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/UnparsedJob.java
 create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsJobBlock.java

diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHAdminConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHAdminConfig.java
index dd225e26560..644727f97b4 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHAdminConfig.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHAdminConfig.java
@@ -253,4 +253,11 @@ public class JHAdminConfig {
       MR_HISTORY_PREFIX + "jhist.format";
   public static final String DEFAULT_MR_HS_JHIST_FORMAT =
       "json";
+
+  /**
+   * The maximum number of tasks for a job to be loaded in Job History Server.
+   */
+  public static final String MR_HS_LOADED_JOBS_TASKS_MAX =
+      MR_HISTORY_PREFIX + "loadedjob.tasks.max";
+  public static final int DEFAULT_MR_HS_LOADED_JOBS_TASKS_MAX = -1;
 }
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
index fab5b252310..a37d312b6b8 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
@@ -1889,4 +1889,13 @@
     SAMEORIGIN
   </value>
 </property>
+
+<property>
+  <description>The maximum number of tasks that a job can have so that the
+    Job History Server will fully parse its associated job history file and
+    load it into memory. A value of -1 (default) will allow all jobs to be
+    loaded.</description>
+  <name>mapreduce.jobhistory.loadedjob.tasks.max</name>
+  <value>-1</value>
+</property>
 </configuration>
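
For illustration only (this note and sketch are not part of the patch): the new property is read through the ordinary Hadoop Configuration machinery, so the cap can also be set programmatically in tests or tooling. A minimal sketch using only the constants added above; the value 500 is an arbitrary example:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;

    public class LoadedJobTasksMaxExample {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Jobs with more than 500 total tasks will not be fully parsed.
        conf.setInt(JHAdminConfig.MR_HS_LOADED_JOBS_TASKS_MAX, 500);
        // The shipped default (-1) disables the limit entirely.
        int cap = conf.getInt(JHAdminConfig.MR_HS_LOADED_JOBS_TASKS_MAX,
            JHAdminConfig.DEFAULT_MR_HS_LOADED_JOBS_TASKS_MAX);
        System.out.println("Effective task cap: " + cap);
      }
    }
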
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java
index 7bfb11c97b8..814afe4b93c 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java
@@ -458,15 +458,23 @@ synchronized void moveToDone() throws IOException {
 
     /**
      * Parse a job from the JobHistoryFile, if the underlying file is not going
-     * to be deleted.
+     * to be deleted and the number of tasks associated with the job is not
+     * greater than maxTasksForLoadedJob.
      *
-     * @return the Job or null if the underlying file was deleted.
+     * @return null if the underlying job history file was deleted, or
+     *         an {@link UnparsedJob} object representing a partially parsed
+     *         job if the job's task count exceeds the configured maximum, or
+     *         a {@link CompletedJob} representing a fully parsed job.
      * @throws IOException
-     *           if there is an error trying to read the file.
+     *           if there is an error trying to read the file, when it is parsed.
      */
     public synchronized Job loadJob() throws IOException {
-      return new CompletedJob(conf, jobIndexInfo.getJobId(), historyFile,
-          false, jobIndexInfo.getUser(), this, aclsMgr);
+      if (isOversized()) {
+        return new UnparsedJob(maxTasksForLoadedJob, jobIndexInfo, this);
+      } else {
+        return new CompletedJob(conf, jobIndexInfo.getJobId(), historyFile,
+            false, jobIndexInfo.getUser(), this, aclsMgr);
+      }
     }
 
     /**
@@ -504,6 +512,12 @@ public synchronized Configuration loadConfFile() throws IOException {
       jobConf.addResource(fc.open(confFile), confFile.toString());
       return jobConf;
     }
+
+    private boolean isOversized() {
+      final int totalTasks = jobIndexInfo.getNumReduces() +
+          jobIndexInfo.getNumMaps();
+      return (maxTasksForLoadedJob > 0) && (totalTasks > maxTasksForLoadedJob);
+    }
   }
 
   private SerialNumberIndex serialNumberIndex = null;
@@ -536,7 +550,12 @@ public synchronized Configuration loadConfFile() throws IOException {
   @VisibleForTesting
   protected ThreadPoolExecutor moveToDoneExecutor = null;
   private long maxHistoryAge = 0;
-
+
+  /**
+   * The maximum number of tasks allowed for a job to be loaded.
+   */
+  private int maxTasksForLoadedJob = -1;
+
   public HistoryFileManager() {
     super(HistoryFileManager.class.getName());
   }
@@ -555,6 +574,10 @@ protected void serviceInit(Configuration conf) throws Exception {
         JHAdminConfig.DEFAULT_MR_HISTORY_MAX_START_WAIT_TIME);
     createHistoryDirs(SystemClock.getInstance(), 10 * 1000, maxFSWaitTime);
 
+    maxTasksForLoadedJob = conf.getInt(
+        JHAdminConfig.MR_HS_LOADED_JOBS_TASKS_MAX,
+        JHAdminConfig.DEFAULT_MR_HS_LOADED_JOBS_TASKS_MAX);
+
     this.aclsMgr = new JobACLsManager(conf);
 
     maxHistoryAge = conf.getLong(JHAdminConfig.MR_HISTORY_MAX_AGE_MS,
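
To make the threshold semantics above concrete, here is a standalone sketch of the same check (illustration only, not patch code): the cap applies only when it is positive, and the comparison is strict.

    // Mirrors HistoryFileInfo.isOversized(), extracted for illustration.
    static boolean isOversized(int numMaps, int numReduces, int maxTasks) {
      final int totalTasks = numReduces + numMaps;
      return (maxTasks > 0) && (totalTasks > maxTasks);
    }

    // isOversized(3, 2, 5)  -> false: 5 tasks with a cap of 5 still load fully
    // isOversized(3, 3, 5)  -> true:  6 tasks exceed the cap
    // isOversized(3, 3, -1) -> false: a non-positive cap disables the check
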
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/UnparsedJob.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/UnparsedJob.java
new file mode 100644
index 00000000000..cea336c5cb0
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/UnparsedJob.java
@@ -0,0 +1,211 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.hs;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.TaskCompletionEvent;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.JobACL;
+import org.apache.hadoop.mapreduce.v2.api.records.*;
+import org.apache.hadoop.mapreduce.v2.app.job.Task;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authorize.AccessControlList;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.util.Records;
+
+import java.io.IOException;
+import java.util.*;
+
+/**
+ * A job whose history file is not parsed because the job has more tasks
+ * than the configured maximum, which prevents the Job History Server from
+ * hanging on parsing the file. It is meant to be used only by the JHS to
+ * indicate that the history file of a job has not been fully parsed.
+ */
+public class UnparsedJob implements
+    org.apache.hadoop.mapreduce.v2.app.job.Job {
+  private final JobIndexInfo jobIndexInfo;
+  private final int maxTasksAllowed;
+  private JobReport jobReport;
+  private final HistoryFileManager.HistoryFileInfo jhfInfo;
+
+  public UnparsedJob(int maxTasksAllowed, JobIndexInfo jobIndexInfo,
+      HistoryFileManager.HistoryFileInfo jhfInfo) throws IOException {
+    this.jobIndexInfo = jobIndexInfo;
+    this.jhfInfo = jhfInfo;
+    this.maxTasksAllowed = maxTasksAllowed;
+  }
+
+  public int getMaxTasksAllowed() {
+    return maxTasksAllowed;
+  }
+
+  @Override
+  public JobId getID() {
+    return jobIndexInfo.getJobId();
+  }
+
+  @Override
+  public String getName() {
+    return jobIndexInfo.getJobName();
+  }
+
+  @Override
+  public JobState getState() {
+    return JobState.valueOf(jobIndexInfo.getJobStatus());
+  }
+
+  @Override
+  public synchronized JobReport getReport() {
+    if (jobReport == null) {
+      jobReport = constructJobReport();
+    }
+    return jobReport;
+  }
+
+  public JobReport constructJobReport() {
+    JobReport report = Records.newRecord(JobReport.class);
+    report.setJobId(getID());
+    report.setJobState(getState());
+    report.setSubmitTime(jobIndexInfo.getSubmitTime());
+    report.setStartTime(jobIndexInfo.getJobStartTime());
+    report.setFinishTime(jobIndexInfo.getFinishTime());
+    report.setJobName(jobIndexInfo.getJobName());
+    report.setUser(jobIndexInfo.getUser());
+    report.setJobFile(getConfFile().toString());
+    report.setHistoryFile(jhfInfo.getHistoryFile().toString());
+    return report;
+  }
+
+  @Override
+  public Counters getAllCounters() {
+    return new Counters();
+  }
+
+  @Override
+  public Map<TaskId, Task> getTasks() {
+    return new HashMap<>();
+  }
+
+  @Override
+  public Map<TaskId, Task> getTasks(TaskType taskType) {
+    return new HashMap<>();
+  }
+
+  @Override
+  public Task getTask(TaskId taskID) {
+    return null;
+  }
+
+  @Override
+  public List<String> getDiagnostics() {
+    return new ArrayList<>();
+  }
+
+  @Override
+  public int getTotalMaps() {
+    return jobIndexInfo.getNumMaps();
+  }
+
+  @Override
+  public int getTotalReduces() {
+    return jobIndexInfo.getNumReduces();
+  }
+
+  @Override
+  public int getCompletedMaps() {
+    return -1;
+  }
+
+  @Override
+  public int getCompletedReduces() {
+    return -1;
+  }
+
+  @Override
+  public float getProgress() {
+    return 1.0f;
+  }
+
+  @Override
+  public boolean isUber() {
+    return false;
+  }
+
+  @Override
+  public String getUserName() {
+    return jobIndexInfo.getUser();
+  }
+
+  @Override
+  public String getQueueName() {
+    return jobIndexInfo.getQueueName();
+  }
+
+  @Override
+  public Path getConfFile() {
+    return jhfInfo.getConfFile();
+  }
+
+  @Override
+  public Configuration loadConfFile() throws IOException {
+    return jhfInfo.loadConfFile();
+  }
+
+  @Override
+  public Map<JobACL, AccessControlList> getJobACLs() {
+    return new HashMap<>();
+  }
+
+  @Override
+  public TaskAttemptCompletionEvent[] getTaskAttemptCompletionEvents(
+      int fromEventId, int maxEvents) {
+    return new TaskAttemptCompletionEvent[0];
+  }
+
+  @Override
+  public TaskCompletionEvent[] getMapAttemptCompletionEvents(
+      int startIndex, int maxEvents) {
+    return new TaskCompletionEvent[0];
+  }
+
+  @Override
+  public List<AMInfo> getAMInfos() {
+    return new ArrayList<>();
+  }
+
+  @Override
+  public boolean checkAccess(UserGroupInformation callerUGI,
+      JobACL jobOperation) {
+    return true;
+  }
+
+  @Override
+  public void setQueueName(String queueName) {
+    throw new UnsupportedOperationException("Can't set job's "
+        + "queue name in history");
+  }
+
+  @Override
+  public void setJobPriority(Priority priority) {
+    throw new UnsupportedOperationException(
+        "Can't set job's priority in history");
+  }
+}
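
A hedged usage sketch (an assumed caller, not in the patch): code that consumes HistoryFileInfo.loadJob() can distinguish the stub from a fully parsed job with an instanceof check, which is exactly what HsJobBlock does in the next hunk.

    import java.io.IOException;
    import org.apache.hadoop.mapreduce.v2.app.job.Job;
    import org.apache.hadoop.mapreduce.v2.hs.HistoryFileManager.HistoryFileInfo;
    import org.apache.hadoop.mapreduce.v2.hs.UnparsedJob;

    class LoadJobCaller {
      // Hypothetical helper: true only for a fully parsed (CompletedJob) result.
      static boolean isFullyParsed(HistoryFileInfo info) throws IOException {
        Job job = info.loadJob();
        // An UnparsedJob carries only JobIndexInfo-backed summary data;
        // its task-level queries return empty collections.
        return job != null && !(job instanceof UnparsedJob);
      }
    }
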
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsJobBlock.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsJobBlock.java
index 13774a8c704..0d5b03a3ac0 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsJobBlock.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsJobBlock.java
@@ -33,8 +33,10 @@
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
 import org.apache.hadoop.mapreduce.v2.app.webapp.dao.ConfEntryInfo;
+import org.apache.hadoop.mapreduce.v2.hs.UnparsedJob;
 import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.AMAttemptInfo;
 import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.JobInfo;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
 import org.apache.hadoop.mapreduce.v2.util.MRApps;
 import org.apache.hadoop.mapreduce.v2.util.MRApps.TaskAttemptStateUI;
 import org.apache.hadoop.mapreduce.v2.util.MRWebAppUtil;
@@ -73,8 +75,18 @@ public class HsJobBlock extends HtmlBlock {
     JobId jobID = MRApps.toJobID(jid);
     Job j = appContext.getJob(jobID);
     if (j == null) {
-      html.
-        p()._("Sorry, ", jid, " not found.")._();
+      html.p()._("Sorry, ", jid, " not found.")._();
+      return;
+    }
+    if (j instanceof UnparsedJob) {
+      final int taskCount = j.getTotalMaps() + j.getTotalReduces();
+      UnparsedJob oversizedJob = (UnparsedJob) j;
+      html.p()._("The job has a total of " + taskCount + " tasks. ")
+          ._("Any job larger than " + oversizedJob.getMaxTasksAllowed()
+          + " will not be loaded.")._();
+      html.p()._("You can either use the CLI tool: 'mapred job -history'"
+          + " to view large jobs or adjust the property "
+          + JHAdminConfig.MR_HS_LOADED_JOBS_TASKS_MAX + ".")._();
       return;
     }
     List<AMInfo> amInfos = j.getAMInfos();
") + ._("Any job larger than " + oversizedJob.getMaxTasksAllowed() + + " will not be loaded.")._(); + html.p()._("You can either use the CLI tool: 'mapred job -history'" + + " to view large jobs or adjust the property " + + JHAdminConfig.MR_HS_LOADED_JOBS_TASKS_MAX + ".")._(); return; } List amInfos = j.getAMInfos(); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestHistoryFileManager.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestHistoryFileManager.java index aa2e9796511..b7a367226e6 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestHistoryFileManager.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestHistoryFileManager.java @@ -25,6 +25,7 @@ import java.util.UUID; import java.util.List; +import org.apache.hadoop.mapreduce.v2.app.job.Job; import org.junit.Assert; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; @@ -37,11 +38,9 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.mapreduce.JobID; import org.apache.hadoop.mapreduce.TypeConverter; -import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.hs.HistoryFileManager.HistoryFileInfo; import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig; import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo; -import org.apache.hadoop.test.CoreTestDriver; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.util.Clock; @@ -86,6 +85,10 @@ public static void cleanUpClass() throws Exception { @After public void cleanTest() throws Exception { new File(coreSitePath).delete(); + dfsCluster.getFileSystem().setSafeMode( + HdfsConstants.SafeModeAction.SAFEMODE_LEAVE); + dfsCluster2.getFileSystem().setSafeMode( + HdfsConstants.SafeModeAction.SAFEMODE_LEAVE); } private String getDoneDirNameForTest() { @@ -247,6 +250,97 @@ public void testHistoryFileInfoSummaryFileNotExist() throws Exception { Assert.assertFalse(info.didMoveFail()); } + @Test + public void testHistoryFileInfoLoadOversizedJobShouldReturnUnParsedJob() + throws Exception { + HistoryFileManagerTest hmTest = new HistoryFileManagerTest(); + + int allowedMaximumTasks = 5; + Configuration conf = dfsCluster.getConfiguration(0); + conf.setInt(JHAdminConfig.MR_HS_LOADED_JOBS_TASKS_MAX, allowedMaximumTasks); + + hmTest.init(conf); + + // set up a job of which the number of tasks is greater than maximum allowed + String jobId = "job_1410889000000_123456"; + JobIndexInfo jobIndexInfo = new JobIndexInfo(); + jobIndexInfo.setJobId(TypeConverter.toYarn(JobID.forName(jobId))); + jobIndexInfo.setNumMaps(allowedMaximumTasks); + jobIndexInfo.setNumReduces(allowedMaximumTasks); + + + HistoryFileInfo info = hmTest.getHistoryFileInfo(null, null, null, + jobIndexInfo, false); + + Job job = info.loadJob(); + Assert.assertTrue("Should return an instance of UnparsedJob to indicate" + + " the job history file is not parsed", job instanceof UnparsedJob); + } + + @Test + public void testHistoryFileInfoLoadNormalSizedJobShouldReturnCompletedJob() + throws Exception { + HistoryFileManagerTest hmTest = new HistoryFileManagerTest(); + + final int 
+
+  @Test
+  public void testHistoryFileInfoLoadNormalSizedJobShouldReturnCompletedJob()
+      throws Exception {
+    HistoryFileManagerTest hmTest = new HistoryFileManagerTest();
+
+    final int numOfTasks = 100;
+    Configuration conf = dfsCluster.getConfiguration(0);
+    conf.setInt(JHAdminConfig.MR_HS_LOADED_JOBS_TASKS_MAX,
+        numOfTasks + numOfTasks + 1);
+
+    hmTest.init(conf);
+
+    // set up a job whose number of tasks is smaller than the maximum
+    // allowed, and which will therefore be fully loaded.
+    final String jobId = "job_1416424547277_0002";
+    JobIndexInfo jobIndexInfo = new JobIndexInfo();
+    jobIndexInfo.setJobId(TypeConverter.toYarn(JobID.forName(jobId)));
+    jobIndexInfo.setNumMaps(numOfTasks);
+    jobIndexInfo.setNumReduces(numOfTasks);
+
+    final String historyFile = getClass().getClassLoader().getResource(
+        "job_2.0.3-alpha-FAILED.jhist").getFile();
+    final Path historyFilePath = FileSystem.getLocal(conf).makeQualified(
+        new Path(historyFile));
+    HistoryFileInfo info = hmTest.getHistoryFileInfo(historyFilePath, null,
+        null, jobIndexInfo, false);
+
+    Job job = info.loadJob();
+    Assert.assertTrue("Should return an instance of CompletedJob as "
+        + "a result of parsing the job history file of the job",
+        job instanceof CompletedJob);
+  }
+
+  @Test
+  public void testHistoryFileInfoShouldReturnCompletedJobIfMaxNotConfigured()
+      throws Exception {
+    HistoryFileManagerTest hmTest = new HistoryFileManagerTest();
+
+    Configuration conf = dfsCluster.getConfiguration(0);
+    conf.setInt(JHAdminConfig.MR_HS_LOADED_JOBS_TASKS_MAX, -1);
+
+    hmTest.init(conf);
+
+    final String jobId = "job_1416424547277_0002";
+    JobIndexInfo jobIndexInfo = new JobIndexInfo();
+    jobIndexInfo.setJobId(TypeConverter.toYarn(JobID.forName(jobId)));
+    jobIndexInfo.setNumMaps(100);
+    jobIndexInfo.setNumReduces(100);
+
+    final String historyFile = getClass().getClassLoader().getResource(
+        "job_2.0.3-alpha-FAILED.jhist").getFile();
+    final Path historyFilePath = FileSystem.getLocal(conf).makeQualified(
+        new Path(historyFile));
+    HistoryFileInfo info = hmTest.getHistoryFileInfo(historyFilePath, null,
+        null, jobIndexInfo, false);
+
+    Job job = info.loadJob();
+    Assert.assertTrue("Should return an instance of CompletedJob as "
+        + "a result of parsing the job history file of the job",
+        job instanceof CompletedJob);
+  }
+
   static class HistoryFileManagerTest extends HistoryFileManager {
     public HistoryFileManagerTest() {
       super();
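
One boundary the tests above leave implicit: because the check is strictly greater-than, a job whose task count exactly equals the cap is still fully parsed. A possible additional test in the same style, reusing the fixtures of TestHistoryFileManager (a sketch, not part of this patch):

    @Test
    public void testJobWithTaskCountEqualToMaxShouldReturnCompletedJob()
        throws Exception {
      HistoryFileManagerTest hmTest = new HistoryFileManagerTest();
      Configuration conf = dfsCluster.getConfiguration(0);
      // cap == numMaps + numReduces, so the job is not oversized
      conf.setInt(JHAdminConfig.MR_HS_LOADED_JOBS_TASKS_MAX, 200);
      hmTest.init(conf);

      JobIndexInfo jobIndexInfo = new JobIndexInfo();
      jobIndexInfo.setJobId(TypeConverter.toYarn(
          JobID.forName("job_1416424547277_0002")));
      jobIndexInfo.setNumMaps(100);
      jobIndexInfo.setNumReduces(100);

      final String historyFile = getClass().getClassLoader().getResource(
          "job_2.0.3-alpha-FAILED.jhist").getFile();
      final Path historyFilePath = FileSystem.getLocal(conf).makeQualified(
          new Path(historyFile));
      HistoryFileInfo info = hmTest.getHistoryFileInfo(historyFilePath, null,
          null, jobIndexInfo, false);

      Assert.assertTrue("A job with exactly the maximum task count should "
          + "still be fully parsed", info.loadJob() instanceof CompletedJob);
    }
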
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsJobBlock.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsJobBlock.java
new file mode 100644
index 00000000000..7fa238e1cef
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsJobBlock.java
@@ -0,0 +1,268 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.hs.webapp;
+
+import org.apache.commons.io.output.ByteArrayOutputStream;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.v2.api.records.*;
+import org.apache.hadoop.mapreduce.v2.api.records.impl.pb.JobIdPBImpl;
+import org.apache.hadoop.mapreduce.v2.app.job.Task;
+import org.apache.hadoop.mapreduce.v2.app.webapp.AMParams;
+import org.apache.hadoop.mapreduce.v2.hs.CompletedJob;
+import org.apache.hadoop.mapreduce.v2.hs.HistoryFileManager;
+import org.apache.hadoop.mapreduce.v2.hs.HistoryFileManager.HistoryFileInfo;
+import org.apache.hadoop.mapreduce.v2.hs.JobHistory;
+import org.apache.hadoop.mapreduce.v2.hs.UnparsedJob;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.util.StringHelper;
+import org.apache.hadoop.yarn.webapp.ResponseInfo;
+import org.apache.hadoop.yarn.webapp.SubView;
+import org.apache.hadoop.yarn.webapp.view.BlockForTest;
+import org.apache.hadoop.yarn.webapp.view.HtmlBlock;
+import org.apache.hadoop.yarn.webapp.view.HtmlBlockForTest;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Test the HsJobBlock generated for oversized jobs in JHS.
+ */
+public class TestHsJobBlock {
+
+  @Test
+  public void testHsJobBlockForOversizeJobShouldDisplayWarningMessage() {
+    int maxAllowedTaskNum = 100;
+
+    Configuration config = new Configuration();
+    config.setInt(JHAdminConfig.MR_HS_LOADED_JOBS_TASKS_MAX,
+        maxAllowedTaskNum);
+
+    JobHistory jobHistory =
+        new JobHistoryStubWithAllOversizeJobs(maxAllowedTaskNum);
+    jobHistory.init(config);
+
+    HsJobBlock jobBlock = new HsJobBlock(jobHistory) {
+      // override this so that the job block can fetch a job id.
+      @Override
+      public Map<String, String> moreParams() {
+        Map<String, String> map = new HashMap<>();
+        map.put(AMParams.JOB_ID, "job_0000_0001");
+        return map;
+      }
+    };
+
+    // set up the test block to render HsJobBlock to
+    OutputStream outputStream = new ByteArrayOutputStream();
+    HtmlBlock.Block block = createBlockToCreateTo(outputStream);
+
+    jobBlock.render(block);
+
+    block.getWriter().flush();
+    String out = outputStream.toString();
+    Assert.assertTrue("Should display warning message for jobs that have too "
+        + "many tasks", out.contains("Any job larger than "
+        + maxAllowedTaskNum + " will not be loaded"));
+  }
+
+  @Test
+  public void testHsJobBlockForNormalSizeJobShouldNotDisplayWarningMessage() {
+
+    Configuration config = new Configuration();
+    config.setInt(JHAdminConfig.MR_HS_LOADED_JOBS_TASKS_MAX, -1);
+
+    JobHistory jobHistory = new JobHistoryStubWithAllNormalSizeJobs();
+    jobHistory.init(config);
+
+    HsJobBlock jobBlock = new HsJobBlock(jobHistory) {
+      // override this so that the job block can fetch a job id.
+      @Override
+      public Map<String, String> moreParams() {
+        Map<String, String> map = new HashMap<>();
+        map.put(AMParams.JOB_ID, "job_0000_0001");
+        return map;
+      }
+
+      // override this to avoid view context lookup in render()
+      @Override
+      public ResponseInfo info(String about) {
+        return new ResponseInfo().about(about);
+      }
+
+      // override this to avoid view context lookup in render()
+      @Override
+      public String url(String... parts) {
+        return StringHelper.ujoin("", parts);
+      }
+    };
+
+    // set up the test block to render HsJobBlock to
+    OutputStream outputStream = new ByteArrayOutputStream();
+    HtmlBlock.Block block = createBlockToCreateTo(outputStream);
+
+    jobBlock.render(block);
+
+    block.getWriter().flush();
+    String out = outputStream.toString();
+
+    Assert.assertTrue("Should display job overview for the job.",
+        out.contains("ApplicationMaster"));
+  }
+
+  private static HtmlBlock.Block createBlockToCreateTo(
+      OutputStream outputStream) {
+    PrintWriter printWriter = new PrintWriter(outputStream);
+    HtmlBlock html = new HtmlBlockForTest();
+    return new BlockForTest(html, printWriter, 10, false) {
+      @Override
+      protected void subView(Class<? extends SubView> cls) {
+      }
+    };
+  }
+
+  /**
+   * A JobHistory stub that treats all jobs as oversized and therefore will
+   * not parse their job history files, returning an UnparsedJob instance
+   * instead.
+   */
+  static class JobHistoryStubWithAllOversizeJobs extends JobHistory {
+    private final int maxAllowedTaskNum;
+
+    public JobHistoryStubWithAllOversizeJobs(int maxAllowedTaskNum) {
+      this.maxAllowedTaskNum = maxAllowedTaskNum;
+    }
+
+    @Override
+    protected HistoryFileManager createHistoryFileManager() {
+      HistoryFileManager historyFileManager;
+      try {
+        HistoryFileInfo historyFileInfo =
+            createUnparsedJobHistoryFileInfo(maxAllowedTaskNum);
+
+        historyFileManager = mock(HistoryFileManager.class);
+        when(historyFileManager.getFileInfo(any(JobId.class))).thenReturn(
+            historyFileInfo);
+      } catch (IOException ex) {
+        // this should never happen
+        historyFileManager = super.createHistoryFileManager();
+      }
+      return historyFileManager;
+    }
+
+    private static HistoryFileInfo createUnparsedJobHistoryFileInfo(
+        int maxAllowedTaskNum) throws IOException {
+      HistoryFileInfo fileInfo = mock(HistoryFileInfo.class);
+
+      // create an instance of UnparsedJob for a large job
+      UnparsedJob unparsedJob = mock(UnparsedJob.class);
+      when(unparsedJob.getMaxTasksAllowed()).thenReturn(maxAllowedTaskNum);
+      when(unparsedJob.getTotalMaps()).thenReturn(maxAllowedTaskNum);
+      when(unparsedJob.getTotalReduces()).thenReturn(maxAllowedTaskNum);
+
+      when(fileInfo.loadJob()).thenReturn(unparsedJob);
+
+      return fileInfo;
+    }
+  }
+
+  /**
+   * A JobHistory stub that treats all jobs as normal size and therefore will
+   * return a CompletedJob on HistoryFileInfo.loadJob().
+   */
+  static class JobHistoryStubWithAllNormalSizeJobs extends JobHistory {
+    @Override
+    public HistoryFileManager createHistoryFileManager() {
+      HistoryFileManager historyFileManager;
+      try {
+        HistoryFileInfo historyFileInfo = createParsedJobHistoryFileInfo();
+
+        historyFileManager = mock(HistoryFileManager.class);
+        when(historyFileManager.getFileInfo(any(JobId.class))).thenReturn(
+            historyFileInfo);
+      } catch (IOException ex) {
+        // this should never happen
+        historyFileManager = super.createHistoryFileManager();
+      }
+      return historyFileManager;
+    }
+
+    private static HistoryFileInfo createParsedJobHistoryFileInfo()
+        throws IOException {
+      HistoryFileInfo fileInfo = mock(HistoryFileInfo.class);
+      CompletedJob job = createFakeCompletedJob();
+      when(fileInfo.loadJob()).thenReturn(job);
+      return fileInfo;
+    }
+
+    private static CompletedJob createFakeCompletedJob() {
+      CompletedJob job = mock(CompletedJob.class);
+
+      when(job.getTotalMaps()).thenReturn(0);
+      when(job.getCompletedMaps()).thenReturn(0);
+      when(job.getTotalReduces()).thenReturn(0);
+      when(job.getCompletedReduces()).thenReturn(0);
+
+      JobId jobId = createFakeJobId();
+      when(job.getID()).thenReturn(jobId);
+
+      JobReport jobReport = mock(JobReport.class);
+      when(jobReport.getSubmitTime()).thenReturn(-1L);
+      when(jobReport.getStartTime()).thenReturn(-1L);
+      when(jobReport.getFinishTime()).thenReturn(-1L);
+      when(job.getReport()).thenReturn(jobReport);
+
+      when(job.getAMInfos()).thenReturn(new ArrayList<AMInfo>());
+      when(job.getDiagnostics()).thenReturn(new ArrayList<String>());
+      when(job.getName()).thenReturn("fake completed job");
+      when(job.getQueueName()).thenReturn("default");
+      when(job.getUserName()).thenReturn("junit");
+      when(job.getState()).thenReturn(JobState.ERROR);
+      when(job.getAllCounters()).thenReturn(new Counters());
+      when(job.getTasks()).thenReturn(new HashMap<TaskId, Task>());
+
+      return job;
+    }
+
+    private static JobId createFakeJobId() {
+      JobId jobId = new JobIdPBImpl();
+      jobId.setId(0);
+
+      ApplicationId appId = mock(ApplicationId.class);
+      when(appId.getClusterTimestamp()).thenReturn(0L);
+      when(appId.getId()).thenReturn(0);
+
+      jobId.setAppId(appId);
+
+      return jobId;
+    }
+  }
+}