merge -r 1311895:1311896 from trunk. FIXES: MAPREDUCE-4059

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1311897 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Thomas Graves 2012-04-10 18:13:09 +00:00
parent a2c86e8e3d
commit ce9bdceac7
18 changed files with 1399 additions and 966 deletions

View File

@ -136,6 +136,9 @@ Release 0.23.3 - UNRELEASED
IMPROVEMENTS
MAPREDUCE-4059. The history server should have a separate pluggable
storage/query interface. (Robert Evans via tgraves)
OPTIMIZATIONS
BUG FIXES

View File

@ -44,6 +44,9 @@ public class JHAdminConfig {
/** Run the History Cleaner every X ms.*/
public static final String MR_HISTORY_CLEANER_INTERVAL_MS =
MR_HISTORY_PREFIX + "cleaner.interval-ms";
public static final long DEFAULT_MR_HISTORY_CLEANER_INTERVAL_MS =
1 * 24 * 60 * 60 * 1000l; //1 day
/** The number of threads to handle client API requests.*/
public static final String MR_HISTORY_CLIENT_THREAD_COUNT =
@ -56,7 +59,9 @@ public class JHAdminConfig {
*/
public static final String MR_HISTORY_DATESTRING_CACHE_SIZE =
MR_HISTORY_PREFIX + "datestring.cache.size";
public static final int DEFAULT_MR_HISTORY_DATESTRING_CACHE_SIZE = 200000;
//TODO REMOVE debug-mode
/** Equivalent to 0.20 mapreduce.jobhistory.debug.mode */
public static final String MR_HISTORY_DEBUG_MODE =
MR_HISTORY_PREFIX + "debug-mode";
@ -75,6 +80,7 @@ public class JHAdminConfig {
/** Size of the job list cache.*/
public static final String MR_HISTORY_JOBLIST_CACHE_SIZE =
MR_HISTORY_PREFIX + "joblist.cache.size";
public static final int DEFAULT_MR_HISTORY_JOBLIST_CACHE_SIZE = 20000;
/** The location of the Kerberos keytab file.*/
public static final String MR_HISTORY_KEYTAB = MR_HISTORY_PREFIX + "keytab";
@ -82,6 +88,7 @@ public class JHAdminConfig {
/** Size of the loaded job cache.*/
public static final String MR_HISTORY_LOADED_JOB_CACHE_SIZE =
MR_HISTORY_PREFIX + "loadedjobs.cache.size";
public static final int DEFAULT_MR_HISTORY_LOADED_JOB_CACHE_SIZE = 5;
/**
* The maximum age of a job history file before it is deleted from the history
@ -89,6 +96,8 @@ public class JHAdminConfig {
*/
public static final String MR_HISTORY_MAX_AGE_MS =
MR_HISTORY_PREFIX + "max-age-ms";
public static final long DEFAULT_MR_HISTORY_MAX_AGE =
7 * 24 * 60 * 60 * 1000L; //1 week
/**
* Scan for history files to more from intermediate done dir to done dir
@ -96,10 +105,13 @@ public class JHAdminConfig {
*/
public static final String MR_HISTORY_MOVE_INTERVAL_MS =
MR_HISTORY_PREFIX + "move.interval-ms";
public static final long DEFAULT_MR_HISTORY_MOVE_INTERVAL_MS =
3 * 60 * 1000l; //3 minutes
/** The number of threads used to move files.*/
public static final String MR_HISTORY_MOVE_THREAD_COUNT =
MR_HISTORY_PREFIX + "move.thread-count";
public static final int DEFAULT_MR_HISTORY_MOVE_THREAD_COUNT = 3;
/** The Kerberos principal for the history server.*/
public static final String MR_HISTORY_PRINCIPAL =
@ -116,4 +128,10 @@ public class JHAdminConfig {
*/
public static final String MR_HS_SECURITY_SERVICE_AUTHORIZATION =
"security.mrhs.client.protocol.acl";
/**
* The HistoryStorage class to use to cache history data.
*/
public static final String MR_HISTORY_STORAGE =
MR_HISTORY_PREFIX + ".store.class";
}

View File

@ -31,6 +31,8 @@
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
@ -50,6 +52,8 @@
import com.google.common.base.Joiner;
import com.google.common.base.Splitter;
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class JobHistoryUtils {
/**

View File

@ -0,0 +1,217 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2.hs;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.JobsInfo;
import org.apache.hadoop.mapreduce.v2.hs.HistoryFileManager.MetaInfo;
import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.JobInfo;
import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.service.AbstractService;
/**
* Manages an in memory cache of parsed Job History files.
*/
public class CachedHistoryStorage extends AbstractService implements
HistoryStorage {
private static final Log LOG = LogFactory.getLog(CachedHistoryStorage.class);
private Map<JobId, Job> loadedJobCache = null;
// The number of loaded jobs.
private int loadedJobCacheSize;
private HistoryFileManager hsManager;
@Override
public void setHistoryFileManager(HistoryFileManager hsManager) {
this.hsManager = hsManager;
}
@SuppressWarnings("serial")
@Override
public void init(Configuration conf) throws YarnException {
LOG.info("CachedHistoryStorage Init");
loadedJobCacheSize = conf.getInt(
JHAdminConfig.MR_HISTORY_LOADED_JOB_CACHE_SIZE,
JHAdminConfig.DEFAULT_MR_HISTORY_LOADED_JOB_CACHE_SIZE);
loadedJobCache = Collections.synchronizedMap(new LinkedHashMap<JobId, Job>(
loadedJobCacheSize + 1, 0.75f, true) {
@Override
public boolean removeEldestEntry(final Map.Entry<JobId, Job> eldest) {
return super.size() > loadedJobCacheSize;
}
});
super.init(conf);
}
public CachedHistoryStorage() {
super(CachedHistoryStorage.class.getName());
}
private Job loadJob(MetaInfo metaInfo) {
try {
Job job = hsManager.loadJob(metaInfo);
if (LOG.isDebugEnabled()) {
LOG.debug("Adding " + job.getID() + " to loaded job cache");
}
loadedJobCache.put(job.getID(), job);
return job;
} catch (IOException e) {
throw new YarnException(
"Could not find/load job: " + metaInfo.getJobId(), e);
}
}
@Override
public synchronized Job getFullJob(JobId jobId) {
if (LOG.isDebugEnabled()) {
LOG.debug("Looking for Job " + jobId);
}
try {
Job result = loadedJobCache.get(jobId);
if (result == null) {
MetaInfo metaInfo = hsManager.getMetaInfo(jobId);
if (metaInfo != null) {
result = loadJob(metaInfo);
}
}
return result;
} catch (IOException e) {
throw new YarnException(e);
}
}
@Override
public Map<JobId, Job> getAllPartialJobs() {
LOG.debug("Called getAllPartialJobs()");
SortedMap<JobId, Job> result = new TreeMap<JobId, Job>();
try {
for (MetaInfo mi : hsManager.getAllMetaInfo()) {
if (mi != null) {
JobId id = mi.getJobId();
result.put(id, new PartialJob(mi.getJobIndexInfo(), id));
}
}
} catch (IOException e) {
LOG.warn("Error trying to scan for all MetaInfos", e);
throw new YarnException(e);
}
return result;
}
@Override
public void jobRemovedFromHDFS(JobId jobId) {
loadedJobCache.remove(jobId);
}
@Override
public JobsInfo getPartialJobs(Long offset, Long count, String user,
String queue, Long sBegin, Long sEnd, Long fBegin, Long fEnd,
JobState jobState) {
return getPartialJobs(getAllPartialJobs().values(), offset, count, user,
queue, sBegin, sEnd, fBegin, fEnd, jobState);
}
public static JobsInfo getPartialJobs(Collection<Job> jobs, Long offset,
Long count, String user, String queue, Long sBegin, Long sEnd,
Long fBegin, Long fEnd, JobState jobState) {
JobsInfo allJobs = new JobsInfo();
if (sBegin == null || sBegin < 0)
sBegin = 0l;
if (sEnd == null)
sEnd = Long.MAX_VALUE;
if (fBegin == null || fBegin < 0)
fBegin = 0l;
if (fEnd == null)
fEnd = Long.MAX_VALUE;
if (offset == null || offset < 0)
offset = 0l;
if (count == null)
count = Long.MAX_VALUE;
if (offset > jobs.size()) {
return allJobs;
}
long at = 0;
long end = offset + count - 1;
if (end < 0) { // due to overflow
end = Long.MAX_VALUE;
}
for (Job job : jobs) {
if (at > end) {
break;
}
// can't really validate queue is a valid one since queues could change
if (queue != null && !queue.isEmpty()) {
if (!job.getQueueName().equals(queue)) {
continue;
}
}
if (user != null && !user.isEmpty()) {
if (!job.getUserName().equals(user)) {
continue;
}
}
JobReport report = job.getReport();
if (report.getStartTime() < sBegin || report.getStartTime() > sEnd) {
continue;
}
if (report.getFinishTime() < fBegin || report.getFinishTime() > fEnd) {
continue;
}
if (jobState != null && jobState != report.getJobState()) {
continue;
}
at++;
if ((at - 1) < offset) {
continue;
}
JobInfo jobInfo = new JobInfo(job);
allJobs.add(jobInfo);
}
return allJobs;
}
}

View File

@ -24,8 +24,13 @@
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.JobsInfo;
public interface HistoryContext extends AppContext {
Map<JobId, Job> getAllJobs(ApplicationId appID);
JobsInfo getPartialJobs(Long offset, Long count, String user,
String queue, Long sBegin, Long sEnd, Long fBegin, Long fEnd, JobState jobState);
}

View File

@ -0,0 +1,763 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2.hs;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentSkipListMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.mapred.JobACLsManager;
import org.apache.hadoop.mapreduce.jobhistory.JobSummary;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.jobhistory.FileNameIndexUtils;
import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.service.AbstractService;
/**
* This class provides a way to interact with history files in a thread safe
* manor.
*/
@InterfaceAudience.Public
@InterfaceStability.Unstable
public class HistoryFileManager extends AbstractService {
private static final Log LOG = LogFactory.getLog(HistoryFileManager.class);
private static final Log SUMMARY_LOG = LogFactory.getLog(JobSummary.class);
private static String DONE_BEFORE_SERIAL_TAIL = JobHistoryUtils
.doneSubdirsBeforeSerialTail();
public static class MetaInfo {
private Path historyFile;
private Path confFile;
private Path summaryFile;
private JobIndexInfo jobIndexInfo;
public MetaInfo(Path historyFile, Path confFile, Path summaryFile,
JobIndexInfo jobIndexInfo) {
this.historyFile = historyFile;
this.confFile = confFile;
this.summaryFile = summaryFile;
this.jobIndexInfo = jobIndexInfo;
}
private Path getHistoryFile() {
return historyFile;
}
private Path getConfFile() {
return confFile;
}
private Path getSummaryFile() {
return summaryFile;
}
public JobIndexInfo getJobIndexInfo() {
return jobIndexInfo;
}
public JobId getJobId() {
return jobIndexInfo.getJobId();
}
private void setHistoryFile(Path historyFile) {
this.historyFile = historyFile;
}
private void setConfFile(Path confFile) {
this.confFile = confFile;
}
private void setSummaryFile(Path summaryFile) {
this.summaryFile = summaryFile;
}
}
/**
* Maps between a serial number (generated based on jobId) and the timestamp
* component(s) to which it belongs. Facilitates jobId based searches. If a
* jobId is not found in this list - it will not be found.
*/
private final SortedMap<String, Set<String>> idToDateString =
new TreeMap<String, Set<String>>();
// The number of entries in idToDateString
private int dateStringCacheSize;
// Maintains minimal details for recent jobs (parsed from history file name).
// Sorted on Job Completion Time.
private final SortedMap<JobId, MetaInfo> jobListCache =
new ConcurrentSkipListMap<JobId, MetaInfo>();
// The number of jobs to maintain in the job list cache.
private int jobListCacheSize;
// Re-use existing MetaInfo objects if they exist for the specific JobId.
// (synchronization on MetaInfo)
// Check for existence of the object when using iterators.
private final SortedMap<JobId, MetaInfo> intermediateListCache =
new ConcurrentSkipListMap<JobId, MetaInfo>();
// Maintains a list of known done subdirectories.
private final Set<Path> existingDoneSubdirs = new HashSet<Path>();
/**
* Maintains a mapping between intermediate user directories and the last
* known modification time.
*/
private Map<String, Long> userDirModificationTimeMap =
new HashMap<String, Long>();
private JobACLsManager aclsMgr;
private Configuration conf;
// TODO Remove me!!!!
private boolean debugMode;
private String serialNumberFormat;
private Path doneDirPrefixPath = null; // folder for completed jobs
private FileContext doneDirFc; // done Dir FileContext
private Path intermediateDoneDirPath = null; // Intermediate Done Dir Path
private FileContext intermediateDoneDirFc; // Intermediate Done Dir
// FileContext
public HistoryFileManager() {
super(HistoryFileManager.class.getName());
}
@Override
public void init(Configuration conf) {
this.conf = conf;
debugMode = conf.getBoolean(JHAdminConfig.MR_HISTORY_DEBUG_MODE, false);
int serialNumberLowDigits = debugMode ? 1 : 3;
serialNumberFormat = ("%0"
+ (JobHistoryUtils.SERIAL_NUMBER_DIRECTORY_DIGITS + serialNumberLowDigits)
+ "d");
String doneDirPrefix = null;
doneDirPrefix = JobHistoryUtils
.getConfiguredHistoryServerDoneDirPrefix(conf);
try {
doneDirPrefixPath = FileContext.getFileContext(conf).makeQualified(
new Path(doneDirPrefix));
doneDirFc = FileContext.getFileContext(doneDirPrefixPath.toUri(), conf);
mkdir(doneDirFc, doneDirPrefixPath, new FsPermission(
JobHistoryUtils.HISTORY_DONE_DIR_PERMISSION));
} catch (IOException e) {
throw new YarnException("Error creating done directory: ["
+ doneDirPrefixPath + "]", e);
}
String intermediateDoneDirPrefix = null;
intermediateDoneDirPrefix = JobHistoryUtils
.getConfiguredHistoryIntermediateDoneDirPrefix(conf);
try {
intermediateDoneDirPath = FileContext.getFileContext(conf).makeQualified(
new Path(intermediateDoneDirPrefix));
intermediateDoneDirFc = FileContext.getFileContext(
intermediateDoneDirPath.toUri(), conf);
mkdir(intermediateDoneDirFc, intermediateDoneDirPath, new FsPermission(
JobHistoryUtils.HISTORY_INTERMEDIATE_DONE_DIR_PERMISSIONS.toShort()));
} catch (IOException e) {
LOG.info("error creating done directory on dfs " + e);
throw new YarnException("Error creating intermediate done directory: ["
+ intermediateDoneDirPath + "]", e);
}
this.aclsMgr = new JobACLsManager(conf);
jobListCacheSize = conf.getInt(JHAdminConfig.MR_HISTORY_JOBLIST_CACHE_SIZE,
JHAdminConfig.DEFAULT_MR_HISTORY_JOBLIST_CACHE_SIZE);
dateStringCacheSize = conf.getInt(
JHAdminConfig.MR_HISTORY_DATESTRING_CACHE_SIZE,
JHAdminConfig.DEFAULT_MR_HISTORY_DATESTRING_CACHE_SIZE);
super.init(conf);
}
private void mkdir(FileContext fc, Path path, FsPermission fsp)
throws IOException {
if (!fc.util().exists(path)) {
try {
fc.mkdir(path, fsp, true);
FileStatus fsStatus = fc.getFileStatus(path);
LOG.info("Perms after creating " + fsStatus.getPermission().toShort()
+ ", Expected: " + fsp.toShort());
if (fsStatus.getPermission().toShort() != fsp.toShort()) {
LOG.info("Explicitly setting permissions to : " + fsp.toShort()
+ ", " + fsp);
fc.setPermission(path, fsp);
}
} catch (FileAlreadyExistsException e) {
LOG.info("Directory: [" + path + "] already exists.");
}
}
}
/**
* Populates index data structures. Should only be called at initialization
* times.
*/
@SuppressWarnings("unchecked")
void initExisting() throws IOException {
LOG.info("Initializing Existing Jobs...");
List<FileStatus> timestampedDirList = findTimestampedDirectories();
Collections.sort(timestampedDirList);
for (FileStatus fs : timestampedDirList) {
// TODO Could verify the correct format for these directories.
addDirectoryToSerialNumberIndex(fs.getPath());
addDirectoryToJobListCache(fs.getPath());
}
}
private void removeDirectoryFromSerialNumberIndex(Path serialDirPath) {
String serialPart = serialDirPath.getName();
String timeStampPart = JobHistoryUtils
.getTimestampPartFromPath(serialDirPath.toString());
if (timeStampPart == null) {
LOG.warn("Could not find timestamp portion from path: "
+ serialDirPath.toString() + ". Continuing with next");
return;
}
if (serialPart == null) {
LOG.warn("Could not find serial portion from path: "
+ serialDirPath.toString() + ". Continuing with next");
return;
}
synchronized (idToDateString) {
// TODO make this thread safe without the synchronize
if (idToDateString.containsKey(serialPart)) {
Set<String> set = idToDateString.get(serialPart);
set.remove(timeStampPart);
if (set.isEmpty()) {
idToDateString.remove(serialPart);
}
}
}
}
private void addDirectoryToSerialNumberIndex(Path serialDirPath) {
if (LOG.isDebugEnabled()) {
LOG.debug("Adding " + serialDirPath + " to serial index");
}
String serialPart = serialDirPath.getName();
String timestampPart = JobHistoryUtils
.getTimestampPartFromPath(serialDirPath.toString());
if (timestampPart == null) {
LOG.warn("Could not find timestamp portion from path: " + serialDirPath
+ ". Continuing with next");
return;
}
if (serialPart == null) {
LOG.warn("Could not find serial portion from path: "
+ serialDirPath.toString() + ". Continuing with next");
}
addToSerialNumberIndex(serialPart, timestampPart);
}
private void addToSerialNumberIndex(String serialPart, String timestampPart) {
synchronized (idToDateString) {
// TODO make this thread safe without the synchronize
if (!idToDateString.containsKey(serialPart)) {
idToDateString.put(serialPart, new HashSet<String>());
if (idToDateString.size() > dateStringCacheSize) {
idToDateString.remove(idToDateString.firstKey());
}
Set<String> datePartSet = idToDateString.get(serialPart);
datePartSet.add(timestampPart);
}
}
}
private void addDirectoryToJobListCache(Path path) throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Adding " + path + " to job list cache.");
}
List<FileStatus> historyFileList = scanDirectoryForHistoryFiles(path,
doneDirFc);
for (FileStatus fs : historyFileList) {
if (LOG.isDebugEnabled()) {
LOG.debug("Adding in history for " + fs.getPath());
}
JobIndexInfo jobIndexInfo = FileNameIndexUtils.getIndexInfo(fs.getPath()
.getName());
String confFileName = JobHistoryUtils
.getIntermediateConfFileName(jobIndexInfo.getJobId());
String summaryFileName = JobHistoryUtils
.getIntermediateSummaryFileName(jobIndexInfo.getJobId());
MetaInfo metaInfo = new MetaInfo(fs.getPath(), new Path(fs.getPath()
.getParent(), confFileName), new Path(fs.getPath().getParent(),
summaryFileName), jobIndexInfo);
addToJobListCache(metaInfo);
}
}
private static List<FileStatus> scanDirectory(Path path, FileContext fc,
PathFilter pathFilter) throws IOException {
path = fc.makeQualified(path);
List<FileStatus> jhStatusList = new ArrayList<FileStatus>();
RemoteIterator<FileStatus> fileStatusIter = fc.listStatus(path);
while (fileStatusIter.hasNext()) {
FileStatus fileStatus = fileStatusIter.next();
Path filePath = fileStatus.getPath();
if (fileStatus.isFile() && pathFilter.accept(filePath)) {
jhStatusList.add(fileStatus);
}
}
return jhStatusList;
}
private static List<FileStatus> scanDirectoryForHistoryFiles(Path path,
FileContext fc) throws IOException {
return scanDirectory(path, fc, JobHistoryUtils.getHistoryFileFilter());
}
/**
* Finds all history directories with a timestamp component by scanning the
* filesystem. Used when the JobHistory server is started.
*
* @return
*/
private List<FileStatus> findTimestampedDirectories() throws IOException {
List<FileStatus> fsList = JobHistoryUtils.localGlobber(doneDirFc,
doneDirPrefixPath, DONE_BEFORE_SERIAL_TAIL);
return fsList;
}
private void addToJobListCache(MetaInfo metaInfo) {
JobId jobId = metaInfo.getJobIndexInfo().getJobId();
if (LOG.isDebugEnabled()) {
LOG.debug("Adding " + jobId + " to job list cache with "
+ metaInfo.getJobIndexInfo());
}
jobListCache.put(jobId, metaInfo);
if (jobListCache.size() > jobListCacheSize) {
jobListCache.remove(jobListCache.firstKey());
}
}
/**
* Scans the intermediate directory to find user directories. Scans these for
* history files if the modification time for the directory has changed.
*
* @throws IOException
*/
private void scanIntermediateDirectory() throws IOException {
List<FileStatus> userDirList = JobHistoryUtils.localGlobber(
intermediateDoneDirFc, intermediateDoneDirPath, "");
for (FileStatus userDir : userDirList) {
String name = userDir.getPath().getName();
long newModificationTime = userDir.getModificationTime();
boolean shouldScan = false;
synchronized (userDirModificationTimeMap) {
if (!userDirModificationTimeMap.containsKey(name)
|| newModificationTime > userDirModificationTimeMap.get(name)) {
shouldScan = true;
userDirModificationTimeMap.put(name, newModificationTime);
}
}
if (shouldScan) {
scanIntermediateDirectory(userDir.getPath());
}
}
}
/**
* Scans the specified path and populates the intermediate cache.
*
* @param absPath
* @throws IOException
*/
private void scanIntermediateDirectory(final Path absPath) throws IOException {
List<FileStatus> fileStatusList = scanDirectoryForHistoryFiles(absPath,
intermediateDoneDirFc);
for (FileStatus fs : fileStatusList) {
JobIndexInfo jobIndexInfo = FileNameIndexUtils.getIndexInfo(fs.getPath()
.getName());
String confFileName = JobHistoryUtils
.getIntermediateConfFileName(jobIndexInfo.getJobId());
String summaryFileName = JobHistoryUtils
.getIntermediateSummaryFileName(jobIndexInfo.getJobId());
MetaInfo metaInfo = new MetaInfo(fs.getPath(), new Path(fs.getPath()
.getParent(), confFileName), new Path(fs.getPath().getParent(),
summaryFileName), jobIndexInfo);
if (!intermediateListCache.containsKey(jobIndexInfo.getJobId())) {
intermediateListCache.put(jobIndexInfo.getJobId(), metaInfo);
}
}
}
/**
* Searches the job history file FileStatus list for the specified JobId.
*
* @param fileStatusList
* fileStatus list of Job History Files.
* @param jobId
* The JobId to find.
* @return A MetaInfo object for the jobId, null if not found.
* @throws IOException
*/
private MetaInfo getJobMetaInfo(List<FileStatus> fileStatusList, JobId jobId)
throws IOException {
for (FileStatus fs : fileStatusList) {
JobIndexInfo jobIndexInfo = FileNameIndexUtils.getIndexInfo(fs.getPath()
.getName());
if (jobIndexInfo.getJobId().equals(jobId)) {
String confFileName = JobHistoryUtils
.getIntermediateConfFileName(jobIndexInfo.getJobId());
String summaryFileName = JobHistoryUtils
.getIntermediateSummaryFileName(jobIndexInfo.getJobId());
MetaInfo metaInfo = new MetaInfo(fs.getPath(), new Path(fs.getPath()
.getParent(), confFileName), new Path(fs.getPath().getParent(),
summaryFileName), jobIndexInfo);
return metaInfo;
}
}
return null;
}
/**
* Scans old directories known by the idToDateString map for the specified
* jobId. If the number of directories is higher than the supported size of
* the idToDateString cache, the jobId will not be found.
*
* @param jobId
* the jobId.
* @return
* @throws IOException
*/
private MetaInfo scanOldDirsForJob(JobId jobId) throws IOException {
int jobSerialNumber = JobHistoryUtils.jobSerialNumber(jobId);
String boxedSerialNumber = String.valueOf(jobSerialNumber);
Set<String> dateStringSet;
synchronized (idToDateString) {
Set<String> found = idToDateString.get(boxedSerialNumber);
if (found == null) {
return null;
} else {
dateStringSet = new HashSet<String>(found);
}
}
for (String timestampPart : dateStringSet) {
Path logDir = canonicalHistoryLogPath(jobId, timestampPart);
List<FileStatus> fileStatusList = scanDirectoryForHistoryFiles(logDir,
doneDirFc);
MetaInfo metaInfo = getJobMetaInfo(fileStatusList, jobId);
if (metaInfo != null) {
return metaInfo;
}
}
return null;
}
/**
* Checks for the existence of the job history file in the intermediate
* directory.
*
* @param jobId
* @return
* @throws IOException
*/
private MetaInfo scanIntermediateForJob(JobId jobId) throws IOException {
scanIntermediateDirectory();
return intermediateListCache.get(jobId);
}
/**
* Parse a job from the JobHistoryFile, if the underlying file is not going to
* be deleted.
*
* @param metaInfo
* the where the JobHistory is stored.
* @return the Job or null if the underlying file was deleted.
* @throws IOException
* if there is an error trying to read the file.
*/
public Job loadJob(MetaInfo metaInfo) throws IOException {
return new CompletedJob(conf, metaInfo.getJobIndexInfo().getJobId(),
metaInfo.getHistoryFile(), false, metaInfo.getJobIndexInfo().getUser(),
metaInfo.getConfFile(), aclsMgr);
}
public Collection<MetaInfo> getAllMetaInfo() throws IOException {
scanIntermediateDirectory();
ArrayList<MetaInfo> result = new ArrayList<MetaInfo>();
result.addAll(intermediateListCache.values());
result.addAll(jobListCache.values());
return result;
}
Collection<MetaInfo> getIntermediateMetaInfos() throws IOException {
scanIntermediateDirectory();
return intermediateListCache.values();
}
public MetaInfo getMetaInfo(JobId jobId) throws IOException {
// MetaInfo available in cache.
MetaInfo metaInfo = null;
if (jobListCache.containsKey(jobId)) {
metaInfo = jobListCache.get(jobId);
}
if (metaInfo != null) {
return metaInfo;
}
// MetaInfo not available. Check intermediate directory for meta info.
metaInfo = scanIntermediateForJob(jobId);
if (metaInfo != null) {
return metaInfo;
}
// Intermediate directory does not contain job. Search through older ones.
metaInfo = scanOldDirsForJob(jobId);
if (metaInfo != null) {
return metaInfo;
}
return null;
}
void moveToDone(MetaInfo metaInfo) throws IOException {
long completeTime = metaInfo.getJobIndexInfo().getFinishTime();
if (completeTime == 0)
completeTime = System.currentTimeMillis();
JobId jobId = metaInfo.getJobIndexInfo().getJobId();
List<Path> paths = new ArrayList<Path>();
Path historyFile = metaInfo.getHistoryFile();
if (historyFile == null) {
LOG.info("No file for job-history with " + jobId + " found in cache!");
} else {
paths.add(historyFile);
}
Path confFile = metaInfo.getConfFile();
if (confFile == null) {
LOG.info("No file for jobConf with " + jobId + " found in cache!");
} else {
paths.add(confFile);
}
// TODO Check all mi getters and setters for the conf path
Path summaryFile = metaInfo.getSummaryFile();
if (summaryFile == null) {
LOG.info("No summary file for job: " + jobId);
} else {
try {
String jobSummaryString = getJobSummary(intermediateDoneDirFc,
summaryFile);
SUMMARY_LOG.info(jobSummaryString);
LOG.info("Deleting JobSummary file: [" + summaryFile + "]");
intermediateDoneDirFc.delete(summaryFile, false);
metaInfo.setSummaryFile(null);
} catch (IOException e) {
LOG.warn("Failed to process summary file: [" + summaryFile + "]");
throw e;
}
}
Path targetDir = canonicalHistoryLogPath(jobId, completeTime);
addDirectoryToSerialNumberIndex(targetDir);
try {
makeDoneSubdir(targetDir);
} catch (IOException e) {
LOG.warn("Failed creating subdirectory: " + targetDir
+ " while attempting to move files for jobId: " + jobId);
throw e;
}
synchronized (metaInfo) {
if (historyFile != null) {
Path toPath = doneDirFc.makeQualified(new Path(targetDir, historyFile
.getName()));
try {
moveToDoneNow(historyFile, toPath);
} catch (IOException e) {
LOG.warn("Failed to move file: " + historyFile + " for jobId: "
+ jobId);
throw e;
}
metaInfo.setHistoryFile(toPath);
}
if (confFile != null) {
Path toPath = doneDirFc.makeQualified(new Path(targetDir, confFile
.getName()));
try {
moveToDoneNow(confFile, toPath);
} catch (IOException e) {
LOG.warn("Failed to move file: " + historyFile + " for jobId: "
+ jobId);
throw e;
}
metaInfo.setConfFile(toPath);
}
}
addToJobListCache(metaInfo);
intermediateListCache.remove(jobId);
}
private void moveToDoneNow(final Path src, final Path target)
throws IOException {
LOG.info("Moving " + src.toString() + " to " + target.toString());
intermediateDoneDirFc.rename(src, target, Options.Rename.NONE);
}
private String getJobSummary(FileContext fc, Path path) throws IOException {
Path qPath = fc.makeQualified(path);
FSDataInputStream in = fc.open(qPath);
String jobSummaryString = in.readUTF();
in.close();
return jobSummaryString;
}
private void makeDoneSubdir(Path path) throws IOException {
boolean existsInExistingCache = false;
synchronized (existingDoneSubdirs) {
if (existingDoneSubdirs.contains(path))
existsInExistingCache = true;
}
try {
doneDirFc.getFileStatus(path);
if (!existsInExistingCache) {
existingDoneSubdirs.add(path);
if (LOG.isDebugEnabled()) {
LOG.debug("JobHistory.maybeMakeSubdirectory -- We believed " + path
+ " already existed, but it didn't.");
}
}
} catch (FileNotFoundException fnfE) {
try {
FsPermission fsp = new FsPermission(
JobHistoryUtils.HISTORY_DONE_DIR_PERMISSION);
doneDirFc.mkdir(path, fsp, true);
FileStatus fsStatus = doneDirFc.getFileStatus(path);
LOG.info("Perms after creating " + fsStatus.getPermission().toShort()
+ ", Expected: " + fsp.toShort());
if (fsStatus.getPermission().toShort() != fsp.toShort()) {
LOG.info("Explicitly setting permissions to : " + fsp.toShort()
+ ", " + fsp);
doneDirFc.setPermission(path, fsp);
}
synchronized (existingDoneSubdirs) {
existingDoneSubdirs.add(path);
}
} catch (FileAlreadyExistsException faeE) {
// Nothing to do.
}
}
}
private Path canonicalHistoryLogPath(JobId id, String timestampComponent) {
return new Path(doneDirPrefixPath, JobHistoryUtils.historyLogSubdirectory(
id, timestampComponent, serialNumberFormat));
}
private Path canonicalHistoryLogPath(JobId id, long millisecondTime) {
String timestampComponent = JobHistoryUtils.timestampDirectoryComponent(
millisecondTime, debugMode);
return new Path(doneDirPrefixPath, JobHistoryUtils.historyLogSubdirectory(
id, timestampComponent, serialNumberFormat));
}
private long getEffectiveTimestamp(long finishTime, FileStatus fileStatus) {
if (finishTime == 0) {
return fileStatus.getModificationTime();
}
return finishTime;
}
private void deleteJobFromDone(MetaInfo metaInfo) throws IOException {
jobListCache.remove(metaInfo.getJobId());
doneDirFc.delete(doneDirFc.makeQualified(metaInfo.getHistoryFile()), false);
doneDirFc.delete(doneDirFc.makeQualified(metaInfo.getConfFile()), false);
}
@SuppressWarnings("unchecked")
void clean(long cutoff, HistoryStorage storage) throws IOException {
// TODO this should be replaced by something that knows about the directory
// structure and will put less of a load on HDFS.
boolean halted = false;
// TODO Delete YYYY/MM/DD directories.
List<FileStatus> serialDirList = findTimestampedDirectories();
// Sort in ascending order. Relies on YYYY/MM/DD/Serial
Collections.sort(serialDirList);
for (FileStatus serialDir : serialDirList) {
List<FileStatus> historyFileList = scanDirectoryForHistoryFiles(
serialDir.getPath(), doneDirFc);
for (FileStatus historyFile : historyFileList) {
JobIndexInfo jobIndexInfo = FileNameIndexUtils.getIndexInfo(historyFile
.getPath().getName());
long effectiveTimestamp = getEffectiveTimestamp(
jobIndexInfo.getFinishTime(), historyFile);
if (effectiveTimestamp <= cutoff) {
String confFileName = JobHistoryUtils
.getIntermediateConfFileName(jobIndexInfo.getJobId());
MetaInfo metaInfo = new MetaInfo(historyFile.getPath(), new Path(
historyFile.getPath().getParent(), confFileName), null,
jobIndexInfo);
storage.jobRemovedFromHDFS(metaInfo.getJobId());
deleteJobFromDone(metaInfo);
} else {
halted = true;
break;
}
}
if (!halted) {
doneDirFc.delete(doneDirFc.makeQualified(serialDir.getPath()), true);
removeDirectoryFromSerialNumberIndex(serialDir.getPath());
synchronized (existingDoneSubdirs) {
existingDoneSubdirs.remove(serialDir.getPath());
}
} else {
break; // Don't scan any more directories.
}
}
}
}

View File

@ -0,0 +1,80 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2.hs;
import java.util.Map;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.JobsInfo;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/**
* Provides an API to query jobs that have finished.
*/
@InterfaceAudience.Public
@InterfaceStability.Unstable
public interface HistoryStorage {
/**
* Give the Storage a reference to a class that can be used to interact with
* history files.
* @param hsManager the class that is used to interact with history files.
*/
void setHistoryFileManager(HistoryFileManager hsManager);
/**
* Look for a set of partial jobs.
* @param offset the offset into the list of jobs.
* @param count the maximum number of jobs to return.
* @param user only return jobs for the given user.
* @param queue only return jobs for in the given queue.
* @param sBegin only return Jobs that started on or after the given time.
* @param sEnd only return Jobs that started on or before the given time.
* @param fBegin only return Jobs that ended on or after the given time.
* @param fEnd only return Jobs that ended on or before the given time.
* @param jobState only return Jobs that are in the given job state.
* @return The list of filtered jobs.
*/
JobsInfo getPartialJobs(Long offset, Long count, String user,
String queue, Long sBegin, Long sEnd, Long fBegin, Long fEnd,
JobState jobState);
/**
* Get all of the cached jobs. This only returns partial jobs and is here for
* legacy reasons.
* @return all of the cached jobs
*/
Map<JobId, Job> getAllPartialJobs();
/**
* Get a fully parsed job.
* @param jobId the id of the job
* @return the job, or null if it is not found.
*/
Job getFullJob(JobId jobId);
/**
* Informs the Storage that a job has been removed from HDFS
* @param jobId the ID of the job that was removed.
*/
void jobRemovedFromHDFS(JobId jobId);
}

View File

@ -51,6 +51,7 @@ public PartialJob(JobIndexInfo jobIndexInfo, JobId jobId) {
jobReport = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(JobReport.class);
jobReport.setStartTime(jobIndexInfo.getSubmitTime());
jobReport.setFinishTime(jobIndexInfo.getFinishTime());
jobReport.setJobState(getState());
}
@Override

View File

@ -44,6 +44,7 @@ public void setup() {
bind(JAXBContextResolver.class);
bind(GenericExceptionHandler.class);
bind(AppContext.class).toInstance(history);
bind(HistoryContext.class).toInstance(history);
route("/", HsController.class);
route("/app", HsController.class);
route(pajoin("/job", JOB_ID), HsController.class, "job");

View File

@ -32,10 +32,8 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
@ -49,6 +47,7 @@
import org.apache.hadoop.mapreduce.v2.app.webapp.dao.TaskAttemptsInfo;
import org.apache.hadoop.mapreduce.v2.app.webapp.dao.TaskInfo;
import org.apache.hadoop.mapreduce.v2.app.webapp.dao.TasksInfo;
import org.apache.hadoop.mapreduce.v2.hs.HistoryContext;
import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.AMAttemptInfo;
import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.AMAttemptsInfo;
import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.HistoryInfo;
@ -64,7 +63,7 @@
@Path("/ws/v1/history")
public class HsWebServices {
private final AppContext appCtx;
private final HistoryContext ctx;
private WebApp webapp;
private final Configuration conf;
@ -72,9 +71,9 @@ public class HsWebServices {
UriInfo uriInfo;
@Inject
public HsWebServices(final AppContext appCtx, final Configuration conf,
public HsWebServices(final HistoryContext ctx, final Configuration conf,
final WebApp webapp) {
this.appCtx = appCtx;
this.ctx = ctx;
this.conf = conf;
this.webapp = webapp;
}
@ -103,33 +102,22 @@ public JobsInfo getJobs(@QueryParam("user") String userQuery,
@QueryParam("startedTimeEnd") String startedEnd,
@QueryParam("finishedTimeBegin") String finishBegin,
@QueryParam("finishedTimeEnd") String finishEnd) {
JobsInfo allJobs = new JobsInfo();
long num = 0;
boolean checkCount = false;
boolean checkStart = false;
boolean checkEnd = false;
long countNum = 0;
// set values suitable in case both of begin/end not specified
long sBegin = 0;
long sEnd = Long.MAX_VALUE;
long fBegin = 0;
long fEnd = Long.MAX_VALUE;
Long countParam = null;
if (count != null && !count.isEmpty()) {
checkCount = true;
try {
countNum = Long.parseLong(count);
countParam = Long.parseLong(count);
} catch (NumberFormatException e) {
throw new BadRequestException(e.getMessage());
}
if (countNum <= 0) {
if (countParam <= 0) {
throw new BadRequestException("limit value must be greater then 0");
}
}
Long sBegin = null;
if (startedBegin != null && !startedBegin.isEmpty()) {
checkStart = true;
try {
sBegin = Long.parseLong(startedBegin);
} catch (NumberFormatException e) {
@ -139,8 +127,9 @@ public JobsInfo getJobs(@QueryParam("user") String userQuery,
throw new BadRequestException("startedTimeBegin must be greater than 0");
}
}
Long sEnd = null;
if (startedEnd != null && !startedEnd.isEmpty()) {
checkStart = true;
try {
sEnd = Long.parseLong(startedEnd);
} catch (NumberFormatException e) {
@ -150,13 +139,13 @@ public JobsInfo getJobs(@QueryParam("user") String userQuery,
throw new BadRequestException("startedTimeEnd must be greater than 0");
}
}
if (sBegin > sEnd) {
if (sBegin != null && sEnd != null && sBegin > sEnd) {
throw new BadRequestException(
"startedTimeEnd must be greater than startTimeBegin");
}
Long fBegin = null;
if (finishBegin != null && !finishBegin.isEmpty()) {
checkEnd = true;
try {
fBegin = Long.parseLong(finishBegin);
} catch (NumberFormatException e) {
@ -166,8 +155,8 @@ public JobsInfo getJobs(@QueryParam("user") String userQuery,
throw new BadRequestException("finishedTimeBegin must be greater than 0");
}
}
Long fEnd = null;
if (finishEnd != null && !finishEnd.isEmpty()) {
checkEnd = true;
try {
fEnd = Long.parseLong(finishEnd);
} catch (NumberFormatException e) {
@ -177,53 +166,18 @@ public JobsInfo getJobs(@QueryParam("user") String userQuery,
throw new BadRequestException("finishedTimeEnd must be greater than 0");
}
}
if (fBegin > fEnd) {
if (fBegin != null && fEnd != null && fBegin > fEnd) {
throw new BadRequestException(
"finishedTimeEnd must be greater than finishedTimeBegin");
}
for (Job job : appCtx.getAllJobs().values()) {
if (checkCount && num == countNum) {
break;
JobState jobState = null;
if (stateQuery != null) {
jobState = JobState.valueOf(stateQuery);
}
if (stateQuery != null && !stateQuery.isEmpty()) {
JobState.valueOf(stateQuery);
if (!job.getState().toString().equalsIgnoreCase(stateQuery)) {
continue;
}
}
// can't really validate queue is a valid one since queues could change
if (queueQuery != null && !queueQuery.isEmpty()) {
if (!job.getQueueName().equals(queueQuery)) {
continue;
}
}
if (userQuery != null && !userQuery.isEmpty()) {
if (!job.getUserName().equals(userQuery)) {
continue;
}
}
JobReport report = job.getReport();
if (checkStart
&& (report.getStartTime() < sBegin || report.getStartTime() > sEnd)) {
continue;
}
if (checkEnd
&& (report.getFinishTime() < fBegin || report.getFinishTime() > fEnd)) {
continue;
}
JobInfo jobInfo = new JobInfo(job);
allJobs.add(jobInfo);
num++;
}
return allJobs;
return ctx.getPartialJobs(0l, countParam, userQuery, queueQuery,
sBegin, sEnd, fBegin, fEnd, jobState);
}
@GET
@ -231,7 +185,7 @@ public JobsInfo getJobs(@QueryParam("user") String userQuery,
@Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
public JobInfo getJob(@PathParam("jobid") String jid) {
Job job = AMWebServices.getJobFromJobIdString(jid, appCtx);
Job job = AMWebServices.getJobFromJobIdString(jid, ctx);
return new JobInfo(job);
}
@ -240,7 +194,7 @@ public JobInfo getJob(@PathParam("jobid") String jid) {
@Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
public AMAttemptsInfo getJobAttempts(@PathParam("jobid") String jid) {
Job job = AMWebServices.getJobFromJobIdString(jid, appCtx);
Job job = AMWebServices.getJobFromJobIdString(jid, ctx);
AMAttemptsInfo amAttempts = new AMAttemptsInfo();
for (AMInfo amInfo : job.getAMInfos()) {
AMAttemptInfo attempt = new AMAttemptInfo(amInfo, MRApps.toString(job
@ -256,8 +210,8 @@ public AMAttemptsInfo getJobAttempts(@PathParam("jobid") String jid) {
@Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
public JobCounterInfo getJobCounters(@PathParam("jobid") String jid) {
Job job = AMWebServices.getJobFromJobIdString(jid, appCtx);
return new JobCounterInfo(this.appCtx, job);
Job job = AMWebServices.getJobFromJobIdString(jid, ctx);
return new JobCounterInfo(this.ctx, job);
}
@GET
@ -265,7 +219,7 @@ public JobCounterInfo getJobCounters(@PathParam("jobid") String jid) {
@Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
public ConfInfo getJobConf(@PathParam("jobid") String jid) {
Job job = AMWebServices.getJobFromJobIdString(jid, appCtx);
Job job = AMWebServices.getJobFromJobIdString(jid, ctx);
ConfInfo info;
try {
info = new ConfInfo(job, this.conf);
@ -282,7 +236,7 @@ public ConfInfo getJobConf(@PathParam("jobid") String jid) {
public TasksInfo getJobTasks(@PathParam("jobid") String jid,
@QueryParam("type") String type) {
Job job = AMWebServices.getJobFromJobIdString(jid, appCtx);
Job job = AMWebServices.getJobFromJobIdString(jid, ctx);
TasksInfo allTasks = new TasksInfo();
for (Task task : job.getTasks().values()) {
TaskType ttype = null;
@ -307,7 +261,7 @@ public TasksInfo getJobTasks(@PathParam("jobid") String jid,
public TaskInfo getJobTask(@PathParam("jobid") String jid,
@PathParam("taskid") String tid) {
Job job = AMWebServices.getJobFromJobIdString(jid, appCtx);
Job job = AMWebServices.getJobFromJobIdString(jid, ctx);
Task task = AMWebServices.getTaskFromTaskIdString(tid, job);
return new TaskInfo(task);
@ -319,7 +273,7 @@ public TaskInfo getJobTask(@PathParam("jobid") String jid,
public JobTaskCounterInfo getSingleTaskCounters(
@PathParam("jobid") String jid, @PathParam("taskid") String tid) {
Job job = AMWebServices.getJobFromJobIdString(jid, appCtx);
Job job = AMWebServices.getJobFromJobIdString(jid, ctx);
TaskId taskID = MRApps.toTaskID(tid);
if (taskID == null) {
throw new NotFoundException("taskid " + tid + " not found or invalid");
@ -338,7 +292,7 @@ public TaskAttemptsInfo getJobTaskAttempts(@PathParam("jobid") String jid,
@PathParam("taskid") String tid) {
TaskAttemptsInfo attempts = new TaskAttemptsInfo();
Job job = AMWebServices.getJobFromJobIdString(jid, appCtx);
Job job = AMWebServices.getJobFromJobIdString(jid, ctx);
Task task = AMWebServices.getTaskFromTaskIdString(tid, job);
for (TaskAttempt ta : task.getAttempts().values()) {
if (ta != null) {
@ -358,7 +312,7 @@ public TaskAttemptsInfo getJobTaskAttempts(@PathParam("jobid") String jid,
public TaskAttemptInfo getJobTaskAttemptId(@PathParam("jobid") String jid,
@PathParam("taskid") String tid, @PathParam("attemptid") String attId) {
Job job = AMWebServices.getJobFromJobIdString(jid, appCtx);
Job job = AMWebServices.getJobFromJobIdString(jid, ctx);
Task task = AMWebServices.getTaskFromTaskIdString(tid, job);
TaskAttempt ta = AMWebServices.getTaskAttemptFromTaskAttemptString(attId,
task);
@ -376,7 +330,7 @@ public JobTaskAttemptCounterInfo getJobTaskAttemptIdCounters(
@PathParam("jobid") String jid, @PathParam("taskid") String tid,
@PathParam("attemptid") String attId) {
Job job = AMWebServices.getJobFromJobIdString(jid, appCtx);
Job job = AMWebServices.getJobFromJobIdString(jid, ctx);
Task task = AMWebServices.getTaskFromTaskIdString(tid, job);
TaskAttempt ta = AMWebServices.getTaskAttemptFromTaskAttemptString(attId,
task);

View File

@ -92,6 +92,14 @@ public void testHistoryParsingWithParseErrors() throws Exception {
checkHistoryParsing(3, 0, 2);
}
private static String getJobSummary(FileContext fc, Path path) throws IOException {
Path qPath = fc.makeQualified(path);
FSDataInputStream in = fc.open(qPath);
String jobSummaryString = in.readUTF();
in.close();
return jobSummaryString;
}
private void checkHistoryParsing(final int numMaps, final int numReduces,
final int numSuccessfulMaps)
throws Exception {
@ -244,7 +252,7 @@ public HistoryEvent answer(InvocationOnMock invocation)
String summaryFileName = JobHistoryUtils
.getIntermediateSummaryFileName(jobId);
Path summaryFile = new Path(jobhistoryDir, summaryFileName);
String jobSummaryString = jobHistory.getJobSummary(fc, summaryFile);
String jobSummaryString = getJobSummary(fc, summaryFile);
Assert.assertTrue(jobSummaryString.contains("resourcesPerMap=100"));
Assert.assertTrue(jobSummaryString.contains("resourcesPerReduce=100"));
Assert.assertNotNull(jobSummaryString);

View File

@ -30,11 +30,13 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.MockJobs;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.hs.HistoryContext;
import org.apache.hadoop.mapreduce.v2.hs.JobHistory;
import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.JobsInfo;
import org.apache.hadoop.util.VersionInfo;
import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.ClusterInfo;
@ -77,7 +79,7 @@ public class TestHsWebServices extends JerseyTest {
private static TestAppContext appContext;
private static HsWebApp webApp;
static class TestAppContext implements AppContext {
static class TestAppContext implements HistoryContext {
final ApplicationAttemptId appAttemptID;
final ApplicationId appID;
final String user = MockJobs.newUserName();
@ -144,6 +146,20 @@ public long getStartTime() {
public ClusterInfo getClusterInfo() {
return null;
}
@Override
public Map<JobId, Job> getAllJobs(ApplicationId appID) {
// TODO Auto-generated method stub
return null;
}
@Override
public JobsInfo getPartialJobs(Long offset, Long count, String user,
String queue, Long sBegin, Long sEnd, Long fBegin, Long fEnd,
JobState jobState) {
// TODO Auto-generated method stub
return null;
}
}
private Injector injector = Guice.createInjector(new ServletModule() {
@ -160,6 +176,7 @@ protected void configureServlets() {
bind(GenericExceptionHandler.class);
bind(WebApp.class).toInstance(webApp);
bind(AppContext.class).toInstance(appContext);
bind(HistoryContext.class).toInstance(appContext);
bind(Configuration.class).toInstance(conf);
serve("/*").with(GuiceContainer.class);

View File

@ -35,6 +35,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
@ -42,6 +43,8 @@
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
import org.apache.hadoop.mapreduce.v2.hs.HistoryContext;
import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.JobsInfo;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.ClusterInfo;
@ -89,7 +92,7 @@ public class TestHsWebServicesAttempts extends JerseyTest {
private static TestAppContext appContext;
private static HsWebApp webApp;
static class TestAppContext implements AppContext {
static class TestAppContext implements HistoryContext {
final ApplicationAttemptId appAttemptID;
final ApplicationId appID;
final String user = MockJobs.newUserName();
@ -156,6 +159,20 @@ public long getStartTime() {
public ClusterInfo getClusterInfo() {
return null;
}
@Override
public Map<JobId, Job> getAllJobs(ApplicationId appID) {
// TODO Auto-generated method stub
return null;
}
@Override
public JobsInfo getPartialJobs(Long offset, Long count, String user,
String queue, Long sBegin, Long sEnd, Long fBegin, Long fEnd,
JobState jobState) {
// TODO Auto-generated method stub
return null;
}
}
private Injector injector = Guice.createInjector(new ServletModule() {
@ -171,6 +188,7 @@ protected void configureServlets() {
bind(GenericExceptionHandler.class);
bind(WebApp.class).toInstance(webApp);
bind(AppContext.class).toInstance(appContext);
bind(HistoryContext.class).toInstance(appContext);
bind(Configuration.class).toInstance(conf);
serve("/*").with(GuiceContainer.class);

View File

@ -41,9 +41,12 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.MockJobs;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.hs.HistoryContext;
import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.JobsInfo;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.ClusterInfo;
@ -90,7 +93,7 @@ public class TestHsWebServicesJobConf extends JerseyTest {
private static File testConfDir = new File("target",
TestHsWebServicesJobConf.class.getSimpleName() + "confDir");
static class TestAppContext implements AppContext {
static class TestAppContext implements HistoryContext {
final ApplicationAttemptId appAttemptID;
final ApplicationId appID;
final String user = MockJobs.newUserName();
@ -156,6 +159,20 @@ public long getStartTime() {
public ClusterInfo getClusterInfo() {
return null;
}
@Override
public Map<JobId, Job> getAllJobs(ApplicationId appID) {
// TODO Auto-generated method stub
return null;
}
@Override
public JobsInfo getPartialJobs(Long offset, Long count, String user,
String queue, Long sBegin, Long sEnd, Long fBegin, Long fEnd,
JobState jobState) {
// TODO Auto-generated method stub
return null;
}
}
private Injector injector = Guice.createInjector(new ServletModule() {
@ -195,6 +212,7 @@ protected void configureServlets() {
bind(GenericExceptionHandler.class);
bind(WebApp.class).toInstance(webApp);
bind(AppContext.class).toInstance(appContext);
bind(HistoryContext.class).toInstance(appContext);
bind(Configuration.class).toInstance(conf);
serve("/*").with(GuiceContainer.class);

View File

@ -38,11 +38,15 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.MockJobs;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.hs.CachedHistoryStorage;
import org.apache.hadoop.mapreduce.v2.hs.HistoryContext;
import org.apache.hadoop.mapreduce.v2.hs.MockHistoryJobs;
import org.apache.hadoop.mapreduce.v2.hs.MockHistoryJobs.JobsPair;
import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.JobsInfo;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.ClusterInfo;
@ -90,7 +94,7 @@ public class TestHsWebServicesJobs extends JerseyTest {
private static TestAppContext appContext;
private static HsWebApp webApp;
static class TestAppContext implements AppContext {
static class TestAppContext implements HistoryContext {
final ApplicationAttemptId appAttemptID;
final ApplicationId appID;
final String user = MockJobs.newUserName();
@ -169,6 +173,20 @@ public long getStartTime() {
public ClusterInfo getClusterInfo() {
return null;
}
@Override
public Map<JobId, Job> getAllJobs(ApplicationId appID) {
// TODO Auto-generated method stub
return null;
}
@Override
public JobsInfo getPartialJobs(Long offset, Long count, String user,
String queue, Long sBegin, Long sEnd, Long fBegin, Long fEnd,
JobState jobState) {
return CachedHistoryStorage.getPartialJobs(this.partialJobs.values(),
offset, count, user, queue, sBegin, sEnd, fBegin, fEnd, jobState);
}
}
private Injector injector = Guice.createInjector(new ServletModule() {
@ -184,6 +202,7 @@ protected void configureServlets() {
bind(GenericExceptionHandler.class);
bind(WebApp.class).toInstance(webApp);
bind(AppContext.class).toInstance(appContext);
bind(HistoryContext.class).toInstance(appContext);
bind(Configuration.class).toInstance(conf);
serve("/*").with(GuiceContainer.class);

View File

@ -36,8 +36,11 @@
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.MockJobs;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.hs.CachedHistoryStorage;
import org.apache.hadoop.mapreduce.v2.hs.HistoryContext;
import org.apache.hadoop.mapreduce.v2.hs.MockHistoryJobs;
import org.apache.hadoop.mapreduce.v2.hs.MockHistoryJobs.JobsPair;
import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.JobsInfo;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.ClusterInfo;
@ -77,7 +80,7 @@ public class TestHsWebServicesJobsQuery extends JerseyTest {
private static TestAppContext appContext;
private static HsWebApp webApp;
static class TestAppContext implements AppContext {
static class TestAppContext implements HistoryContext {
final String user = MockJobs.newUserName();
final Map<JobId, Job> fullJobs;
final Map<JobId, Job> partialJobs;
@ -152,6 +155,20 @@ public long getStartTime() {
public ClusterInfo getClusterInfo() {
return null;
}
@Override
public Map<JobId, Job> getAllJobs(ApplicationId appID) {
// TODO Auto-generated method stub
return null;
}
@Override
public JobsInfo getPartialJobs(Long offset, Long count, String user,
String queue, Long sBegin, Long sEnd, Long fBegin, Long fEnd,
JobState jobState) {
return CachedHistoryStorage.getPartialJobs(this.partialJobs.values(),
offset, count, user, queue, sBegin, sEnd, fBegin, fEnd, jobState);
}
}
private Injector injector = Guice.createInjector(new ServletModule() {
@ -167,6 +184,7 @@ protected void configureServlets() {
bind(GenericExceptionHandler.class);
bind(WebApp.class).toInstance(webApp);
bind(AppContext.class).toInstance(appContext);
bind(HistoryContext.class).toInstance(appContext);
bind(Configuration.class).toInstance(conf);
serve("/*").with(GuiceContainer.class);

View File

@ -34,12 +34,15 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskReport;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.MockJobs;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.hs.HistoryContext;
import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.JobsInfo;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.ClusterInfo;
@ -85,7 +88,7 @@ public class TestHsWebServicesTasks extends JerseyTest {
private static TestAppContext appContext;
private static HsWebApp webApp;
static class TestAppContext implements AppContext {
static class TestAppContext implements HistoryContext {
final ApplicationAttemptId appAttemptID;
final ApplicationId appID;
final String user = MockJobs.newUserName();
@ -152,6 +155,20 @@ public long getStartTime() {
public ClusterInfo getClusterInfo() {
return null;
}
@Override
public Map<JobId, Job> getAllJobs(ApplicationId appID) {
// TODO Auto-generated method stub
return null;
}
@Override
public JobsInfo getPartialJobs(Long offset, Long count, String user,
String queue, Long sBegin, Long sEnd, Long fBegin, Long fEnd,
JobState jobState) {
// TODO Auto-generated method stub
return null;
}
}
private Injector injector = Guice.createInjector(new ServletModule() {
@ -167,6 +184,7 @@ protected void configureServlets() {
bind(GenericExceptionHandler.class);
bind(WebApp.class).toInstance(webApp);
bind(AppContext.class).toInstance(appContext);
bind(HistoryContext.class).toInstance(appContext);
bind(Configuration.class).toInstance(conf);
serve("/*").with(GuiceContainer.class);