diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHAdminConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHAdminConfig.java
index dd225e26560..644727f97b4 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHAdminConfig.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHAdminConfig.java
@@ -253,4 +253,11 @@ public class JHAdminConfig {
MR_HISTORY_PREFIX + "jhist.format";
public static final String DEFAULT_MR_HS_JHIST_FORMAT =
"json";
+
+ /**
+ * The maximum number of tasks for a job to be loaded in the Job History Server.
+ */
+ public static final String MR_HS_LOADED_JOBS_TASKS_MAX =
+ MR_HISTORY_PREFIX + "loadedjob.tasks.max";
+ public static final int DEFAULT_MR_HS_LOADED_JOBS_TASKS_MAX = -1;
}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
index fab5b252310..a37d312b6b8 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
@@ -1889,4 +1889,13 @@
<value>SAMEORIGIN</value>
</property>
+
+<property>
+ <description>The maximum number of tasks that a job can have so that the
+ Job History Server will fully parse its associated job history file and
+ load it into memory. A value of -1 (default) will allow all jobs to be loaded.
+ </description>
+ <name>mapreduce.jobhistory.loadedjob.tasks.max</name>
+ <value>-1</value>
+</property>
</configuration>
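
For reference, the tests later in this patch override this limit directly on a
Configuration object; a site-level override in mapred-site.xml uses the same
key. A minimal sketch, assuming the patch is applied (the threshold 500000 is
an arbitrary illustration, not a recommendation):

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;

  public class LoadedJobLimitExample {
    public static void main(String[] args) {
      Configuration conf = new Configuration();
      // Jobs with more than 500000 total tasks will not be fully loaded;
      // -1 (the default) loads every job.
      conf.setInt(JHAdminConfig.MR_HS_LOADED_JOBS_TASKS_MAX, 500000);
    }
  }
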
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java
index 7bfb11c97b8..814afe4b93c 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java
@@ -458,15 +458,23 @@ public class HistoryFileManager extends AbstractService {
/**
* Parse a job from the JobHistoryFile, if the underlying file is not going
- * to be deleted.
+ * to be deleted and the number of tasks associated with the job is not
+ * greater than maxTasksForLoadedJob.
*
- * @return the Job or null if the underlying file was deleted.
+ * @return null if the underlying job history file was deleted, or
+ * an {@link UnparsedJob} object representing a partially parsed job
+ * if the number of tasks in the job exceeds the configured maximum, or
+ * a {@link CompletedJob} representing a fully parsed job.
* @throws IOException
- * if there is an error trying to read the file.
+ * if there is an error trying to read the file while parsing it.
*/
public synchronized Job loadJob() throws IOException {
- return new CompletedJob(conf, jobIndexInfo.getJobId(), historyFile,
- false, jobIndexInfo.getUser(), this, aclsMgr);
+ if (isOversized()) {
+ return new UnparsedJob(maxTasksForLoadedJob, jobIndexInfo, this);
+ } else {
+ return new CompletedJob(conf, jobIndexInfo.getJobId(), historyFile,
+ false, jobIndexInfo.getUser(), this, aclsMgr);
+ }
}
/**
@@ -504,6 +512,12 @@ public class HistoryFileManager extends AbstractService {
jobConf.addResource(fc.open(confFile), confFile.toString());
return jobConf;
}
+
+ private boolean isOversized() {
+ final int totalTasks = jobIndexInfo.getNumReduces() +
+ jobIndexInfo.getNumMaps();
+ return (maxTasksForLoadedJob > 0) && (totalTasks > maxTasksForLoadedJob);
+ }
}
private SerialNumberIndex serialNumberIndex = null;
@@ -536,7 +550,12 @@ public class HistoryFileManager extends AbstractService {
@VisibleForTesting
protected ThreadPoolExecutor moveToDoneExecutor = null;
private long maxHistoryAge = 0;
-
+
+ /**
+ * The maximum number of tasks allowed for a job to be loaded.
+ */
+ private int maxTasksForLoadedJob = -1;
+
public HistoryFileManager() {
super(HistoryFileManager.class.getName());
}
@@ -555,6 +574,10 @@ public class HistoryFileManager extends AbstractService {
JHAdminConfig.DEFAULT_MR_HISTORY_MAX_START_WAIT_TIME);
createHistoryDirs(SystemClock.getInstance(), 10 * 1000, maxFSWaitTime);
+ maxTasksForLoadedJob = conf.getInt(
+ JHAdminConfig.MR_HS_LOADED_JOBS_TASKS_MAX,
+ JHAdminConfig.DEFAULT_MR_HS_LOADED_JOBS_TASKS_MAX);
+
this.aclsMgr = new JobACLsManager(conf);
maxHistoryAge = conf.getLong(JHAdminConfig.MR_HISTORY_MAX_AGE_MS,
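
Taken together: serviceInit() reads the limit once at startup, and each
HistoryFileInfo applies it in isOversized() when loadJob() is called. A small
self-contained sketch of the same predicate (isOversizedExample is an
illustrative stand-in, not the patch's private method):

  // Mirrors HistoryFileInfo.isOversized(): a positive limit excludes jobs
  // whose total task count strictly exceeds it; -1 disables the check.
  static boolean isOversizedExample(int maxTasks, int maps, int reduces) {
    final int totalTasks = maps + reduces;
    return (maxTasks > 0) && (totalTasks > maxTasks);
  }

  // isOversizedExample(5, 5, 5)  -> true   (10 > 5: returned as UnparsedJob)
  // isOversizedExample(-1, 5, 5) -> false  (limit disabled: fully parsed)
  // isOversizedExample(10, 5, 5) -> false  (10 is not strictly greater than 10)
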
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/UnparsedJob.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/UnparsedJob.java
new file mode 100644
index 00000000000..cea336c5cb0
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/UnparsedJob.java
@@ -0,0 +1,211 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.hs;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.TaskCompletionEvent;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.JobACL;
+import org.apache.hadoop.mapreduce.v2.api.records.*;
+import org.apache.hadoop.mapreduce.v2.app.job.Task;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authorize.AccessControlList;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.util.Records;
+
+import java.io.IOException;
+import java.util.*;
+
+/**
+ * A job whose history file is not parsed because it has too many tasks
+ * associated with it, so that the Job History Server does not hang on
+ * parsing the file. It is meant to be used only by the JHS to indicate
+ * that the history file of a job is not fully parsed.
+ */
+public class UnparsedJob implements org.apache.hadoop.mapreduce.v2.app.job.Job {
+ private final JobIndexInfo jobIndexInfo;
+ private final int maxTasksAllowed;
+ private JobReport jobReport;
+ private final HistoryFileManager.HistoryFileInfo jhfInfo;
+
+ public UnparsedJob(int maxTasksAllowed, JobIndexInfo jobIndexInfo,
+ HistoryFileManager.HistoryFileInfo jhfInfo) throws IOException {
+ this.jobIndexInfo = jobIndexInfo;
+ this.jhfInfo = jhfInfo;
+ this.maxTasksAllowed = maxTasksAllowed;
+ }
+
+ public int getMaxTasksAllowed() {
+ return maxTasksAllowed;
+ }
+
+ @Override
+ public JobId getID() {
+ return jobIndexInfo.getJobId();
+ }
+
+ @Override
+ public String getName() {
+ return jobIndexInfo.getJobName();
+ }
+
+ @Override
+ public JobState getState() {
+ return JobState.valueOf(jobIndexInfo.getJobStatus());
+ }
+
+ @Override
+ public synchronized JobReport getReport() {
+ if (jobReport == null) {
+ jobReport = constructJobReport();
+ }
+ return jobReport;
+ }
+
+ public JobReport constructJobReport() {
+ JobReport report = Records.newRecord(JobReport.class);
+ report.setJobId(getID());
+ report.setJobState(getState());
+ report.setSubmitTime(jobIndexInfo.getSubmitTime());
+ report.setStartTime(jobIndexInfo.getJobStartTime());
+ report.setFinishTime(jobIndexInfo.getFinishTime());
+ report.setJobName(jobIndexInfo.getJobName());
+ report.setUser(jobIndexInfo.getUser());
+ report.setJobFile(getConfFile().toString());
+ report.setHistoryFile(jhfInfo.getHistoryFile().toString());
+ return report;
+ }
+
+ @Override
+ public Counters getAllCounters() {
+ return new Counters();
+ }
+
+ @Override
+ public Map<TaskId, Task> getTasks() {
+ return new HashMap<>();
+ }
+
+ @Override
+ public Map<TaskId, Task> getTasks(TaskType taskType) {
+ return new HashMap<>();
+ }
+
+ @Override
+ public Task getTask(TaskId taskID) {
+ return null;
+ }
+
+ @Override
+ public List<String> getDiagnostics() {
+ return new ArrayList<>();
+ }
+
+ @Override
+ public int getTotalMaps() {
+ return jobIndexInfo.getNumMaps();
+ }
+
+ @Override
+ public int getTotalReduces() {
+ return jobIndexInfo.getNumReduces();
+ }
+
+ @Override
+ public int getCompletedMaps() {
+ return -1;
+ }
+
+ @Override
+ public int getCompletedReduces() {
+ return -1;
+ }
+
+ @Override
+ public float getProgress() {
+ return 1.0f;
+ }
+
+ @Override
+ public boolean isUber() {
+ return false;
+ }
+
+ @Override
+ public String getUserName() {
+ return jobIndexInfo.getUser();
+ }
+
+ @Override
+ public String getQueueName() {
+ return jobIndexInfo.getQueueName();
+ }
+
+ @Override
+ public Path getConfFile() {
+ return jhfInfo.getConfFile();
+ }
+
+ @Override
+ public Configuration loadConfFile() throws IOException {
+ return jhfInfo.loadConfFile();
+ }
+
+ @Override
+ public Map<JobACL, AccessControlList> getJobACLs() {
+ return new HashMap<>();
+ }
+
+ @Override
+ public TaskAttemptCompletionEvent[] getTaskAttemptCompletionEvents(
+ int fromEventId, int maxEvents) {
+ return new TaskAttemptCompletionEvent[0];
+ }
+
+ @Override
+ public TaskCompletionEvent[] getMapAttemptCompletionEvents(
+ int startIndex, int maxEvents) {
+ return new TaskCompletionEvent[0];
+ }
+
+ @Override
+ public List<AMInfo> getAMInfos() {
+ return new ArrayList<>();
+ }
+
+ @Override
+ public boolean checkAccess(UserGroupInformation callerUGI,
+ JobACL jobOperation) {
+ return true;
+ }
+
+ @Override
+ public void setQueueName(String queueName) {
+ throw new UnsupportedOperationException("Can't set job's " +
+ "queue name in history");
+ }
+
+ @Override
+ public void setJobPriority(Priority priority) {
+ throw new UnsupportedOperationException(
+ "Can't set job's priority in history");
+ }
+}
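
Callers distinguish the two non-null results of loadJob() with an instanceof
check, as the HsJobBlock change below does. A minimal sketch of that calling
pattern, wrapped in a hypothetical helper (fileInfo stands for a
HistoryFileManager.HistoryFileInfo obtained elsewhere):

  static void describeJob(HistoryFileManager.HistoryFileInfo fileInfo)
      throws IOException {
    Job job = fileInfo.loadJob();
    if (job == null) {
      // the underlying history file was deleted
    } else if (job instanceof UnparsedJob) {
      // too many tasks: only JobIndexInfo-backed summary data is available
      int limit = ((UnparsedJob) job).getMaxTasksAllowed();
    } else {
      // a fully parsed CompletedJob with tasks, counters and diagnostics
    }
  }
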
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsJobBlock.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsJobBlock.java
index 13774a8c704..0d5b03a3ac0 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsJobBlock.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsJobBlock.java
@@ -33,8 +33,10 @@ import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.webapp.dao.ConfEntryInfo;
+import org.apache.hadoop.mapreduce.v2.hs.UnparsedJob;
import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.AMAttemptInfo;
import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.JobInfo;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.mapreduce.v2.util.MRApps.TaskAttemptStateUI;
import org.apache.hadoop.mapreduce.v2.util.MRWebAppUtil;
@@ -73,8 +75,18 @@ public class HsJobBlock extends HtmlBlock {
JobId jobID = MRApps.toJobID(jid);
Job j = appContext.getJob(jobID);
if (j == null) {
- html.
- p()._("Sorry, ", jid, " not found.")._();
+ html.p()._("Sorry, ", jid, " not found.")._();
+ return;
+ }
+ if (j instanceof UnparsedJob) {
+ final int taskCount = j.getTotalMaps() + j.getTotalReduces();
+ UnparsedJob oversizedJob = (UnparsedJob) j;
+ html.p()._("The job has a total of " + taskCount + " tasks. ")
+ ._("Any job larger than " + oversizedJob.getMaxTasksAllowed() +
+ " will not be loaded.")._();
+ html.p()._("You can either use the CLI tool: 'mapred job -history'"
+ + " to view large jobs or adjust the property " +
+ JHAdminConfig.MR_HS_LOADED_JOBS_TASKS_MAX + ".")._();
return;
}
List<AMInfo> amInfos = j.getAMInfos();
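
The warning text above points users of oversized jobs at the history CLI,
which reads the .jhist file directly instead of loading it into the JHS; the
basic invocation (exact options vary by release; see `mapred job` usage) is:

  mapred job -history <jobHistoryFile>
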
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestHistoryFileManager.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestHistoryFileManager.java
index aa2e9796511..b7a367226e6 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestHistoryFileManager.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestHistoryFileManager.java
@@ -25,6 +25,7 @@ import java.io.FileNotFoundException;
import java.util.UUID;
import java.util.List;
+import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.junit.Assert;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
@@ -37,11 +38,9 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.TypeConverter;
-import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.hs.HistoryFileManager.HistoryFileInfo;
import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo;
-import org.apache.hadoop.test.CoreTestDriver;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.util.Clock;
@@ -86,6 +85,10 @@ public class TestHistoryFileManager {
@After
public void cleanTest() throws Exception {
new File(coreSitePath).delete();
+ dfsCluster.getFileSystem().setSafeMode(
+ HdfsConstants.SafeModeAction.SAFEMODE_LEAVE);
+ dfsCluster2.getFileSystem().setSafeMode(
+ HdfsConstants.SafeModeAction.SAFEMODE_LEAVE);
}
private String getDoneDirNameForTest() {
@@ -247,6 +250,97 @@ public class TestHistoryFileManager {
Assert.assertFalse(info.didMoveFail());
}
+ @Test
+ public void testHistoryFileInfoLoadOversizedJobShouldReturnUnparsedJob()
+ throws Exception {
+ HistoryFileManagerTest hmTest = new HistoryFileManagerTest();
+
+ int allowedMaximumTasks = 5;
+ Configuration conf = dfsCluster.getConfiguration(0);
+ conf.setInt(JHAdminConfig.MR_HS_LOADED_JOBS_TASKS_MAX, allowedMaximumTasks);
+
+ hmTest.init(conf);
+
+ // set up a job whose number of tasks is greater than the maximum allowed
+ String jobId = "job_1410889000000_123456";
+ JobIndexInfo jobIndexInfo = new JobIndexInfo();
+ jobIndexInfo.setJobId(TypeConverter.toYarn(JobID.forName(jobId)));
+ jobIndexInfo.setNumMaps(allowedMaximumTasks);
+ jobIndexInfo.setNumReduces(allowedMaximumTasks);
+
+ HistoryFileInfo info = hmTest.getHistoryFileInfo(null, null, null,
+ jobIndexInfo, false);
+
+ Job job = info.loadJob();
+ Assert.assertTrue("Should return an instance of UnparsedJob to indicate" +
+ " the job history file is not parsed", job instanceof UnparsedJob);
+ }
+
+ @Test
+ public void testHistoryFileInfoLoadNormalSizedJobShouldReturnCompletedJob()
+ throws Exception {
+ HistoryFileManagerTest hmTest = new HistoryFileManagerTest();
+
+ final int numOfTasks = 100;
+ Configuration conf = dfsCluster.getConfiguration(0);
+ conf.setInt(JHAdminConfig.MR_HS_LOADED_JOBS_TASKS_MAX,
+ numOfTasks + numOfTasks + 1);
+
+ hmTest.init(conf);
+
+ // set up a job whose number of tasks is smaller than the maximum
+ // allowed, so that it will be fully loaded.
+ final String jobId = "job_1416424547277_0002";
+ JobIndexInfo jobIndexInfo = new JobIndexInfo();
+ jobIndexInfo.setJobId(TypeConverter.toYarn(JobID.forName(jobId)));
+ jobIndexInfo.setNumMaps(numOfTasks);
+ jobIndexInfo.setNumReduces(numOfTasks);
+
+ final String historyFile = getClass().getClassLoader().getResource(
+ "job_2.0.3-alpha-FAILED.jhist").getFile();
+ final Path historyFilePath = FileSystem.getLocal(conf).makeQualified(
+ new Path(historyFile));
+ HistoryFileInfo info = hmTest.getHistoryFileInfo(historyFilePath, null,
+ null, jobIndexInfo, false);
+
+ Job job = info.loadJob();
+ Assert.assertTrue("Should return an instance of CompletedJob as " +
+ "a result of parsing the job history file of the job",
+ job instanceof CompletedJob);
+ }
+
+ @Test
+ public void testHistoryFileInfoShouldReturnCompletedJobIfMaxNotConfigured()
+ throws Exception {
+ HistoryFileManagerTest hmTest = new HistoryFileManagerTest();
+
+ Configuration conf = dfsCluster.getConfiguration(0);
+ conf.setInt(JHAdminConfig.MR_HS_LOADED_JOBS_TASKS_MAX, -1);
+
+ hmTest.init(conf);
+
+ final String jobId = "job_1416424547277_0002";
+ JobIndexInfo jobIndexInfo = new JobIndexInfo();
+ jobIndexInfo.setJobId(TypeConverter.toYarn(JobID.forName(jobId)));
+ jobIndexInfo.setNumMaps(100);
+ jobIndexInfo.setNumReduces(100);
+
+ final String historyFile = getClass().getClassLoader().getResource(
+ "job_2.0.3-alpha-FAILED.jhist").getFile();
+ final Path historyFilePath = FileSystem.getLocal(conf).makeQualified(
+ new Path(historyFile));
+ HistoryFileInfo info = hmTest.getHistoryFileInfo(historyFilePath, null,
+ null, jobIndexInfo, false);
+
+ Job job = info.loadJob();
+ Assert.assertTrue("Should return an instance of CompletedJob as " +
+ "a result of parsing the job history file of the job",
+ job instanceof CompletedJob);
+ }
+
static class HistoryFileManagerTest extends HistoryFileManager {
public HistoryFileManagerTest() {
super();
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsJobBlock.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsJobBlock.java
new file mode 100644
index 00000000000..7fa238e1cef
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsJobBlock.java
@@ -0,0 +1,268 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.hs.webapp;
+
+import org.apache.commons.io.output.ByteArrayOutputStream;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.v2.api.records.*;
+import org.apache.hadoop.mapreduce.v2.api.records.impl.pb.JobIdPBImpl;
+import org.apache.hadoop.mapreduce.v2.app.job.Task;
+import org.apache.hadoop.mapreduce.v2.app.webapp.AMParams;
+import org.apache.hadoop.mapreduce.v2.hs.CompletedJob;
+import org.apache.hadoop.mapreduce.v2.hs.HistoryFileManager;
+import org.apache.hadoop.mapreduce.v2.hs.HistoryFileManager.HistoryFileInfo;
+import org.apache.hadoop.mapreduce.v2.hs.JobHistory;
+import org.apache.hadoop.mapreduce.v2.hs.UnparsedJob;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.util.StringHelper;
+import org.apache.hadoop.yarn.webapp.ResponseInfo;
+import org.apache.hadoop.yarn.webapp.SubView;
+import org.apache.hadoop.yarn.webapp.view.BlockForTest;
+import org.apache.hadoop.yarn.webapp.view.HtmlBlock;
+import org.apache.hadoop.yarn.webapp.view.HtmlBlockForTest;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Test the HsJobBlock generated for oversized jobs in JHS.
+ */
+public class TestHsJobBlock {
+
+ @Test
+ public void testHsJobBlockForOversizeJobShouldDisplayWarningMessage() {
+ int maxAllowedTaskNum = 100;
+
+ Configuration config = new Configuration();
+ config.setInt(JHAdminConfig.MR_HS_LOADED_JOBS_TASKS_MAX, maxAllowedTaskNum);
+
+ JobHistory jobHistory =
+ new JobHistoryStubWithAllOversizeJobs(maxAllowedTaskNum);
+ jobHistory.init(config);
+
+ HsJobBlock jobBlock = new HsJobBlock(jobHistory) {
+ // override this so that the job block can fetch a job id.
+ @Override
+ public Map<String, String> moreParams() {
+ Map<String, String> map = new HashMap<>();
+ map.put(AMParams.JOB_ID, "job_0000_0001");
+ return map;
+ }
+ };
+
+ // set up the test block to render HsJobBlock to
+ OutputStream outputStream = new ByteArrayOutputStream();
+ HtmlBlock.Block block = createBlockToCreateTo(outputStream);
+
+ jobBlock.render(block);
+
+ block.getWriter().flush();
+ String out = outputStream.toString();
+ Assert.assertTrue("Should display warning message for jobs that have too " +
+ "many tasks", out.contains("Any job larger than " + maxAllowedTaskNum +
+ " will not be loaded"));
+ }
+
+ @Test
+ public void testHsJobBlockForNormalSizeJobShouldNotDisplayWarningMessage() {
+
+ Configuration config = new Configuration();
+ config.setInt(JHAdminConfig.MR_HS_LOADED_JOBS_TASKS_MAX, -1);
+
+ JobHistory jobHistory = new JobHistoryStubWithAllNormalSizeJobs();
+ jobHistory.init(config);
+
+ HsJobBlock jobBlock = new HsJobBlock(jobHistory) {
+ // override this so that the job block can fetch a job id.
+ @Override
+ public Map<String, String> moreParams() {
+ Map<String, String> map = new HashMap<>();
+ map.put(AMParams.JOB_ID, "job_0000_0001");
+ return map;
+ }
+
+ // override this to avoid view context lookup in render()
+ @Override
+ public ResponseInfo info(String about) {
+ return new ResponseInfo().about(about);
+ }
+
+ // override this to avoid view context lookup in render()
+ @Override
+ public String url(String... parts) {
+ return StringHelper.ujoin("", parts);
+ }
+ };
+
+ // set up the test block to render HsJobBlock to
+ OutputStream outputStream = new ByteArrayOutputStream();
+ HtmlBlock.Block block = createBlockToCreateTo(outputStream);
+
+ jobBlock.render(block);
+
+ block.getWriter().flush();
+ String out = outputStream.toString();
+
+ Assert.assertTrue("Should display job overview for the job.",
+ out.contains("ApplicationMaster"));
+ }
+
+ private static HtmlBlock.Block createBlockToCreateTo(
+ OutputStream outputStream) {
+ PrintWriter printWriter = new PrintWriter(outputStream);
+ HtmlBlock html = new HtmlBlockForTest();
+ return new BlockForTest(html, printWriter, 10, false) {
+ @Override
+ protected void subView(Class<? extends SubView> cls) {
+ }
+ };
+ }
+
+ /**
+ * A JobHistory stub that treats all jobs as oversized and therefore will
+ * not parse their job history files but return an UnparsedJob instance.
+ */
+ static class JobHistoryStubWithAllOversizeJobs extends JobHistory {
+ private final int maxAllowedTaskNum;
+
+ public JobHistoryStubWithAllOversizeJobs(int maxAllowedTaskNum) {
+ this.maxAllowedTaskNum = maxAllowedTaskNum;
+ }
+
+ @Override
+ protected HistoryFileManager createHistoryFileManager() {
+ HistoryFileManager historyFileManager;
+ try {
+ HistoryFileInfo historyFileInfo =
+ createUnparsedJobHistoryFileInfo(maxAllowedTaskNum);
+
+ historyFileManager = mock(HistoryFileManager.class);
+ when(historyFileManager.getFileInfo(any(JobId.class))).thenReturn(
+ historyFileInfo);
+ } catch (IOException ex) {
+ // this should never happen
+ historyFileManager = super.createHistoryFileManager();
+ }
+ return historyFileManager;
+ }
+
+ private static HistoryFileInfo createUnparsedJobHistoryFileInfo(
+ int maxAllowedTaskNum) throws IOException {
+ HistoryFileInfo fileInfo = mock(HistoryFileInfo.class);
+
+ // create an instance of UnparsedJob for a large job
+ UnparsedJob unparsedJob = mock(UnparsedJob.class);
+ when(unparsedJob.getMaxTasksAllowed()).thenReturn(maxAllowedTaskNum);
+ when(unparsedJob.getTotalMaps()).thenReturn(maxAllowedTaskNum);
+ when(unparsedJob.getTotalReduces()).thenReturn(maxAllowedTaskNum);
+
+ when(fileInfo.loadJob()).thenReturn(unparsedJob);
+
+ return fileInfo;
+ }
+ }
+
+ /**
+ * A JobHistory stub that treats all jobs as normal size and therefore will
+ * return a CompletedJob on HistoryFileInfo.loadJob().
+ */
+ static class JobHistoryStubWithAllNormalSizeJobs extends JobHistory {
+ @Override
+ public HistoryFileManager createHistoryFileManager() {
+ HistoryFileManager historyFileManager;
+ try {
+ HistoryFileInfo historyFileInfo = createParsedJobHistoryFileInfo();
+
+ historyFileManager = mock(HistoryFileManager.class);
+ when(historyFileManager.getFileInfo(any(JobId.class))).thenReturn(
+ historyFileInfo);
+ } catch (IOException ex) {
+ // this should never happen
+ historyFileManager = super.createHistoryFileManager();
+ }
+ return historyFileManager;
+
+ }
+
+ private static HistoryFileInfo createParsedJobHistoryFileInfo()
+ throws IOException {
+ HistoryFileInfo fileInfo = mock(HistoryFileInfo.class);
+ CompletedJob job = createFakeCompletedJob();
+ when(fileInfo.loadJob()).thenReturn(job);
+ return fileInfo;
+ }
+
+
+ private static CompletedJob createFakeCompletedJob() {
+ CompletedJob job = mock(CompletedJob.class);
+
+ when(job.getTotalMaps()).thenReturn(0);
+ when(job.getCompletedMaps()).thenReturn(0);
+ when(job.getTotalReduces()).thenReturn(0);
+ when(job.getCompletedReduces()).thenReturn(0);
+
+ JobId jobId = createFakeJobId();
+ when(job.getID()).thenReturn(jobId);
+
+ JobReport jobReport = mock(JobReport.class);
+ when(jobReport.getSubmitTime()).thenReturn(-1L);
+ when(jobReport.getStartTime()).thenReturn(-1L);
+ when(jobReport.getFinishTime()).thenReturn(-1L);
+ when(job.getReport()).thenReturn(jobReport);
+
+ when(job.getAMInfos()).thenReturn(new ArrayList<AMInfo>());
+ when(job.getDiagnostics()).thenReturn(new ArrayList<String>());
+ when(job.getName()).thenReturn("fake completed job");
+ when(job.getQueueName()).thenReturn("default");
+ when(job.getUserName()).thenReturn("junit");
+ when(job.getState()).thenReturn(JobState.ERROR);
+ when(job.getAllCounters()).thenReturn(new Counters());
+ when(job.getTasks()).thenReturn(new HashMap<TaskId, Task>());
+
+ return job;
+ }
+
+ private static JobId createFakeJobId() {
+ JobId jobId = new JobIdPBImpl();
+ jobId.setId(0);
+
+ ApplicationId appId = mock(ApplicationId.class);
+ when(appId.getClusterTimestamp()).thenReturn(0L);
+ when(appId.getId()).thenReturn(0);
+
+ jobId.setAppId(appId);
+
+ return jobId;
+ }
+ }
+
+}