MAPREDUCE-6652. Add configuration property to prevent JHS from loading jobs with a task count greater than X (haibochen via rkanter)
This commit is contained in:
parent
6171498375
commit
233ffaf3f7
|
@ -253,4 +253,11 @@ public class JHAdminConfig {
|
||||||
MR_HISTORY_PREFIX + "jhist.format";
|
MR_HISTORY_PREFIX + "jhist.format";
|
||||||
public static final String DEFAULT_MR_HS_JHIST_FORMAT =
|
public static final String DEFAULT_MR_HS_JHIST_FORMAT =
|
||||||
"json";
|
"json";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The maximum number of tasks for a job to be loaded in Job History Server.
|
||||||
|
*/
|
||||||
|
public static final String MR_HS_LOADED_JOBS_TASKS_MAX =
|
||||||
|
MR_HISTORY_PREFIX + "loadedjob.tasks.max";
|
||||||
|
public static final int DEFAULT_MR_HS_LOADED_JOBS_TASKS_MAX = -1;
|
||||||
}
|
}
|
||||||
|
|
|
@ -1889,4 +1889,13 @@
|
||||||
<value>SAMEORIGIN</value>
|
<value>SAMEORIGIN</value>
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<description>
|
||||||
|
The maximum number of tasks that a job can have so that the Job History
|
||||||
|
Server will fully parse its associated job history file and load it into
|
||||||
|
memory. A value of -1 (default) will allow all jobs to be loaded.
|
||||||
|
</description>
|
||||||
|
<name>mapreduce.jobhistory.loadedjob.tasks.max</name>
|
||||||
|
<value>-1</value>
|
||||||
|
</property>
|
||||||
</configuration>
|
</configuration>
|
||||||
|
|
|
@ -458,15 +458,23 @@ public class HistoryFileManager extends AbstractService {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Parse a job from the JobHistoryFile, if the underlying file is not going
|
* Parse a job from the JobHistoryFile, if the underlying file is not going
|
||||||
* to be deleted.
|
* to be deleted and the number of tasks associated with the job is not
|
||||||
|
* greater than maxTasksForLoadedJob.
|
||||||
*
|
*
|
||||||
* @return the Job or null if the underlying file was deleted.
|
* @return null if the underlying job history file was deleted, or
|
||||||
|
* an {@link UnparsedJob} object representing a partially parsed job
|
||||||
|
* if the job tasks exceeds the configured maximum, or
|
||||||
|
* a {@link CompletedJob} representing a fully parsed job.
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
* if there is an error trying to read the file.
|
* if there is an error trying to read the file if parsed.
|
||||||
*/
|
*/
|
||||||
public synchronized Job loadJob() throws IOException {
|
public synchronized Job loadJob() throws IOException {
|
||||||
return new CompletedJob(conf, jobIndexInfo.getJobId(), historyFile,
|
if(isOversized()) {
|
||||||
false, jobIndexInfo.getUser(), this, aclsMgr);
|
return new UnparsedJob(maxTasksForLoadedJob, jobIndexInfo, this);
|
||||||
|
} else {
|
||||||
|
return new CompletedJob(conf, jobIndexInfo.getJobId(), historyFile,
|
||||||
|
false, jobIndexInfo.getUser(), this, aclsMgr);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -504,6 +512,12 @@ public class HistoryFileManager extends AbstractService {
|
||||||
jobConf.addResource(fc.open(confFile), confFile.toString());
|
jobConf.addResource(fc.open(confFile), confFile.toString());
|
||||||
return jobConf;
|
return jobConf;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private boolean isOversized() {
|
||||||
|
final int totalTasks = jobIndexInfo.getNumReduces() +
|
||||||
|
jobIndexInfo.getNumMaps();
|
||||||
|
return (maxTasksForLoadedJob > 0) && (totalTasks > maxTasksForLoadedJob);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private SerialNumberIndex serialNumberIndex = null;
|
private SerialNumberIndex serialNumberIndex = null;
|
||||||
|
@ -536,7 +550,12 @@ public class HistoryFileManager extends AbstractService {
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
protected ThreadPoolExecutor moveToDoneExecutor = null;
|
protected ThreadPoolExecutor moveToDoneExecutor = null;
|
||||||
private long maxHistoryAge = 0;
|
private long maxHistoryAge = 0;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The maximum number of tasks allowed for a job to be loaded.
|
||||||
|
*/
|
||||||
|
private int maxTasksForLoadedJob = -1;
|
||||||
|
|
||||||
public HistoryFileManager() {
|
public HistoryFileManager() {
|
||||||
super(HistoryFileManager.class.getName());
|
super(HistoryFileManager.class.getName());
|
||||||
}
|
}
|
||||||
|
@ -555,6 +574,10 @@ public class HistoryFileManager extends AbstractService {
|
||||||
JHAdminConfig.DEFAULT_MR_HISTORY_MAX_START_WAIT_TIME);
|
JHAdminConfig.DEFAULT_MR_HISTORY_MAX_START_WAIT_TIME);
|
||||||
createHistoryDirs(SystemClock.getInstance(), 10 * 1000, maxFSWaitTime);
|
createHistoryDirs(SystemClock.getInstance(), 10 * 1000, maxFSWaitTime);
|
||||||
|
|
||||||
|
maxTasksForLoadedJob = conf.getInt(
|
||||||
|
JHAdminConfig.MR_HS_LOADED_JOBS_TASKS_MAX,
|
||||||
|
JHAdminConfig.DEFAULT_MR_HS_LOADED_JOBS_TASKS_MAX);
|
||||||
|
|
||||||
this.aclsMgr = new JobACLsManager(conf);
|
this.aclsMgr = new JobACLsManager(conf);
|
||||||
|
|
||||||
maxHistoryAge = conf.getLong(JHAdminConfig.MR_HISTORY_MAX_AGE_MS,
|
maxHistoryAge = conf.getLong(JHAdminConfig.MR_HISTORY_MAX_AGE_MS,
|
||||||
|
|
|
@ -0,0 +1,211 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.mapreduce.v2.hs;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.mapred.TaskCompletionEvent;
|
||||||
|
import org.apache.hadoop.mapreduce.Counters;
|
||||||
|
import org.apache.hadoop.mapreduce.JobACL;
|
||||||
|
import org.apache.hadoop.mapreduce.v2.api.records.*;
|
||||||
|
import org.apache.hadoop.mapreduce.v2.app.job.Task;
|
||||||
|
import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo;
|
||||||
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
import org.apache.hadoop.security.authorize.AccessControlList;
|
||||||
|
import org.apache.hadoop.yarn.api.records.Priority;
|
||||||
|
import org.apache.hadoop.yarn.util.Records;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.*;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A job that has too many tasks associated with it, of which we do not parse
|
||||||
|
* its job history file, to prevent the Job History Server from hanging on
|
||||||
|
* parsing the file. It is meant to be used only by JHS to indicate if the
|
||||||
|
* history file of a job is fully parsed or not.
|
||||||
|
*/
|
||||||
|
public class UnparsedJob implements org.apache.hadoop.mapreduce.v2.app.job.Job {
|
||||||
|
private final JobIndexInfo jobIndexInfo;
|
||||||
|
private final int maxTasksAllowed;
|
||||||
|
private JobReport jobReport;
|
||||||
|
private final HistoryFileManager.HistoryFileInfo jhfInfo;
|
||||||
|
|
||||||
|
public UnparsedJob(int maxTasksAllowed, JobIndexInfo jobIndexInfo,
|
||||||
|
HistoryFileManager.HistoryFileInfo jhfInfo) throws IOException {
|
||||||
|
this.jobIndexInfo = jobIndexInfo;
|
||||||
|
this.jhfInfo = jhfInfo;
|
||||||
|
this.maxTasksAllowed = maxTasksAllowed;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getMaxTasksAllowed() {
|
||||||
|
return maxTasksAllowed;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public JobId getID() {
|
||||||
|
return jobIndexInfo.getJobId();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getName() {
|
||||||
|
return jobIndexInfo.getJobName();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public JobState getState() {
|
||||||
|
return JobState.valueOf(jobIndexInfo.getJobStatus());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized JobReport getReport() {
|
||||||
|
if(jobReport == null) {
|
||||||
|
jobReport = constructJobReport();
|
||||||
|
}
|
||||||
|
return jobReport;
|
||||||
|
}
|
||||||
|
|
||||||
|
public JobReport constructJobReport() {
|
||||||
|
JobReport report = Records.newRecord(JobReport.class);
|
||||||
|
report.setJobId(getID());
|
||||||
|
report.setJobState(getState());
|
||||||
|
report.setSubmitTime(jobIndexInfo.getSubmitTime());
|
||||||
|
report.setStartTime(jobIndexInfo.getJobStartTime());
|
||||||
|
report.setFinishTime(jobIndexInfo.getFinishTime());
|
||||||
|
report.setJobName(jobIndexInfo.getJobName());
|
||||||
|
report.setUser(jobIndexInfo.getUser());
|
||||||
|
report.setJobFile(getConfFile().toString());
|
||||||
|
report.setHistoryFile(jhfInfo.getHistoryFile().toString());
|
||||||
|
return report;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Counters getAllCounters() {
|
||||||
|
return new Counters();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Map<TaskId, Task> getTasks() {
|
||||||
|
return new HashMap<>();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Map<TaskId, Task> getTasks(TaskType taskType) {
|
||||||
|
return new HashMap<>();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Task getTask(TaskId taskID) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public List<String> getDiagnostics() {
|
||||||
|
return new ArrayList<>();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getTotalMaps() {
|
||||||
|
return jobIndexInfo.getNumMaps();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getTotalReduces() {
|
||||||
|
return jobIndexInfo.getNumReduces();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getCompletedMaps() {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getCompletedReduces() {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public float getProgress() {
|
||||||
|
return 1.0f;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isUber() {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getUserName() {
|
||||||
|
return jobIndexInfo.getUser();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getQueueName() {
|
||||||
|
return jobIndexInfo.getQueueName();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Path getConfFile() {
|
||||||
|
return jhfInfo.getConfFile();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Configuration loadConfFile() throws IOException {
|
||||||
|
return jhfInfo.loadConfFile();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Map<JobACL, AccessControlList> getJobACLs() {
|
||||||
|
return new HashMap<>();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public TaskAttemptCompletionEvent[] getTaskAttemptCompletionEvents(
|
||||||
|
int fromEventId, int maxEvents) {
|
||||||
|
return new TaskAttemptCompletionEvent[0];
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public TaskCompletionEvent[] getMapAttemptCompletionEvents(
|
||||||
|
int startIndex, int maxEvents) {
|
||||||
|
return new TaskCompletionEvent[0];
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public List<AMInfo> getAMInfos() {
|
||||||
|
return new ArrayList<>();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean checkAccess(UserGroupInformation callerUGI,
|
||||||
|
JobACL jobOperation) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setQueueName(String queueName) {
|
||||||
|
throw new UnsupportedOperationException("Can't set job's " +
|
||||||
|
"queue name in history");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setJobPriority(Priority priority) {
|
||||||
|
throw new UnsupportedOperationException(
|
||||||
|
"Can't set job's priority in history");
|
||||||
|
}
|
||||||
|
}
|
|
@ -33,8 +33,10 @@ import org.apache.hadoop.mapreduce.v2.api.records.JobId;
|
||||||
import org.apache.hadoop.mapreduce.v2.app.AppContext;
|
import org.apache.hadoop.mapreduce.v2.app.AppContext;
|
||||||
import org.apache.hadoop.mapreduce.v2.app.job.Job;
|
import org.apache.hadoop.mapreduce.v2.app.job.Job;
|
||||||
import org.apache.hadoop.mapreduce.v2.app.webapp.dao.ConfEntryInfo;
|
import org.apache.hadoop.mapreduce.v2.app.webapp.dao.ConfEntryInfo;
|
||||||
|
import org.apache.hadoop.mapreduce.v2.hs.UnparsedJob;
|
||||||
import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.AMAttemptInfo;
|
import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.AMAttemptInfo;
|
||||||
import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.JobInfo;
|
import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.JobInfo;
|
||||||
|
import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
|
||||||
import org.apache.hadoop.mapreduce.v2.util.MRApps;
|
import org.apache.hadoop.mapreduce.v2.util.MRApps;
|
||||||
import org.apache.hadoop.mapreduce.v2.util.MRApps.TaskAttemptStateUI;
|
import org.apache.hadoop.mapreduce.v2.util.MRApps.TaskAttemptStateUI;
|
||||||
import org.apache.hadoop.mapreduce.v2.util.MRWebAppUtil;
|
import org.apache.hadoop.mapreduce.v2.util.MRWebAppUtil;
|
||||||
|
@ -73,8 +75,18 @@ public class HsJobBlock extends HtmlBlock {
|
||||||
JobId jobID = MRApps.toJobID(jid);
|
JobId jobID = MRApps.toJobID(jid);
|
||||||
Job j = appContext.getJob(jobID);
|
Job j = appContext.getJob(jobID);
|
||||||
if (j == null) {
|
if (j == null) {
|
||||||
html.
|
html.p()._("Sorry, ", jid, " not found.")._();
|
||||||
p()._("Sorry, ", jid, " not found.")._();
|
return;
|
||||||
|
}
|
||||||
|
if(j instanceof UnparsedJob) {
|
||||||
|
final int taskCount = j.getTotalMaps() + j.getTotalReduces();
|
||||||
|
UnparsedJob oversizedJob = (UnparsedJob) j;
|
||||||
|
html.p()._("The job has a total of " + taskCount + " tasks. ")
|
||||||
|
._("Any job larger than " + oversizedJob.getMaxTasksAllowed() +
|
||||||
|
" will not be loaded.")._();
|
||||||
|
html.p()._("You can either use the CLI tool: 'mapred job -history'"
|
||||||
|
+ " to view large jobs or adjust the property " +
|
||||||
|
JHAdminConfig.MR_HS_LOADED_JOBS_TASKS_MAX + ".")._();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
List<AMInfo> amInfos = j.getAMInfos();
|
List<AMInfo> amInfos = j.getAMInfos();
|
||||||
|
|
|
@ -25,6 +25,7 @@ import java.io.FileNotFoundException;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
|
import org.apache.hadoop.mapreduce.v2.app.job.Job;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||||
|
@ -37,11 +38,9 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||||
import org.apache.hadoop.mapreduce.JobID;
|
import org.apache.hadoop.mapreduce.JobID;
|
||||||
import org.apache.hadoop.mapreduce.TypeConverter;
|
import org.apache.hadoop.mapreduce.TypeConverter;
|
||||||
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
|
|
||||||
import org.apache.hadoop.mapreduce.v2.hs.HistoryFileManager.HistoryFileInfo;
|
import org.apache.hadoop.mapreduce.v2.hs.HistoryFileManager.HistoryFileInfo;
|
||||||
import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
|
import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
|
||||||
import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo;
|
import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo;
|
||||||
import org.apache.hadoop.test.CoreTestDriver;
|
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||||
import org.apache.hadoop.yarn.util.Clock;
|
import org.apache.hadoop.yarn.util.Clock;
|
||||||
|
@ -86,6 +85,10 @@ public class TestHistoryFileManager {
|
||||||
@After
|
@After
|
||||||
public void cleanTest() throws Exception {
|
public void cleanTest() throws Exception {
|
||||||
new File(coreSitePath).delete();
|
new File(coreSitePath).delete();
|
||||||
|
dfsCluster.getFileSystem().setSafeMode(
|
||||||
|
HdfsConstants.SafeModeAction.SAFEMODE_LEAVE);
|
||||||
|
dfsCluster2.getFileSystem().setSafeMode(
|
||||||
|
HdfsConstants.SafeModeAction.SAFEMODE_LEAVE);
|
||||||
}
|
}
|
||||||
|
|
||||||
private String getDoneDirNameForTest() {
|
private String getDoneDirNameForTest() {
|
||||||
|
@ -247,6 +250,97 @@ public class TestHistoryFileManager {
|
||||||
Assert.assertFalse(info.didMoveFail());
|
Assert.assertFalse(info.didMoveFail());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testHistoryFileInfoLoadOversizedJobShouldReturnUnParsedJob()
|
||||||
|
throws Exception {
|
||||||
|
HistoryFileManagerTest hmTest = new HistoryFileManagerTest();
|
||||||
|
|
||||||
|
int allowedMaximumTasks = 5;
|
||||||
|
Configuration conf = dfsCluster.getConfiguration(0);
|
||||||
|
conf.setInt(JHAdminConfig.MR_HS_LOADED_JOBS_TASKS_MAX, allowedMaximumTasks);
|
||||||
|
|
||||||
|
hmTest.init(conf);
|
||||||
|
|
||||||
|
// set up a job of which the number of tasks is greater than maximum allowed
|
||||||
|
String jobId = "job_1410889000000_123456";
|
||||||
|
JobIndexInfo jobIndexInfo = new JobIndexInfo();
|
||||||
|
jobIndexInfo.setJobId(TypeConverter.toYarn(JobID.forName(jobId)));
|
||||||
|
jobIndexInfo.setNumMaps(allowedMaximumTasks);
|
||||||
|
jobIndexInfo.setNumReduces(allowedMaximumTasks);
|
||||||
|
|
||||||
|
|
||||||
|
HistoryFileInfo info = hmTest.getHistoryFileInfo(null, null, null,
|
||||||
|
jobIndexInfo, false);
|
||||||
|
|
||||||
|
Job job = info.loadJob();
|
||||||
|
Assert.assertTrue("Should return an instance of UnparsedJob to indicate" +
|
||||||
|
" the job history file is not parsed", job instanceof UnparsedJob);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testHistoryFileInfoLoadNormalSizedJobShouldReturnCompletedJob()
|
||||||
|
throws Exception {
|
||||||
|
HistoryFileManagerTest hmTest = new HistoryFileManagerTest();
|
||||||
|
|
||||||
|
final int numOfTasks = 100;
|
||||||
|
Configuration conf = dfsCluster.getConfiguration(0);
|
||||||
|
conf.setInt(JHAdminConfig.MR_HS_LOADED_JOBS_TASKS_MAX,
|
||||||
|
numOfTasks + numOfTasks + 1);
|
||||||
|
|
||||||
|
hmTest.init(conf);
|
||||||
|
|
||||||
|
// set up a job of which the number of tasks is smaller than the maximum
|
||||||
|
// allowed, and therefore will be fully loaded.
|
||||||
|
final String jobId = "job_1416424547277_0002";
|
||||||
|
JobIndexInfo jobIndexInfo = new JobIndexInfo();
|
||||||
|
jobIndexInfo.setJobId(TypeConverter.toYarn(JobID.forName(jobId)));
|
||||||
|
jobIndexInfo.setNumMaps(numOfTasks);
|
||||||
|
jobIndexInfo.setNumReduces(numOfTasks);
|
||||||
|
|
||||||
|
|
||||||
|
final String historyFile = getClass().getClassLoader().getResource(
|
||||||
|
"job_2.0.3-alpha-FAILED.jhist").getFile();
|
||||||
|
final Path historyFilePath = FileSystem.getLocal(conf).makeQualified(
|
||||||
|
new Path(historyFile));
|
||||||
|
HistoryFileInfo info = hmTest.getHistoryFileInfo(historyFilePath, null,
|
||||||
|
null, jobIndexInfo, false);
|
||||||
|
|
||||||
|
Job job = info.loadJob();
|
||||||
|
Assert.assertTrue("Should return an instance of CompletedJob as " +
|
||||||
|
"a result of parsing the job history file of the job",
|
||||||
|
job instanceof CompletedJob);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testHistoryFileInfoShouldReturnCompletedJobIfMaxNotConfiged()
|
||||||
|
throws Exception {
|
||||||
|
HistoryFileManagerTest hmTest = new HistoryFileManagerTest();
|
||||||
|
|
||||||
|
Configuration conf = dfsCluster.getConfiguration(0);
|
||||||
|
conf.setInt(JHAdminConfig.MR_HS_LOADED_JOBS_TASKS_MAX, -1);
|
||||||
|
|
||||||
|
hmTest.init(conf);
|
||||||
|
|
||||||
|
final String jobId = "job_1416424547277_0002";
|
||||||
|
JobIndexInfo jobIndexInfo = new JobIndexInfo();
|
||||||
|
jobIndexInfo.setJobId(TypeConverter.toYarn(JobID.forName(jobId)));
|
||||||
|
jobIndexInfo.setNumMaps(100);
|
||||||
|
jobIndexInfo.setNumReduces(100);
|
||||||
|
|
||||||
|
final String historyFile = getClass().getClassLoader().getResource(
|
||||||
|
"job_2.0.3-alpha-FAILED.jhist").getFile();
|
||||||
|
final Path historyFilePath = FileSystem.getLocal(conf).makeQualified(
|
||||||
|
new Path(historyFile));
|
||||||
|
HistoryFileInfo info = hmTest.getHistoryFileInfo(historyFilePath, null,
|
||||||
|
null, jobIndexInfo, false);
|
||||||
|
|
||||||
|
Job job = info.loadJob();
|
||||||
|
Assert.assertTrue("Should return an instance of CompletedJob as " +
|
||||||
|
"a result of parsing the job history file of the job",
|
||||||
|
job instanceof CompletedJob);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
static class HistoryFileManagerTest extends HistoryFileManager {
|
static class HistoryFileManagerTest extends HistoryFileManager {
|
||||||
public HistoryFileManagerTest() {
|
public HistoryFileManagerTest() {
|
||||||
super();
|
super();
|
||||||
|
|
|
@ -0,0 +1,268 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.mapreduce.v2.hs.webapp;
|
||||||
|
|
||||||
|
import org.apache.commons.io.output.ByteArrayOutputStream;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.mapreduce.Counters;
|
||||||
|
import org.apache.hadoop.mapreduce.v2.api.records.*;
|
||||||
|
import org.apache.hadoop.mapreduce.v2.api.records.impl.pb.JobIdPBImpl;
|
||||||
|
import org.apache.hadoop.mapreduce.v2.app.job.Task;
|
||||||
|
import org.apache.hadoop.mapreduce.v2.app.webapp.AMParams;
|
||||||
|
import org.apache.hadoop.mapreduce.v2.hs.CompletedJob;
|
||||||
|
import org.apache.hadoop.mapreduce.v2.hs.HistoryFileManager;
|
||||||
|
import org.apache.hadoop.mapreduce.v2.hs.HistoryFileManager.HistoryFileInfo;
|
||||||
|
import org.apache.hadoop.mapreduce.v2.hs.JobHistory;
|
||||||
|
import org.apache.hadoop.mapreduce.v2.hs.UnparsedJob;
|
||||||
|
import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
|
import org.apache.hadoop.yarn.util.StringHelper;
|
||||||
|
import org.apache.hadoop.yarn.webapp.ResponseInfo;
|
||||||
|
import org.apache.hadoop.yarn.webapp.SubView;
|
||||||
|
import org.apache.hadoop.yarn.webapp.view.BlockForTest;
|
||||||
|
import org.apache.hadoop.yarn.webapp.view.HtmlBlock;
|
||||||
|
import org.apache.hadoop.yarn.webapp.view.HtmlBlockForTest;
|
||||||
|
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.OutputStream;
|
||||||
|
import java.io.PrintWriter;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
import static org.mockito.Matchers.any;
|
||||||
|
import static org.mockito.Mockito.mock;
|
||||||
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test the HsJobBlock generated for oversized jobs in JHS.
|
||||||
|
*/
|
||||||
|
public class TestHsJobBlock {
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testHsJobBlockForOversizeJobShouldDisplayWarningMessage() {
|
||||||
|
int maxAllowedTaskNum = 100;
|
||||||
|
|
||||||
|
Configuration config = new Configuration();
|
||||||
|
config.setInt(JHAdminConfig.MR_HS_LOADED_JOBS_TASKS_MAX, maxAllowedTaskNum);
|
||||||
|
|
||||||
|
JobHistory jobHistory =
|
||||||
|
new JobHistoryStubWithAllOversizeJobs(maxAllowedTaskNum);
|
||||||
|
jobHistory.init(config);
|
||||||
|
|
||||||
|
HsJobBlock jobBlock = new HsJobBlock(jobHistory) {
|
||||||
|
// override this so that job block can fetch a job id.
|
||||||
|
@Override
|
||||||
|
public Map<String, String> moreParams() {
|
||||||
|
Map<String, String> map = new HashMap<>();
|
||||||
|
map.put(AMParams.JOB_ID, "job_0000_0001");
|
||||||
|
return map;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
// set up the test block to render HsJobBLock to
|
||||||
|
OutputStream outputStream = new ByteArrayOutputStream();
|
||||||
|
HtmlBlock.Block block = createBlockToCreateTo(outputStream);
|
||||||
|
|
||||||
|
jobBlock.render(block);
|
||||||
|
|
||||||
|
block.getWriter().flush();
|
||||||
|
String out = outputStream.toString();
|
||||||
|
Assert.assertTrue("Should display warning message for jobs that have too " +
|
||||||
|
"many tasks", out.contains("Any job larger than " + maxAllowedTaskNum +
|
||||||
|
" will not be loaded"));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testHsJobBlockForNormalSizeJobShouldNotDisplayWarningMessage() {
|
||||||
|
|
||||||
|
Configuration config = new Configuration();
|
||||||
|
config.setInt(JHAdminConfig.MR_HS_LOADED_JOBS_TASKS_MAX, -1);
|
||||||
|
|
||||||
|
JobHistory jobHistory = new JobHitoryStubWithAllNormalSizeJobs();
|
||||||
|
jobHistory.init(config);
|
||||||
|
|
||||||
|
HsJobBlock jobBlock = new HsJobBlock(jobHistory) {
|
||||||
|
// override this so that the job block can fetch a job id.
|
||||||
|
@Override
|
||||||
|
public Map<String, String> moreParams() {
|
||||||
|
Map<String, String> map = new HashMap<>();
|
||||||
|
map.put(AMParams.JOB_ID, "job_0000_0001");
|
||||||
|
return map;
|
||||||
|
}
|
||||||
|
|
||||||
|
// override this to avoid view context lookup in render()
|
||||||
|
@Override
|
||||||
|
public ResponseInfo info(String about) {
|
||||||
|
return new ResponseInfo().about(about);
|
||||||
|
}
|
||||||
|
|
||||||
|
// override this to avoid view context lookup in render()
|
||||||
|
@Override
|
||||||
|
public String url(String... parts) {
|
||||||
|
return StringHelper.ujoin("", parts);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
// set up the test block to render HsJobBLock to
|
||||||
|
OutputStream outputStream = new ByteArrayOutputStream();
|
||||||
|
HtmlBlock.Block block = createBlockToCreateTo(outputStream);
|
||||||
|
|
||||||
|
jobBlock.render(block);
|
||||||
|
|
||||||
|
block.getWriter().flush();
|
||||||
|
String out = outputStream.toString();
|
||||||
|
|
||||||
|
Assert.assertTrue("Should display job overview for the job.",
|
||||||
|
out.contains("ApplicationMaster"));
|
||||||
|
}
|
||||||
|
|
||||||
|
private static HtmlBlock.Block createBlockToCreateTo(
|
||||||
|
OutputStream outputStream) {
|
||||||
|
PrintWriter printWriter = new PrintWriter(outputStream);
|
||||||
|
HtmlBlock html = new HtmlBlockForTest();
|
||||||
|
return new BlockForTest(html, printWriter, 10, false) {
|
||||||
|
@Override
|
||||||
|
protected void subView(Class<? extends SubView> cls) {
|
||||||
|
}
|
||||||
|
};
|
||||||
|
};
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A JobHistory stub that treat all jobs as oversized and therefore will
|
||||||
|
* not parse their job history files but return a UnparseJob instance.
|
||||||
|
*/
|
||||||
|
static class JobHistoryStubWithAllOversizeJobs extends JobHistory {
|
||||||
|
private final int maxAllowedTaskNum;
|
||||||
|
|
||||||
|
public JobHistoryStubWithAllOversizeJobs(int maxAllowedTaskNum) {
|
||||||
|
this.maxAllowedTaskNum = maxAllowedTaskNum;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected HistoryFileManager createHistoryFileManager() {
|
||||||
|
HistoryFileManager historyFileManager;
|
||||||
|
try {
|
||||||
|
HistoryFileInfo historyFileInfo =
|
||||||
|
createUnparsedJobHistoryFileInfo(maxAllowedTaskNum);
|
||||||
|
|
||||||
|
historyFileManager = mock(HistoryFileManager.class);
|
||||||
|
when(historyFileManager.getFileInfo(any(JobId.class))).thenReturn(
|
||||||
|
historyFileInfo);
|
||||||
|
} catch (IOException ex) {
|
||||||
|
// this should never happen
|
||||||
|
historyFileManager = super.createHistoryFileManager();
|
||||||
|
}
|
||||||
|
return historyFileManager;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static HistoryFileInfo createUnparsedJobHistoryFileInfo(
|
||||||
|
int maxAllowedTaskNum) throws IOException {
|
||||||
|
HistoryFileInfo fileInfo = mock(HistoryFileInfo.class);
|
||||||
|
|
||||||
|
// create an instance of UnparsedJob for a large job
|
||||||
|
UnparsedJob unparsedJob = mock(UnparsedJob.class);
|
||||||
|
when(unparsedJob.getMaxTasksAllowed()).thenReturn(maxAllowedTaskNum);
|
||||||
|
when(unparsedJob.getTotalMaps()).thenReturn(maxAllowedTaskNum);
|
||||||
|
when(unparsedJob.getTotalReduces()).thenReturn(maxAllowedTaskNum);
|
||||||
|
|
||||||
|
when(fileInfo.loadJob()).thenReturn(unparsedJob);
|
||||||
|
|
||||||
|
return fileInfo;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A JobHistory stub that treats all jobs as normal size and therefore will
|
||||||
|
* return a CompletedJob on HistoryFileInfo.loadJob().
|
||||||
|
*/
|
||||||
|
static class JobHitoryStubWithAllNormalSizeJobs extends JobHistory {
|
||||||
|
@Override
|
||||||
|
public HistoryFileManager createHistoryFileManager() {
|
||||||
|
HistoryFileManager historyFileManager;
|
||||||
|
try {
|
||||||
|
HistoryFileInfo historyFileInfo = createParsedJobHistoryFileInfo();
|
||||||
|
|
||||||
|
historyFileManager = mock(HistoryFileManager.class);
|
||||||
|
when(historyFileManager.getFileInfo(any(JobId.class))).thenReturn(
|
||||||
|
historyFileInfo);
|
||||||
|
} catch (IOException ex) {
|
||||||
|
// this should never happen
|
||||||
|
historyFileManager = super.createHistoryFileManager();
|
||||||
|
}
|
||||||
|
return historyFileManager;
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
private static HistoryFileInfo createParsedJobHistoryFileInfo()
|
||||||
|
throws IOException {
|
||||||
|
HistoryFileInfo fileInfo = mock(HistoryFileInfo.class);
|
||||||
|
CompletedJob job = createFakeCompletedJob();
|
||||||
|
when(fileInfo.loadJob()).thenReturn(job);
|
||||||
|
return fileInfo;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
private static CompletedJob createFakeCompletedJob() {
|
||||||
|
CompletedJob job = mock(CompletedJob.class);
|
||||||
|
|
||||||
|
when(job.getTotalMaps()).thenReturn(0);
|
||||||
|
when(job.getCompletedMaps()).thenReturn(0);
|
||||||
|
when(job.getTotalReduces()).thenReturn(0);
|
||||||
|
when(job.getCompletedReduces()).thenReturn(0);
|
||||||
|
|
||||||
|
JobId jobId = createFakeJobId();
|
||||||
|
when(job.getID()).thenReturn(jobId);
|
||||||
|
|
||||||
|
JobReport jobReport = mock(JobReport.class);
|
||||||
|
when(jobReport.getSubmitTime()).thenReturn(-1L);
|
||||||
|
when(jobReport.getStartTime()).thenReturn(-1L);
|
||||||
|
when(jobReport.getFinishTime()).thenReturn(-1L);
|
||||||
|
when(job.getReport()).thenReturn(jobReport);
|
||||||
|
|
||||||
|
when(job.getAMInfos()).thenReturn(new ArrayList<AMInfo>());
|
||||||
|
when(job.getDiagnostics()).thenReturn(new ArrayList<String>());
|
||||||
|
when(job.getName()).thenReturn("fake completed job");
|
||||||
|
when(job.getQueueName()).thenReturn("default");
|
||||||
|
when(job.getUserName()).thenReturn("junit");
|
||||||
|
when(job.getState()).thenReturn(JobState.ERROR);
|
||||||
|
when(job.getAllCounters()).thenReturn(new Counters());
|
||||||
|
when(job.getTasks()).thenReturn(new HashMap<TaskId, Task>());
|
||||||
|
|
||||||
|
return job;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static JobId createFakeJobId() {
|
||||||
|
JobId jobId = new JobIdPBImpl();
|
||||||
|
jobId.setId(0);
|
||||||
|
|
||||||
|
ApplicationId appId = mock(ApplicationId.class);
|
||||||
|
when(appId.getClusterTimestamp()).thenReturn(0L);
|
||||||
|
when(appId.getId()).thenReturn(0);
|
||||||
|
|
||||||
|
jobId.setAppId(appId);
|
||||||
|
|
||||||
|
return jobId;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
Loading…
Reference in New Issue