From ce9bdceac752da408fd9d230b4ed9a170ee6577c Mon Sep 17 00:00:00 2001 From: Thomas Graves Date: Tue, 10 Apr 2012 18:13:09 +0000 Subject: [PATCH] merge -r 1311895:1311896 from trunk. FIXES: MAPREDUCE-4059 git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1311897 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 3 + .../v2/jobhistory/JHAdminConfig.java | 18 + .../v2/jobhistory/JobHistoryUtils.java | 4 + .../mapreduce/v2/hs/CachedHistoryStorage.java | 217 ++++ .../mapreduce/v2/hs/HistoryContext.java | 5 + .../mapreduce/v2/hs/HistoryFileManager.java | 763 ++++++++++++ .../mapreduce/v2/hs/HistoryStorage.java | 80 ++ .../hadoop/mapreduce/v2/hs/JobHistory.java | 1031 +++-------------- .../hadoop/mapreduce/v2/hs/PartialJob.java | 1 + .../mapreduce/v2/hs/webapp/HsWebApp.java | 1 + .../mapreduce/v2/hs/webapp/HsWebServices.java | 112 +- .../v2/hs/TestJobHistoryParsing.java | 10 +- .../v2/hs/webapp/TestHsWebServices.java | 19 +- .../hs/webapp/TestHsWebServicesAttempts.java | 20 +- .../hs/webapp/TestHsWebServicesJobConf.java | 20 +- .../v2/hs/webapp/TestHsWebServicesJobs.java | 21 +- .../hs/webapp/TestHsWebServicesJobsQuery.java | 20 +- .../v2/hs/webapp/TestHsWebServicesTasks.java | 20 +- 18 files changed, 1399 insertions(+), 966 deletions(-) create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CachedHistoryStorage.java create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryStorage.java diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 16324c52b17..594737a4ca5 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -136,6 +136,9 @@ Release 0.23.3 - UNRELEASED IMPROVEMENTS + MAPREDUCE-4059. The history server should have a separate pluggable + storage/query interface. (Robert Evans via tgraves) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHAdminConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHAdminConfig.java index a89f70c901d..22cc7fcc64e 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHAdminConfig.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHAdminConfig.java @@ -44,6 +44,9 @@ public class JHAdminConfig { /** Run the History Cleaner every X ms.*/ public static final String MR_HISTORY_CLEANER_INTERVAL_MS = MR_HISTORY_PREFIX + "cleaner.interval-ms"; + public static final long DEFAULT_MR_HISTORY_CLEANER_INTERVAL_MS = + 1 * 24 * 60 * 60 * 1000l; //1 day + /** The number of threads to handle client API requests.*/ public static final String MR_HISTORY_CLIENT_THREAD_COUNT = @@ -56,7 +59,9 @@ public class JHAdminConfig { */ public static final String MR_HISTORY_DATESTRING_CACHE_SIZE = MR_HISTORY_PREFIX + "datestring.cache.size"; + public static final int DEFAULT_MR_HISTORY_DATESTRING_CACHE_SIZE = 200000; + //TODO REMOVE debug-mode /** Equivalent to 0.20 mapreduce.jobhistory.debug.mode */ public static final String MR_HISTORY_DEBUG_MODE = MR_HISTORY_PREFIX + "debug-mode"; @@ -75,6 +80,7 @@ public class JHAdminConfig { /** Size of the job list cache.*/ public static final String MR_HISTORY_JOBLIST_CACHE_SIZE = MR_HISTORY_PREFIX + "joblist.cache.size"; + public static final int DEFAULT_MR_HISTORY_JOBLIST_CACHE_SIZE = 20000; /** The location of the Kerberos keytab file.*/ public static final String MR_HISTORY_KEYTAB = MR_HISTORY_PREFIX + "keytab"; @@ -82,6 +88,7 @@ public class JHAdminConfig { /** Size of the loaded job cache.*/ public static final String MR_HISTORY_LOADED_JOB_CACHE_SIZE = MR_HISTORY_PREFIX + "loadedjobs.cache.size"; + public static final int DEFAULT_MR_HISTORY_LOADED_JOB_CACHE_SIZE = 5; /** * The maximum age of a job history file before it is deleted from the history @@ -89,6 +96,8 @@ public class JHAdminConfig { */ public static final String MR_HISTORY_MAX_AGE_MS = MR_HISTORY_PREFIX + "max-age-ms"; + public static final long DEFAULT_MR_HISTORY_MAX_AGE = + 7 * 24 * 60 * 60 * 1000L; //1 week /** * Scan for history files to more from intermediate done dir to done dir @@ -96,10 +105,13 @@ public class JHAdminConfig { */ public static final String MR_HISTORY_MOVE_INTERVAL_MS = MR_HISTORY_PREFIX + "move.interval-ms"; + public static final long DEFAULT_MR_HISTORY_MOVE_INTERVAL_MS = + 3 * 60 * 1000l; //3 minutes /** The number of threads used to move files.*/ public static final String MR_HISTORY_MOVE_THREAD_COUNT = MR_HISTORY_PREFIX + "move.thread-count"; + public static final int DEFAULT_MR_HISTORY_MOVE_THREAD_COUNT = 3; /** The Kerberos principal for the history server.*/ public static final String MR_HISTORY_PRINCIPAL = @@ -116,4 +128,10 @@ public class JHAdminConfig { */ public static final String MR_HS_SECURITY_SERVICE_AUTHORIZATION = "security.mrhs.client.protocol.acl"; + + /** + * The HistoryStorage class to use to cache history data. + */ + public static final String MR_HISTORY_STORAGE = + MR_HISTORY_PREFIX + ".store.class"; } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java index d7e191b0eaf..494431614d7 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java @@ -31,6 +31,8 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.FileStatus; @@ -50,6 +52,8 @@ import com.google.common.base.Joiner; import com.google.common.base.Splitter; +@InterfaceAudience.Private +@InterfaceStability.Unstable public class JobHistoryUtils { /** diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CachedHistoryStorage.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CachedHistoryStorage.java new file mode 100644 index 00000000000..5a4da68e6fd --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CachedHistoryStorage.java @@ -0,0 +1,217 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapreduce.v2.hs; + +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.SortedMap; +import java.util.TreeMap; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.v2.api.records.JobId; +import org.apache.hadoop.mapreduce.v2.api.records.JobReport; +import org.apache.hadoop.mapreduce.v2.api.records.JobState; +import org.apache.hadoop.mapreduce.v2.app.job.Job; +import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.JobsInfo; +import org.apache.hadoop.mapreduce.v2.hs.HistoryFileManager.MetaInfo; +import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.JobInfo; +import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig; +import org.apache.hadoop.yarn.YarnException; +import org.apache.hadoop.yarn.service.AbstractService; + +/** + * Manages an in memory cache of parsed Job History files. + */ +public class CachedHistoryStorage extends AbstractService implements + HistoryStorage { + private static final Log LOG = LogFactory.getLog(CachedHistoryStorage.class); + + private Map loadedJobCache = null; + // The number of loaded jobs. + private int loadedJobCacheSize; + + private HistoryFileManager hsManager; + + @Override + public void setHistoryFileManager(HistoryFileManager hsManager) { + this.hsManager = hsManager; + } + + @SuppressWarnings("serial") + @Override + public void init(Configuration conf) throws YarnException { + LOG.info("CachedHistoryStorage Init"); + + loadedJobCacheSize = conf.getInt( + JHAdminConfig.MR_HISTORY_LOADED_JOB_CACHE_SIZE, + JHAdminConfig.DEFAULT_MR_HISTORY_LOADED_JOB_CACHE_SIZE); + + loadedJobCache = Collections.synchronizedMap(new LinkedHashMap( + loadedJobCacheSize + 1, 0.75f, true) { + @Override + public boolean removeEldestEntry(final Map.Entry eldest) { + return super.size() > loadedJobCacheSize; + } + }); + + super.init(conf); + } + + public CachedHistoryStorage() { + super(CachedHistoryStorage.class.getName()); + } + + private Job loadJob(MetaInfo metaInfo) { + try { + Job job = hsManager.loadJob(metaInfo); + if (LOG.isDebugEnabled()) { + LOG.debug("Adding " + job.getID() + " to loaded job cache"); + } + loadedJobCache.put(job.getID(), job); + return job; + } catch (IOException e) { + throw new YarnException( + "Could not find/load job: " + metaInfo.getJobId(), e); + } + } + + @Override + public synchronized Job getFullJob(JobId jobId) { + if (LOG.isDebugEnabled()) { + LOG.debug("Looking for Job " + jobId); + } + try { + Job result = loadedJobCache.get(jobId); + if (result == null) { + MetaInfo metaInfo = hsManager.getMetaInfo(jobId); + if (metaInfo != null) { + result = loadJob(metaInfo); + } + } + return result; + } catch (IOException e) { + throw new YarnException(e); + } + } + + @Override + public Map getAllPartialJobs() { + LOG.debug("Called getAllPartialJobs()"); + SortedMap result = new TreeMap(); + try { + for (MetaInfo mi : hsManager.getAllMetaInfo()) { + if (mi != null) { + JobId id = mi.getJobId(); + result.put(id, new PartialJob(mi.getJobIndexInfo(), id)); + } + } + } catch (IOException e) { + LOG.warn("Error trying to scan for all MetaInfos", e); + throw new YarnException(e); + } + return result; + } + + @Override + public void jobRemovedFromHDFS(JobId jobId) { + loadedJobCache.remove(jobId); + } + + @Override + public JobsInfo getPartialJobs(Long offset, Long count, String user, + String queue, Long sBegin, Long sEnd, Long fBegin, Long fEnd, + JobState jobState) { + return getPartialJobs(getAllPartialJobs().values(), offset, count, user, + queue, sBegin, sEnd, fBegin, fEnd, jobState); + } + + public static JobsInfo getPartialJobs(Collection jobs, Long offset, + Long count, String user, String queue, Long sBegin, Long sEnd, + Long fBegin, Long fEnd, JobState jobState) { + JobsInfo allJobs = new JobsInfo(); + + if (sBegin == null || sBegin < 0) + sBegin = 0l; + if (sEnd == null) + sEnd = Long.MAX_VALUE; + if (fBegin == null || fBegin < 0) + fBegin = 0l; + if (fEnd == null) + fEnd = Long.MAX_VALUE; + if (offset == null || offset < 0) + offset = 0l; + if (count == null) + count = Long.MAX_VALUE; + + if (offset > jobs.size()) { + return allJobs; + } + + long at = 0; + long end = offset + count - 1; + if (end < 0) { // due to overflow + end = Long.MAX_VALUE; + } + for (Job job : jobs) { + if (at > end) { + break; + } + + // can't really validate queue is a valid one since queues could change + if (queue != null && !queue.isEmpty()) { + if (!job.getQueueName().equals(queue)) { + continue; + } + } + + if (user != null && !user.isEmpty()) { + if (!job.getUserName().equals(user)) { + continue; + } + } + + JobReport report = job.getReport(); + + if (report.getStartTime() < sBegin || report.getStartTime() > sEnd) { + continue; + } + if (report.getFinishTime() < fBegin || report.getFinishTime() > fEnd) { + continue; + } + if (jobState != null && jobState != report.getJobState()) { + continue; + } + + at++; + if ((at - 1) < offset) { + continue; + } + + JobInfo jobInfo = new JobInfo(job); + + allJobs.add(jobInfo); + } + return allJobs; + } +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryContext.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryContext.java index 0dfebf85143..881c6c2dbf7 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryContext.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryContext.java @@ -24,8 +24,13 @@ import org.apache.hadoop.mapreduce.v2.app.job.Job; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.mapreduce.v2.api.records.JobId; +import org.apache.hadoop.mapreduce.v2.api.records.JobState; +import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.JobsInfo; public interface HistoryContext extends AppContext { Map getAllJobs(ApplicationId appID); + + JobsInfo getPartialJobs(Long offset, Long count, String user, + String queue, Long sBegin, Long sEnd, Long fBegin, Long fEnd, JobState jobState); } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java new file mode 100644 index 00000000000..07b078f50aa --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java @@ -0,0 +1,763 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapreduce.v2.hs; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.concurrent.ConcurrentSkipListMap; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileAlreadyExistsException; +import org.apache.hadoop.fs.FileContext; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Options; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.mapred.JobACLsManager; +import org.apache.hadoop.mapreduce.jobhistory.JobSummary; +import org.apache.hadoop.mapreduce.v2.api.records.JobId; +import org.apache.hadoop.mapreduce.v2.app.job.Job; +import org.apache.hadoop.mapreduce.v2.jobhistory.FileNameIndexUtils; +import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig; +import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils; +import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo; +import org.apache.hadoop.yarn.YarnException; +import org.apache.hadoop.yarn.service.AbstractService; + +/** + * This class provides a way to interact with history files in a thread safe + * manor. + */ +@InterfaceAudience.Public +@InterfaceStability.Unstable +public class HistoryFileManager extends AbstractService { + private static final Log LOG = LogFactory.getLog(HistoryFileManager.class); + private static final Log SUMMARY_LOG = LogFactory.getLog(JobSummary.class); + + private static String DONE_BEFORE_SERIAL_TAIL = JobHistoryUtils + .doneSubdirsBeforeSerialTail(); + + public static class MetaInfo { + private Path historyFile; + private Path confFile; + private Path summaryFile; + private JobIndexInfo jobIndexInfo; + + public MetaInfo(Path historyFile, Path confFile, Path summaryFile, + JobIndexInfo jobIndexInfo) { + this.historyFile = historyFile; + this.confFile = confFile; + this.summaryFile = summaryFile; + this.jobIndexInfo = jobIndexInfo; + } + + private Path getHistoryFile() { + return historyFile; + } + + private Path getConfFile() { + return confFile; + } + + private Path getSummaryFile() { + return summaryFile; + } + + public JobIndexInfo getJobIndexInfo() { + return jobIndexInfo; + } + + public JobId getJobId() { + return jobIndexInfo.getJobId(); + } + + private void setHistoryFile(Path historyFile) { + this.historyFile = historyFile; + } + + private void setConfFile(Path confFile) { + this.confFile = confFile; + } + + private void setSummaryFile(Path summaryFile) { + this.summaryFile = summaryFile; + } + } + + /** + * Maps between a serial number (generated based on jobId) and the timestamp + * component(s) to which it belongs. Facilitates jobId based searches. If a + * jobId is not found in this list - it will not be found. + */ + private final SortedMap> idToDateString = + new TreeMap>(); + // The number of entries in idToDateString + private int dateStringCacheSize; + + // Maintains minimal details for recent jobs (parsed from history file name). + // Sorted on Job Completion Time. + private final SortedMap jobListCache = + new ConcurrentSkipListMap(); + // The number of jobs to maintain in the job list cache. + private int jobListCacheSize; + + // Re-use existing MetaInfo objects if they exist for the specific JobId. + // (synchronization on MetaInfo) + // Check for existence of the object when using iterators. + private final SortedMap intermediateListCache = + new ConcurrentSkipListMap(); + + // Maintains a list of known done subdirectories. + private final Set existingDoneSubdirs = new HashSet(); + + /** + * Maintains a mapping between intermediate user directories and the last + * known modification time. + */ + private Map userDirModificationTimeMap = + new HashMap(); + + private JobACLsManager aclsMgr; + + private Configuration conf; + + // TODO Remove me!!!! + private boolean debugMode; + private String serialNumberFormat; + + private Path doneDirPrefixPath = null; // folder for completed jobs + private FileContext doneDirFc; // done Dir FileContext + + private Path intermediateDoneDirPath = null; // Intermediate Done Dir Path + private FileContext intermediateDoneDirFc; // Intermediate Done Dir + // FileContext + + public HistoryFileManager() { + super(HistoryFileManager.class.getName()); + } + + @Override + public void init(Configuration conf) { + this.conf = conf; + + debugMode = conf.getBoolean(JHAdminConfig.MR_HISTORY_DEBUG_MODE, false); + int serialNumberLowDigits = debugMode ? 1 : 3; + serialNumberFormat = ("%0" + + (JobHistoryUtils.SERIAL_NUMBER_DIRECTORY_DIGITS + serialNumberLowDigits) + + "d"); + + String doneDirPrefix = null; + doneDirPrefix = JobHistoryUtils + .getConfiguredHistoryServerDoneDirPrefix(conf); + try { + doneDirPrefixPath = FileContext.getFileContext(conf).makeQualified( + new Path(doneDirPrefix)); + doneDirFc = FileContext.getFileContext(doneDirPrefixPath.toUri(), conf); + mkdir(doneDirFc, doneDirPrefixPath, new FsPermission( + JobHistoryUtils.HISTORY_DONE_DIR_PERMISSION)); + } catch (IOException e) { + throw new YarnException("Error creating done directory: [" + + doneDirPrefixPath + "]", e); + } + + String intermediateDoneDirPrefix = null; + intermediateDoneDirPrefix = JobHistoryUtils + .getConfiguredHistoryIntermediateDoneDirPrefix(conf); + try { + intermediateDoneDirPath = FileContext.getFileContext(conf).makeQualified( + new Path(intermediateDoneDirPrefix)); + intermediateDoneDirFc = FileContext.getFileContext( + intermediateDoneDirPath.toUri(), conf); + mkdir(intermediateDoneDirFc, intermediateDoneDirPath, new FsPermission( + JobHistoryUtils.HISTORY_INTERMEDIATE_DONE_DIR_PERMISSIONS.toShort())); + } catch (IOException e) { + LOG.info("error creating done directory on dfs " + e); + throw new YarnException("Error creating intermediate done directory: [" + + intermediateDoneDirPath + "]", e); + } + + this.aclsMgr = new JobACLsManager(conf); + + jobListCacheSize = conf.getInt(JHAdminConfig.MR_HISTORY_JOBLIST_CACHE_SIZE, + JHAdminConfig.DEFAULT_MR_HISTORY_JOBLIST_CACHE_SIZE); + + dateStringCacheSize = conf.getInt( + JHAdminConfig.MR_HISTORY_DATESTRING_CACHE_SIZE, + JHAdminConfig.DEFAULT_MR_HISTORY_DATESTRING_CACHE_SIZE); + + super.init(conf); + } + + private void mkdir(FileContext fc, Path path, FsPermission fsp) + throws IOException { + if (!fc.util().exists(path)) { + try { + fc.mkdir(path, fsp, true); + + FileStatus fsStatus = fc.getFileStatus(path); + LOG.info("Perms after creating " + fsStatus.getPermission().toShort() + + ", Expected: " + fsp.toShort()); + if (fsStatus.getPermission().toShort() != fsp.toShort()) { + LOG.info("Explicitly setting permissions to : " + fsp.toShort() + + ", " + fsp); + fc.setPermission(path, fsp); + } + } catch (FileAlreadyExistsException e) { + LOG.info("Directory: [" + path + "] already exists."); + } + } + } + + /** + * Populates index data structures. Should only be called at initialization + * times. + */ + @SuppressWarnings("unchecked") + void initExisting() throws IOException { + LOG.info("Initializing Existing Jobs..."); + List timestampedDirList = findTimestampedDirectories(); + Collections.sort(timestampedDirList); + for (FileStatus fs : timestampedDirList) { + // TODO Could verify the correct format for these directories. + addDirectoryToSerialNumberIndex(fs.getPath()); + addDirectoryToJobListCache(fs.getPath()); + } + } + + private void removeDirectoryFromSerialNumberIndex(Path serialDirPath) { + String serialPart = serialDirPath.getName(); + String timeStampPart = JobHistoryUtils + .getTimestampPartFromPath(serialDirPath.toString()); + if (timeStampPart == null) { + LOG.warn("Could not find timestamp portion from path: " + + serialDirPath.toString() + ". Continuing with next"); + return; + } + if (serialPart == null) { + LOG.warn("Could not find serial portion from path: " + + serialDirPath.toString() + ". Continuing with next"); + return; + } + synchronized (idToDateString) { + // TODO make this thread safe without the synchronize + if (idToDateString.containsKey(serialPart)) { + Set set = idToDateString.get(serialPart); + set.remove(timeStampPart); + if (set.isEmpty()) { + idToDateString.remove(serialPart); + } + } + } + } + + private void addDirectoryToSerialNumberIndex(Path serialDirPath) { + if (LOG.isDebugEnabled()) { + LOG.debug("Adding " + serialDirPath + " to serial index"); + } + String serialPart = serialDirPath.getName(); + String timestampPart = JobHistoryUtils + .getTimestampPartFromPath(serialDirPath.toString()); + if (timestampPart == null) { + LOG.warn("Could not find timestamp portion from path: " + serialDirPath + + ". Continuing with next"); + return; + } + if (serialPart == null) { + LOG.warn("Could not find serial portion from path: " + + serialDirPath.toString() + ". Continuing with next"); + } + addToSerialNumberIndex(serialPart, timestampPart); + } + + private void addToSerialNumberIndex(String serialPart, String timestampPart) { + synchronized (idToDateString) { + // TODO make this thread safe without the synchronize + if (!idToDateString.containsKey(serialPart)) { + idToDateString.put(serialPart, new HashSet()); + if (idToDateString.size() > dateStringCacheSize) { + idToDateString.remove(idToDateString.firstKey()); + } + Set datePartSet = idToDateString.get(serialPart); + datePartSet.add(timestampPart); + } + } + } + + private void addDirectoryToJobListCache(Path path) throws IOException { + if (LOG.isDebugEnabled()) { + LOG.debug("Adding " + path + " to job list cache."); + } + List historyFileList = scanDirectoryForHistoryFiles(path, + doneDirFc); + for (FileStatus fs : historyFileList) { + if (LOG.isDebugEnabled()) { + LOG.debug("Adding in history for " + fs.getPath()); + } + JobIndexInfo jobIndexInfo = FileNameIndexUtils.getIndexInfo(fs.getPath() + .getName()); + String confFileName = JobHistoryUtils + .getIntermediateConfFileName(jobIndexInfo.getJobId()); + String summaryFileName = JobHistoryUtils + .getIntermediateSummaryFileName(jobIndexInfo.getJobId()); + MetaInfo metaInfo = new MetaInfo(fs.getPath(), new Path(fs.getPath() + .getParent(), confFileName), new Path(fs.getPath().getParent(), + summaryFileName), jobIndexInfo); + addToJobListCache(metaInfo); + } + } + + private static List scanDirectory(Path path, FileContext fc, + PathFilter pathFilter) throws IOException { + path = fc.makeQualified(path); + List jhStatusList = new ArrayList(); + RemoteIterator fileStatusIter = fc.listStatus(path); + while (fileStatusIter.hasNext()) { + FileStatus fileStatus = fileStatusIter.next(); + Path filePath = fileStatus.getPath(); + if (fileStatus.isFile() && pathFilter.accept(filePath)) { + jhStatusList.add(fileStatus); + } + } + return jhStatusList; + } + + private static List scanDirectoryForHistoryFiles(Path path, + FileContext fc) throws IOException { + return scanDirectory(path, fc, JobHistoryUtils.getHistoryFileFilter()); + } + + /** + * Finds all history directories with a timestamp component by scanning the + * filesystem. Used when the JobHistory server is started. + * + * @return + */ + private List findTimestampedDirectories() throws IOException { + List fsList = JobHistoryUtils.localGlobber(doneDirFc, + doneDirPrefixPath, DONE_BEFORE_SERIAL_TAIL); + return fsList; + } + + private void addToJobListCache(MetaInfo metaInfo) { + JobId jobId = metaInfo.getJobIndexInfo().getJobId(); + if (LOG.isDebugEnabled()) { + LOG.debug("Adding " + jobId + " to job list cache with " + + metaInfo.getJobIndexInfo()); + } + jobListCache.put(jobId, metaInfo); + if (jobListCache.size() > jobListCacheSize) { + jobListCache.remove(jobListCache.firstKey()); + } + } + + /** + * Scans the intermediate directory to find user directories. Scans these for + * history files if the modification time for the directory has changed. + * + * @throws IOException + */ + private void scanIntermediateDirectory() throws IOException { + List userDirList = JobHistoryUtils.localGlobber( + intermediateDoneDirFc, intermediateDoneDirPath, ""); + + for (FileStatus userDir : userDirList) { + String name = userDir.getPath().getName(); + long newModificationTime = userDir.getModificationTime(); + boolean shouldScan = false; + synchronized (userDirModificationTimeMap) { + if (!userDirModificationTimeMap.containsKey(name) + || newModificationTime > userDirModificationTimeMap.get(name)) { + shouldScan = true; + userDirModificationTimeMap.put(name, newModificationTime); + } + } + if (shouldScan) { + scanIntermediateDirectory(userDir.getPath()); + } + } + } + + /** + * Scans the specified path and populates the intermediate cache. + * + * @param absPath + * @throws IOException + */ + private void scanIntermediateDirectory(final Path absPath) throws IOException { + List fileStatusList = scanDirectoryForHistoryFiles(absPath, + intermediateDoneDirFc); + for (FileStatus fs : fileStatusList) { + JobIndexInfo jobIndexInfo = FileNameIndexUtils.getIndexInfo(fs.getPath() + .getName()); + String confFileName = JobHistoryUtils + .getIntermediateConfFileName(jobIndexInfo.getJobId()); + String summaryFileName = JobHistoryUtils + .getIntermediateSummaryFileName(jobIndexInfo.getJobId()); + MetaInfo metaInfo = new MetaInfo(fs.getPath(), new Path(fs.getPath() + .getParent(), confFileName), new Path(fs.getPath().getParent(), + summaryFileName), jobIndexInfo); + if (!intermediateListCache.containsKey(jobIndexInfo.getJobId())) { + intermediateListCache.put(jobIndexInfo.getJobId(), metaInfo); + } + } + } + + /** + * Searches the job history file FileStatus list for the specified JobId. + * + * @param fileStatusList + * fileStatus list of Job History Files. + * @param jobId + * The JobId to find. + * @return A MetaInfo object for the jobId, null if not found. + * @throws IOException + */ + private MetaInfo getJobMetaInfo(List fileStatusList, JobId jobId) + throws IOException { + for (FileStatus fs : fileStatusList) { + JobIndexInfo jobIndexInfo = FileNameIndexUtils.getIndexInfo(fs.getPath() + .getName()); + if (jobIndexInfo.getJobId().equals(jobId)) { + String confFileName = JobHistoryUtils + .getIntermediateConfFileName(jobIndexInfo.getJobId()); + String summaryFileName = JobHistoryUtils + .getIntermediateSummaryFileName(jobIndexInfo.getJobId()); + MetaInfo metaInfo = new MetaInfo(fs.getPath(), new Path(fs.getPath() + .getParent(), confFileName), new Path(fs.getPath().getParent(), + summaryFileName), jobIndexInfo); + return metaInfo; + } + } + return null; + } + + /** + * Scans old directories known by the idToDateString map for the specified + * jobId. If the number of directories is higher than the supported size of + * the idToDateString cache, the jobId will not be found. + * + * @param jobId + * the jobId. + * @return + * @throws IOException + */ + private MetaInfo scanOldDirsForJob(JobId jobId) throws IOException { + int jobSerialNumber = JobHistoryUtils.jobSerialNumber(jobId); + String boxedSerialNumber = String.valueOf(jobSerialNumber); + Set dateStringSet; + synchronized (idToDateString) { + Set found = idToDateString.get(boxedSerialNumber); + if (found == null) { + return null; + } else { + dateStringSet = new HashSet(found); + } + } + for (String timestampPart : dateStringSet) { + Path logDir = canonicalHistoryLogPath(jobId, timestampPart); + List fileStatusList = scanDirectoryForHistoryFiles(logDir, + doneDirFc); + MetaInfo metaInfo = getJobMetaInfo(fileStatusList, jobId); + if (metaInfo != null) { + return metaInfo; + } + } + return null; + } + + /** + * Checks for the existence of the job history file in the intermediate + * directory. + * + * @param jobId + * @return + * @throws IOException + */ + private MetaInfo scanIntermediateForJob(JobId jobId) throws IOException { + scanIntermediateDirectory(); + return intermediateListCache.get(jobId); + } + + /** + * Parse a job from the JobHistoryFile, if the underlying file is not going to + * be deleted. + * + * @param metaInfo + * the where the JobHistory is stored. + * @return the Job or null if the underlying file was deleted. + * @throws IOException + * if there is an error trying to read the file. + */ + public Job loadJob(MetaInfo metaInfo) throws IOException { + return new CompletedJob(conf, metaInfo.getJobIndexInfo().getJobId(), + metaInfo.getHistoryFile(), false, metaInfo.getJobIndexInfo().getUser(), + metaInfo.getConfFile(), aclsMgr); + } + + public Collection getAllMetaInfo() throws IOException { + scanIntermediateDirectory(); + ArrayList result = new ArrayList(); + result.addAll(intermediateListCache.values()); + result.addAll(jobListCache.values()); + return result; + } + + Collection getIntermediateMetaInfos() throws IOException { + scanIntermediateDirectory(); + return intermediateListCache.values(); + } + + public MetaInfo getMetaInfo(JobId jobId) throws IOException { + // MetaInfo available in cache. + MetaInfo metaInfo = null; + if (jobListCache.containsKey(jobId)) { + metaInfo = jobListCache.get(jobId); + } + + if (metaInfo != null) { + return metaInfo; + } + + // MetaInfo not available. Check intermediate directory for meta info. + metaInfo = scanIntermediateForJob(jobId); + if (metaInfo != null) { + return metaInfo; + } + + // Intermediate directory does not contain job. Search through older ones. + metaInfo = scanOldDirsForJob(jobId); + if (metaInfo != null) { + return metaInfo; + } + return null; + } + + void moveToDone(MetaInfo metaInfo) throws IOException { + long completeTime = metaInfo.getJobIndexInfo().getFinishTime(); + if (completeTime == 0) + completeTime = System.currentTimeMillis(); + JobId jobId = metaInfo.getJobIndexInfo().getJobId(); + + List paths = new ArrayList(); + Path historyFile = metaInfo.getHistoryFile(); + if (historyFile == null) { + LOG.info("No file for job-history with " + jobId + " found in cache!"); + } else { + paths.add(historyFile); + } + + Path confFile = metaInfo.getConfFile(); + if (confFile == null) { + LOG.info("No file for jobConf with " + jobId + " found in cache!"); + } else { + paths.add(confFile); + } + + // TODO Check all mi getters and setters for the conf path + Path summaryFile = metaInfo.getSummaryFile(); + if (summaryFile == null) { + LOG.info("No summary file for job: " + jobId); + } else { + try { + String jobSummaryString = getJobSummary(intermediateDoneDirFc, + summaryFile); + SUMMARY_LOG.info(jobSummaryString); + LOG.info("Deleting JobSummary file: [" + summaryFile + "]"); + intermediateDoneDirFc.delete(summaryFile, false); + metaInfo.setSummaryFile(null); + } catch (IOException e) { + LOG.warn("Failed to process summary file: [" + summaryFile + "]"); + throw e; + } + } + + Path targetDir = canonicalHistoryLogPath(jobId, completeTime); + addDirectoryToSerialNumberIndex(targetDir); + try { + makeDoneSubdir(targetDir); + } catch (IOException e) { + LOG.warn("Failed creating subdirectory: " + targetDir + + " while attempting to move files for jobId: " + jobId); + throw e; + } + synchronized (metaInfo) { + if (historyFile != null) { + Path toPath = doneDirFc.makeQualified(new Path(targetDir, historyFile + .getName())); + try { + moveToDoneNow(historyFile, toPath); + } catch (IOException e) { + LOG.warn("Failed to move file: " + historyFile + " for jobId: " + + jobId); + throw e; + } + metaInfo.setHistoryFile(toPath); + } + if (confFile != null) { + Path toPath = doneDirFc.makeQualified(new Path(targetDir, confFile + .getName())); + try { + moveToDoneNow(confFile, toPath); + } catch (IOException e) { + LOG.warn("Failed to move file: " + historyFile + " for jobId: " + + jobId); + throw e; + } + metaInfo.setConfFile(toPath); + } + } + addToJobListCache(metaInfo); + intermediateListCache.remove(jobId); + } + + private void moveToDoneNow(final Path src, final Path target) + throws IOException { + LOG.info("Moving " + src.toString() + " to " + target.toString()); + intermediateDoneDirFc.rename(src, target, Options.Rename.NONE); + } + + private String getJobSummary(FileContext fc, Path path) throws IOException { + Path qPath = fc.makeQualified(path); + FSDataInputStream in = fc.open(qPath); + String jobSummaryString = in.readUTF(); + in.close(); + return jobSummaryString; + } + + private void makeDoneSubdir(Path path) throws IOException { + boolean existsInExistingCache = false; + synchronized (existingDoneSubdirs) { + if (existingDoneSubdirs.contains(path)) + existsInExistingCache = true; + } + try { + doneDirFc.getFileStatus(path); + if (!existsInExistingCache) { + existingDoneSubdirs.add(path); + if (LOG.isDebugEnabled()) { + LOG.debug("JobHistory.maybeMakeSubdirectory -- We believed " + path + + " already existed, but it didn't."); + } + } + } catch (FileNotFoundException fnfE) { + try { + FsPermission fsp = new FsPermission( + JobHistoryUtils.HISTORY_DONE_DIR_PERMISSION); + doneDirFc.mkdir(path, fsp, true); + FileStatus fsStatus = doneDirFc.getFileStatus(path); + LOG.info("Perms after creating " + fsStatus.getPermission().toShort() + + ", Expected: " + fsp.toShort()); + if (fsStatus.getPermission().toShort() != fsp.toShort()) { + LOG.info("Explicitly setting permissions to : " + fsp.toShort() + + ", " + fsp); + doneDirFc.setPermission(path, fsp); + } + synchronized (existingDoneSubdirs) { + existingDoneSubdirs.add(path); + } + } catch (FileAlreadyExistsException faeE) { + // Nothing to do. + } + } + } + + private Path canonicalHistoryLogPath(JobId id, String timestampComponent) { + return new Path(doneDirPrefixPath, JobHistoryUtils.historyLogSubdirectory( + id, timestampComponent, serialNumberFormat)); + } + + private Path canonicalHistoryLogPath(JobId id, long millisecondTime) { + String timestampComponent = JobHistoryUtils.timestampDirectoryComponent( + millisecondTime, debugMode); + return new Path(doneDirPrefixPath, JobHistoryUtils.historyLogSubdirectory( + id, timestampComponent, serialNumberFormat)); + } + + private long getEffectiveTimestamp(long finishTime, FileStatus fileStatus) { + if (finishTime == 0) { + return fileStatus.getModificationTime(); + } + return finishTime; + } + + private void deleteJobFromDone(MetaInfo metaInfo) throws IOException { + jobListCache.remove(metaInfo.getJobId()); + doneDirFc.delete(doneDirFc.makeQualified(metaInfo.getHistoryFile()), false); + doneDirFc.delete(doneDirFc.makeQualified(metaInfo.getConfFile()), false); + } + + @SuppressWarnings("unchecked") + void clean(long cutoff, HistoryStorage storage) throws IOException { + // TODO this should be replaced by something that knows about the directory + // structure and will put less of a load on HDFS. + boolean halted = false; + // TODO Delete YYYY/MM/DD directories. + List serialDirList = findTimestampedDirectories(); + // Sort in ascending order. Relies on YYYY/MM/DD/Serial + Collections.sort(serialDirList); + for (FileStatus serialDir : serialDirList) { + List historyFileList = scanDirectoryForHistoryFiles( + serialDir.getPath(), doneDirFc); + for (FileStatus historyFile : historyFileList) { + JobIndexInfo jobIndexInfo = FileNameIndexUtils.getIndexInfo(historyFile + .getPath().getName()); + long effectiveTimestamp = getEffectiveTimestamp( + jobIndexInfo.getFinishTime(), historyFile); + if (effectiveTimestamp <= cutoff) { + String confFileName = JobHistoryUtils + .getIntermediateConfFileName(jobIndexInfo.getJobId()); + MetaInfo metaInfo = new MetaInfo(historyFile.getPath(), new Path( + historyFile.getPath().getParent(), confFileName), null, + jobIndexInfo); + storage.jobRemovedFromHDFS(metaInfo.getJobId()); + deleteJobFromDone(metaInfo); + } else { + halted = true; + break; + } + } + if (!halted) { + doneDirFc.delete(doneDirFc.makeQualified(serialDir.getPath()), true); + removeDirectoryFromSerialNumberIndex(serialDir.getPath()); + synchronized (existingDoneSubdirs) { + existingDoneSubdirs.remove(serialDir.getPath()); + } + } else { + break; // Don't scan any more directories. + } + } + } +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryStorage.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryStorage.java new file mode 100644 index 00000000000..bbdf9feabc6 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryStorage.java @@ -0,0 +1,80 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.mapreduce.v2.hs; + +import java.util.Map; + +import org.apache.hadoop.mapreduce.v2.api.records.JobId; +import org.apache.hadoop.mapreduce.v2.api.records.JobState; +import org.apache.hadoop.mapreduce.v2.app.job.Job; +import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.JobsInfo; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** + * Provides an API to query jobs that have finished. + */ +@InterfaceAudience.Public +@InterfaceStability.Unstable +public interface HistoryStorage { + + /** + * Give the Storage a reference to a class that can be used to interact with + * history files. + * @param hsManager the class that is used to interact with history files. + */ + void setHistoryFileManager(HistoryFileManager hsManager); + + /** + * Look for a set of partial jobs. + * @param offset the offset into the list of jobs. + * @param count the maximum number of jobs to return. + * @param user only return jobs for the given user. + * @param queue only return jobs for in the given queue. + * @param sBegin only return Jobs that started on or after the given time. + * @param sEnd only return Jobs that started on or before the given time. + * @param fBegin only return Jobs that ended on or after the given time. + * @param fEnd only return Jobs that ended on or before the given time. + * @param jobState only return Jobs that are in the given job state. + * @return The list of filtered jobs. + */ + JobsInfo getPartialJobs(Long offset, Long count, String user, + String queue, Long sBegin, Long sEnd, Long fBegin, Long fEnd, + JobState jobState); + + /** + * Get all of the cached jobs. This only returns partial jobs and is here for + * legacy reasons. + * @return all of the cached jobs + */ + Map getAllPartialJobs(); + + /** + * Get a fully parsed job. + * @param jobId the id of the job + * @return the job, or null if it is not found. + */ + Job getFullJob(JobId jobId); + + /** + * Informs the Storage that a job has been removed from HDFS + * @param jobId the ID of the job that was removed. + */ + void jobRemovedFromHDFS(JobId jobId); +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java index c0581655597..54ffec6924d 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java @@ -1,36 +1,26 @@ /** -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.hadoop.mapreduce.v2.hs; -import java.io.FileNotFoundException; import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedHashMap; -import java.util.List; import java.util.Map; -import java.util.Set; -import java.util.SortedMap; -import java.util.TreeMap; -import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadFactory; @@ -41,26 +31,16 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FileAlreadyExistsException; -import org.apache.hadoop.fs.FileContext; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.Options; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.PathFilter; -import org.apache.hadoop.fs.RemoteIterator; -import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.hadoop.mapred.JobACLsManager; import org.apache.hadoop.mapreduce.JobID; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.TypeConverter; -import org.apache.hadoop.mapreduce.jobhistory.JobSummary; import org.apache.hadoop.mapreduce.v2.api.records.JobId; +import org.apache.hadoop.mapreduce.v2.api.records.JobState; import org.apache.hadoop.mapreduce.v2.app.job.Job; -import org.apache.hadoop.mapreduce.v2.jobhistory.FileNameIndexUtils; +import org.apache.hadoop.mapreduce.v2.hs.HistoryFileManager.MetaInfo; +import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.JobsInfo; import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig; -import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils; -import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo; +import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.yarn.Clock; import org.apache.hadoop.yarn.ClusterInfo; import org.apache.hadoop.yarn.YarnException; @@ -69,106 +49,36 @@ import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.service.AbstractService; +import org.apache.hadoop.yarn.service.Service; import com.google.common.util.concurrent.ThreadFactoryBuilder; - -/* +/** * Loads and manages the Job history cache. */ -public class JobHistory extends AbstractService implements HistoryContext { - - private static final int DEFAULT_JOBLIST_CACHE_SIZE = 20000; - private static final int DEFAULT_LOADEDJOB_CACHE_SIZE = 5; - private static final int DEFAULT_DATESTRING_CACHE_SIZE = 200000; - private static final long DEFAULT_MOVE_THREAD_INTERVAL = 3 * 60 * 1000l; //3 minutes - private static final int DEFAULT_MOVE_THREAD_COUNT = 3; - - static final long DEFAULT_HISTORY_MAX_AGE = 7 * 24 * 60 * 60 * 1000L; //1 week - static final long DEFAULT_RUN_INTERVAL = 1 * 24 * 60 * 60 * 1000l; //1 day - +public class JobHistory extends AbstractService implements HistoryContext { private static final Log LOG = LogFactory.getLog(JobHistory.class); - private static final Log SUMMARY_LOG = LogFactory.getLog(JobSummary.class); - public static final Pattern CONF_FILENAME_REGEX = - Pattern.compile("(" + JobID.JOBID_REGEX + ")_conf.xml(?:\\.[0-9]+\\.old)?"); + public static final Pattern CONF_FILENAME_REGEX = Pattern.compile("(" + + JobID.JOBID_REGEX + ")_conf.xml(?:\\.[0-9]+\\.old)?"); public static final String OLD_SUFFIX = ".old"; - private static String DONE_BEFORE_SERIAL_TAIL = - JobHistoryUtils.doneSubdirsBeforeSerialTail(); - - /** - * Maps between a serial number (generated based on jobId) and the timestamp - * component(s) to which it belongs. - * Facilitates jobId based searches. - * If a jobId is not found in this list - it will not be found. - */ - private final SortedMap> idToDateString = - new ConcurrentSkipListMap>(); - - //Maintains minimal details for recent jobs (parsed from history file name). - //Sorted on Job Completion Time. - private final SortedMap jobListCache = - new ConcurrentSkipListMap(); - - - // Re-use exisiting MetaInfo objects if they exist for the specific JobId. (synchronization on MetaInfo) - // Check for existance of the object when using iterators. - private final SortedMap intermediateListCache = - new ConcurrentSkipListMap(); - - //Maintains a list of known done subdirectories. Not currently used. - private final Set existingDoneSubdirs = new HashSet(); - - private Map loadedJobCache = null; - - /** - * Maintains a mapping between intermediate user directories and the last - * known modification time. - */ - private Map userDirModificationTimeMap = - new HashMap(); - - //The number of jobs to maintain in the job list cache. - private int jobListCacheSize; - - private JobACLsManager aclsMgr; - - //The number of loaded jobs. - private int loadedJobCacheSize; - - //The number of entries in idToDateString - private int dateStringCacheSize; - - //Time interval for the move thread. + // Time interval for the move thread. private long moveThreadInterval; - - //Number of move threads. + + // Number of move threads. private int numMoveThreads; - + private Configuration conf; - private boolean debugMode; - private int serialNumberLowDigits; - private String serialNumberFormat; - - - private Path doneDirPrefixPath = null; // folder for completed jobs - private FileContext doneDirFc; // done Dir FileContext - - private Path intermediateDoneDirPath = null; //Intermediate Done Dir Path - private FileContext intermediateDoneDirFc; //Intermediate Done Dir FileContext - private Thread moveIntermediateToDoneThread = null; private MoveIntermediateToDoneRunnable moveIntermediateToDoneRunnable = null; - private ScheduledThreadPoolExecutor cleanerScheduledExecutor = null; - - /** - * Writes out files to the path - * .....${DONE_DIR}/VERSION_STRING/YYYY/MM/DD/HH/SERIAL_NUM/jh{index_entries}.jhist - */ - @SuppressWarnings("serial") + private ScheduledThreadPoolExecutor cleanerScheduledExecutor = null; + + private HistoryStorage storage = null; + private HistoryFileManager hsManager = null; + @Override public void init(Configuration conf) throws YarnException { LOG.info("JobHistory Init"); @@ -176,121 +86,66 @@ public void init(Configuration conf) throws YarnException { this.appID = RecordFactoryProvider.getRecordFactory(conf) .newRecordInstance(ApplicationId.class); this.appAttemptID = RecordFactoryProvider.getRecordFactory(conf) - .newRecordInstance(ApplicationAttemptId.class); + .newRecordInstance(ApplicationAttemptId.class); - debugMode = conf.getBoolean(JHAdminConfig.MR_HISTORY_DEBUG_MODE, false); - serialNumberLowDigits = debugMode ? 1 : 3; - serialNumberFormat = ("%0" - + (JobHistoryUtils.SERIAL_NUMBER_DIRECTORY_DIGITS - + serialNumberLowDigits) + "d"); - - String doneDirPrefix = null; - doneDirPrefix = JobHistoryUtils.getConfiguredHistoryServerDoneDirPrefix(conf); - try { - doneDirPrefixPath = FileContext.getFileContext(conf).makeQualified( - new Path(doneDirPrefix)); - doneDirFc = FileContext.getFileContext(doneDirPrefixPath.toUri(), conf); - mkdir(doneDirFc, doneDirPrefixPath, new FsPermission( - JobHistoryUtils.HISTORY_DONE_DIR_PERMISSION)); - } catch (IOException e) { - throw new YarnException("Error creating done directory: [" + - doneDirPrefixPath + "]", e); - } - - String intermediateDoneDirPrefix = null; - intermediateDoneDirPrefix = JobHistoryUtils - .getConfiguredHistoryIntermediateDoneDirPrefix(conf); - try { - intermediateDoneDirPath = FileContext.getFileContext(conf) - .makeQualified(new Path(intermediateDoneDirPrefix)); - intermediateDoneDirFc = FileContext.getFileContext( - intermediateDoneDirPath.toUri(), conf); - mkdir(intermediateDoneDirFc, intermediateDoneDirPath, new FsPermission( - JobHistoryUtils.HISTORY_INTERMEDIATE_DONE_DIR_PERMISSIONS.toShort())); - } catch (IOException e) { - LOG.info("error creating done directory on dfs " + e); - throw new YarnException("Error creating intermediate done directory: [" - + intermediateDoneDirPath + "]", e); - } - - this.aclsMgr = new JobACLsManager(conf); - - jobListCacheSize = conf.getInt(JHAdminConfig.MR_HISTORY_JOBLIST_CACHE_SIZE, - DEFAULT_JOBLIST_CACHE_SIZE); - loadedJobCacheSize = conf.getInt(JHAdminConfig.MR_HISTORY_LOADED_JOB_CACHE_SIZE, - DEFAULT_LOADEDJOB_CACHE_SIZE); - dateStringCacheSize = conf.getInt(JHAdminConfig.MR_HISTORY_DATESTRING_CACHE_SIZE, - DEFAULT_DATESTRING_CACHE_SIZE); - moveThreadInterval = - conf.getLong(JHAdminConfig.MR_HISTORY_MOVE_INTERVAL_MS, - DEFAULT_MOVE_THREAD_INTERVAL); + moveThreadInterval = conf.getLong( + JHAdminConfig.MR_HISTORY_MOVE_INTERVAL_MS, + JHAdminConfig.DEFAULT_MR_HISTORY_MOVE_INTERVAL_MS); numMoveThreads = conf.getInt(JHAdminConfig.MR_HISTORY_MOVE_THREAD_COUNT, - DEFAULT_MOVE_THREAD_COUNT); - - loadedJobCache = - Collections.synchronizedMap(new LinkedHashMap( - loadedJobCacheSize + 1, 0.75f, true) { - @Override - public boolean removeEldestEntry(final Map.Entry eldest) { - return super.size() > loadedJobCacheSize; - } - }); - + JHAdminConfig.DEFAULT_MR_HISTORY_MOVE_THREAD_COUNT); + + hsManager = new HistoryFileManager(); + hsManager.init(conf); try { - initExisting(); + hsManager.initExisting(); } catch (IOException e) { throw new YarnException("Failed to intialize existing directories", e); } - super.init(conf); - } - - private void mkdir(FileContext fc, Path path, FsPermission fsp) - throws IOException { - if (!fc.util().exists(path)) { - try { - fc.mkdir(path, fsp, true); - FileStatus fsStatus = fc.getFileStatus(path); - LOG.info("Perms after creating " + fsStatus.getPermission().toShort() - + ", Expected: " + fsp.toShort()); - if (fsStatus.getPermission().toShort() != fsp.toShort()) { - LOG.info("Explicitly setting permissions to : " + fsp.toShort() - + ", " + fsp); - fc.setPermission(path, fsp); - } - } catch (FileAlreadyExistsException e) { - LOG.info("Directory: [" + path + "] already exists."); - } + storage = ReflectionUtils.newInstance(conf.getClass( + JHAdminConfig.MR_HISTORY_STORAGE, CachedHistoryStorage.class, + HistoryStorage.class), conf); + if (storage instanceof Service) { + ((Service) storage).init(conf); } + storage.setHistoryFileManager(hsManager); + + super.init(conf); } @Override public void start() { - //Start moveIntermediatToDoneThread - moveIntermediateToDoneRunnable = - new MoveIntermediateToDoneRunnable(moveThreadInterval, numMoveThreads); + hsManager.start(); + if (storage instanceof Service) { + ((Service) storage).start(); + } + + // Start moveIntermediatToDoneThread + moveIntermediateToDoneRunnable = new MoveIntermediateToDoneRunnable( + moveThreadInterval, numMoveThreads); moveIntermediateToDoneThread = new Thread(moveIntermediateToDoneRunnable); moveIntermediateToDoneThread.setName("MoveIntermediateToDoneScanner"); moveIntermediateToDoneThread.start(); - - //Start historyCleaner + + // Start historyCleaner boolean startCleanerService = conf.getBoolean( JHAdminConfig.MR_HISTORY_CLEANER_ENABLE, true); if (startCleanerService) { long maxAgeOfHistoryFiles = conf.getLong( - JHAdminConfig.MR_HISTORY_MAX_AGE_MS, DEFAULT_HISTORY_MAX_AGE); + JHAdminConfig.MR_HISTORY_MAX_AGE_MS, + JHAdminConfig.DEFAULT_MR_HISTORY_MAX_AGE); cleanerScheduledExecutor = new ScheduledThreadPoolExecutor(1, - new ThreadFactoryBuilder().setNameFormat("LogCleaner").build() - ); + new ThreadFactoryBuilder().setNameFormat("LogCleaner").build()); long runInterval = conf.getLong( - JHAdminConfig.MR_HISTORY_CLEANER_INTERVAL_MS, DEFAULT_RUN_INTERVAL); + JHAdminConfig.MR_HISTORY_CLEANER_INTERVAL_MS, + JHAdminConfig.DEFAULT_MR_HISTORY_CLEANER_INTERVAL_MS); cleanerScheduledExecutor .scheduleAtFixedRate(new HistoryCleaner(maxAgeOfHistoryFiles), 30 * 1000l, runInterval, TimeUnit.MILLISECONDS); } super.start(); } - + @Override public void stop() { LOG.info("Stopping JobHistory"); @@ -323,281 +178,16 @@ public void stop() { LOG.warn("HistoryCleanerService shutdown may not have succeeded"); } } + if (storage instanceof Service) { + ((Service) storage).stop(); + } + hsManager.stop(); super.stop(); } - + public JobHistory() { super(JobHistory.class.getName()); } - - /** - * Populates index data structures. - * Should only be called at initialization times. - */ - @SuppressWarnings("unchecked") - private void initExisting() throws IOException { - LOG.info("Initializing Existing Jobs..."); - List timestampedDirList = findTimestampedDirectories(); - Collections.sort(timestampedDirList); - for (FileStatus fs : timestampedDirList) { - //TODO Could verify the correct format for these directories. - addDirectoryToSerialNumberIndex(fs.getPath()); - addDirectoryToJobListCache(fs.getPath()); - } - } - - private void removeDirectoryFromSerialNumberIndex(Path serialDirPath) { - String serialPart = serialDirPath.getName(); - String timeStampPart = - JobHistoryUtils.getTimestampPartFromPath(serialDirPath.toString()); - if (timeStampPart == null) { - LOG.warn("Could not find timestamp portion from path: " + - serialDirPath.toString() +". Continuing with next"); - return; - } - if (serialPart == null) { - LOG.warn("Could not find serial portion from path: " + - serialDirPath.toString() + ". Continuing with next"); - return; - } - if (idToDateString.containsKey(serialPart)) { - Set set = idToDateString.get(serialPart); - set.remove(timeStampPart); - if (set.isEmpty()) { - idToDateString.remove(serialPart); - } - } - - } - - private void addDirectoryToSerialNumberIndex(Path serialDirPath) { - if(LOG.isDebugEnabled()) { - LOG.debug("Adding "+serialDirPath+" to serial index"); - } - String serialPart = serialDirPath.getName(); - String timestampPart = - JobHistoryUtils.getTimestampPartFromPath(serialDirPath.toString()); - if (timestampPart == null) { - LOG.warn("Could not find timestamp portion from path: " + - serialDirPath.toString() +". Continuing with next"); - return; - } - if (serialPart == null) { - LOG.warn("Could not find serial portion from path: " + - serialDirPath.toString() + ". Continuing with next"); - } - addToSerialNumberIndex(serialPart, timestampPart); - } - - private void addToSerialNumberIndex(String serialPart, String timestampPart) { - if (!idToDateString.containsKey(serialPart)) { - idToDateString.put(serialPart, new HashSet()); - if (idToDateString.size() > dateStringCacheSize) { - idToDateString.remove(idToDateString.firstKey()); - } - Set datePartSet = idToDateString.get(serialPart); - datePartSet.add(timestampPart); - } - } - - private void addDirectoryToJobListCache(Path path) throws IOException { - if(LOG.isDebugEnabled()) { - LOG.debug("Adding "+path+" to job list cache."); - } - List historyFileList = scanDirectoryForHistoryFiles(path, - doneDirFc); - for (FileStatus fs : historyFileList) { - if(LOG.isDebugEnabled()) { - LOG.debug("Adding in history for "+fs.getPath()); - } - JobIndexInfo jobIndexInfo = FileNameIndexUtils.getIndexInfo(fs.getPath() - .getName()); - String confFileName = JobHistoryUtils - .getIntermediateConfFileName(jobIndexInfo.getJobId()); - String summaryFileName = JobHistoryUtils - .getIntermediateSummaryFileName(jobIndexInfo.getJobId()); - MetaInfo metaInfo = new MetaInfo(fs.getPath(), new Path(fs.getPath() - .getParent(), confFileName), new Path(fs.getPath().getParent(), - summaryFileName), jobIndexInfo); - addToJobListCache(jobIndexInfo.getJobId(), metaInfo); - } - } - - private static List scanDirectory(Path path, FileContext fc, - PathFilter pathFilter) throws IOException { - path = fc.makeQualified(path); - List jhStatusList = new ArrayList(); - RemoteIterator fileStatusIter = fc.listStatus(path); - while (fileStatusIter.hasNext()) { - FileStatus fileStatus = fileStatusIter.next(); - Path filePath = fileStatus.getPath(); - if (fileStatus.isFile() && pathFilter.accept(filePath)) { - jhStatusList.add(fileStatus); - } - } - return jhStatusList; - } - - private static List scanDirectoryForHistoryFiles(Path path, - FileContext fc) throws IOException { - return scanDirectory(path, fc, JobHistoryUtils.getHistoryFileFilter()); - } - - /** - * Finds all history directories with a timestamp component by scanning - * the filesystem. - * Used when the JobHistory server is started. - * @return - */ - private List findTimestampedDirectories() throws IOException { - List fsList = JobHistoryUtils.localGlobber(doneDirFc, - doneDirPrefixPath, DONE_BEFORE_SERIAL_TAIL); - return fsList; - } - - /** - * Adds an entry to the job list cache. Maintains the size. - */ - private void addToJobListCache(JobId jobId, MetaInfo metaInfo) { - if(LOG.isDebugEnabled()) { - LOG.debug("Adding "+jobId+" to job list cache with " - +metaInfo.getJobIndexInfo()); - } - jobListCache.put(jobId, metaInfo); - if (jobListCache.size() > jobListCacheSize) { - jobListCache.remove(jobListCache.firstKey()); - } - } - - /** - * Adds an entry to the loaded job cache. Maintains the size. - */ - private void addToLoadedJobCache(Job job) { - if(LOG.isDebugEnabled()) { - LOG.debug("Adding "+job.getID()+" to loaded job cache"); - } - loadedJobCache.put(job.getID(), job); - } - - - /** - * Scans the intermediate directory to find user directories. Scans these - * for history files if the modification time for the directory has changed. - * @throws IOException - */ - private void scanIntermediateDirectory() throws IOException { - List userDirList = - JobHistoryUtils.localGlobber(intermediateDoneDirFc, intermediateDoneDirPath, ""); - - for (FileStatus userDir : userDirList) { - String name = userDir.getPath().getName(); - long newModificationTime = userDir.getModificationTime(); - boolean shouldScan = false; - synchronized (userDirModificationTimeMap) { - if (!userDirModificationTimeMap.containsKey(name) || newModificationTime - > userDirModificationTimeMap.get(name)) { - shouldScan = true; - userDirModificationTimeMap.put(name, newModificationTime); - } - } - if (shouldScan) { - scanIntermediateDirectory(userDir.getPath()); - } - } - } - - /** - * Scans the specified path and populates the intermediate cache. - * @param absPath - * @throws IOException - */ - private void scanIntermediateDirectory(final Path absPath) - throws IOException { - List fileStatusList = scanDirectoryForHistoryFiles(absPath, - intermediateDoneDirFc); - for (FileStatus fs : fileStatusList) { - JobIndexInfo jobIndexInfo = FileNameIndexUtils.getIndexInfo(fs.getPath() - .getName()); - String confFileName = JobHistoryUtils - .getIntermediateConfFileName(jobIndexInfo.getJobId()); - String summaryFileName = JobHistoryUtils - .getIntermediateSummaryFileName(jobIndexInfo.getJobId()); - MetaInfo metaInfo = new MetaInfo(fs.getPath(), new Path(fs.getPath() - .getParent(), confFileName), new Path(fs.getPath().getParent(), - summaryFileName), jobIndexInfo); - if (!intermediateListCache.containsKey(jobIndexInfo.getJobId())) { - intermediateListCache.put(jobIndexInfo.getJobId(), metaInfo); - } - } - } - - /** - * Searches the job history file FileStatus list for the specified JobId. - * - * @param fileStatusList fileStatus list of Job History Files. - * @param jobId The JobId to find. - * @param checkForDoneFile whether to check for the existance of a done file. - * @return A MetaInfo object for the jobId, null if not found. - * @throws IOException - */ - private MetaInfo getJobMetaInfo(List fileStatusList, JobId jobId) - throws IOException { - for (FileStatus fs : fileStatusList) { - JobIndexInfo jobIndexInfo = - FileNameIndexUtils.getIndexInfo(fs.getPath().getName()); - if (jobIndexInfo.getJobId().equals(jobId)) { - String confFileName = JobHistoryUtils - .getIntermediateConfFileName(jobIndexInfo.getJobId()); - String summaryFileName = JobHistoryUtils - .getIntermediateSummaryFileName(jobIndexInfo.getJobId()); - MetaInfo metaInfo = new MetaInfo(fs.getPath(), new Path(fs.getPath() - .getParent(), confFileName), new Path(fs.getPath().getParent(), - summaryFileName), jobIndexInfo); - return metaInfo; - } - } - return null; - } - - /** - * Scans old directories known by the idToDateString map for the specified - * jobId. - * If the number of directories is higher than the supported size of the - * idToDateString cache, the jobId will not be found. - * @param jobId the jobId. - * @return - * @throws IOException - */ - private MetaInfo scanOldDirsForJob(JobId jobId) throws IOException { - int jobSerialNumber = JobHistoryUtils.jobSerialNumber(jobId); - String boxedSerialNumber = String.valueOf(jobSerialNumber); - Set dateStringSet = idToDateString.get(boxedSerialNumber); - if (dateStringSet == null) { - return null; - } - for (String timestampPart : dateStringSet) { - Path logDir = canonicalHistoryLogPath(jobId, timestampPart); - List fileStatusList = scanDirectoryForHistoryFiles(logDir, - doneDirFc); - MetaInfo metaInfo = getJobMetaInfo(fileStatusList, jobId); - if (metaInfo != null) { - return metaInfo; - } - } - return null; - } - - /** - * Checks for the existence of the job history file in the intermediate - * directory. - * @param jobId - * @return - * @throws IOException - */ - private MetaInfo scanIntermediateForJob(JobId jobId) throws IOException { - scanIntermediateDirectory(); - return intermediateListCache.get(jobId); - } @Override public String getApplicationName() { @@ -609,486 +199,167 @@ private class MoveIntermediateToDoneRunnable implements Runnable { private long sleepTime; private ThreadPoolExecutor moveToDoneExecutor = null; private boolean running = false; - - public void stop() { + + public synchronized void stop() { running = false; + notify(); } - + MoveIntermediateToDoneRunnable(long sleepTime, int numMoveThreads) { this.sleepTime = sleepTime; - ThreadFactory tf = new ThreadFactoryBuilder() - .setNameFormat("MoveIntermediateToDone Thread #%d") - .build(); - moveToDoneExecutor = new ThreadPoolExecutor(1, numMoveThreads, 1, + ThreadFactory tf = new ThreadFactoryBuilder().setNameFormat( + "MoveIntermediateToDone Thread #%d").build(); + moveToDoneExecutor = new ThreadPoolExecutor(1, numMoveThreads, 1, TimeUnit.HOURS, new LinkedBlockingQueue(), tf); running = true; } - - @Override + + @Override public void run() { Thread.currentThread().setName("IntermediateHistoryScanner"); try { - while (running) { + while (true) { LOG.info("Starting scan to move intermediate done files"); - scanIntermediateDirectory(); - for (final MetaInfo metaInfo : intermediateListCache.values()) { + for (final MetaInfo metaInfo : hsManager.getIntermediateMetaInfos()) { moveToDoneExecutor.execute(new Runnable() { @Override public void run() { try { - moveToDone(metaInfo); + hsManager.moveToDone(metaInfo); } catch (IOException e) { - LOG.info("Failed to process metaInfo for job: " + - metaInfo.jobIndexInfo.getJobId(), e); + LOG.info( + "Failed to process metaInfo for job: " + + metaInfo.getJobId(), e); } } }); - } - synchronized (this) { // TODO Is this really required. + synchronized (this) { try { this.wait(sleepTime); } catch (InterruptedException e) { LOG.info("IntermediateHistoryScannerThread interrupted"); } + if (!running) { + break; + } } } } catch (IOException e) { - LOG.warn("Unable to get a list of intermediate files to be moved from: " - + intermediateDoneDirPath); + LOG.warn("Unable to get a list of intermediate files to be moved"); + // TODO Shut down the entire process!!!! } } } - - private Job loadJob(MetaInfo metaInfo) { - synchronized(metaInfo) { - try { - Job job = new CompletedJob(conf, metaInfo.getJobIndexInfo().getJobId(), - metaInfo.getHistoryFile(), false, metaInfo.getJobIndexInfo().getUser(), - metaInfo.getConfFile(), this.aclsMgr); - addToLoadedJobCache(job); - return job; - } catch (IOException e) { - throw new YarnException("Could not find/load job: " + - metaInfo.getJobIndexInfo().getJobId(), e); - } - } - } - - private Map getAllJobsInternal() { - //TODO This should ideally be using getAllJobsMetaInfo - // or get rid of that method once Job has APIs for user, finishTime etc. - SortedMap result = new TreeMap(); - try { - scanIntermediateDirectory(); - } catch (IOException e) { - LOG.warn("Failed to scan intermediate directory", e); - throw new YarnException(e); - } - for (JobId jobId : intermediateListCache.keySet()) { - MetaInfo mi = intermediateListCache.get(jobId); - if (mi != null) { - result.put(jobId, new PartialJob(mi.getJobIndexInfo(), mi - .getJobIndexInfo().getJobId())); - } - } - for (JobId jobId : jobListCache.keySet()) { - MetaInfo mi = jobListCache.get(jobId); - if (mi != null) { - result.put(jobId, new PartialJob(mi.getJobIndexInfo(), mi - .getJobIndexInfo().getJobId())); - } - } - return result; - } /** * Helper method for test cases. */ MetaInfo getJobMetaInfo(JobId jobId) throws IOException { - //MetaInfo available in cache. - MetaInfo metaInfo = null; - if (jobListCache.containsKey(jobId)) { - metaInfo = jobListCache.get(jobId); - } - - if (metaInfo != null) { - return metaInfo; - } - - //MetaInfo not available. Check intermediate directory for meta info. - metaInfo = scanIntermediateForJob(jobId); - if (metaInfo != null) { - return metaInfo; - } - - //Intermediate directory does not contain job. Search through older ones. - metaInfo = scanOldDirsForJob(jobId); - if (metaInfo != null) { - return metaInfo; - } - return null; + return hsManager.getMetaInfo(jobId); } - - private Job findJob(JobId jobId) throws IOException { - //Job already loaded. - if (loadedJobCache.containsKey(jobId)) { - return loadedJobCache.get(jobId); - } - - //MetaInfo available in cache. - MetaInfo metaInfo = null; - if (jobListCache.containsKey(jobId)) { - metaInfo = jobListCache.get(jobId); - } - - if (metaInfo != null) { - return loadJob(metaInfo); - } - - //MetaInfo not available. Check intermediate directory for meta info. - metaInfo = scanIntermediateForJob(jobId); - if (metaInfo != null) { - return loadJob(metaInfo); - } - - //Intermediate directory does not contain job. Search through older ones. - metaInfo = scanOldDirsForJob(jobId); - if (metaInfo != null) { - return loadJob(metaInfo); - } - return null; - } - - private void moveToDone(MetaInfo metaInfo) throws IOException { - long completeTime = metaInfo.getJobIndexInfo().getFinishTime(); - if (completeTime == 0) completeTime = System.currentTimeMillis(); - JobId jobId = metaInfo.getJobIndexInfo().getJobId(); - - List paths = new ArrayList(); - Path historyFile = metaInfo.getHistoryFile(); - if (historyFile == null) { - LOG.info("No file for job-history with " + jobId + " found in cache!"); - } else { - paths.add(historyFile); - } - - Path confFile = metaInfo.getConfFile(); - if (confFile == null) { - LOG.info("No file for jobConf with " + jobId + " found in cache!"); - } else { - paths.add(confFile); - } - - //TODO Check all mi getters and setters for the conf path - Path summaryFile = metaInfo.getSummaryFile(); - if (summaryFile == null) { - LOG.info("No summary file for job: " + jobId); - } else { - try { - String jobSummaryString = getJobSummary(intermediateDoneDirFc, summaryFile); - SUMMARY_LOG.info(jobSummaryString); - LOG.info("Deleting JobSummary file: [" + summaryFile + "]"); - intermediateDoneDirFc.delete(summaryFile, false); - metaInfo.setSummaryFile(null); - } catch (IOException e) { - LOG.warn("Failed to process summary file: [" + summaryFile + "]"); - throw e; - } - } - - Path targetDir = canonicalHistoryLogPath(jobId, completeTime); - addDirectoryToSerialNumberIndex(targetDir); - try { - maybeMakeSubdirectory(targetDir); - } catch (IOException e) { - LOG.warn("Failed creating subdirectory: " + targetDir + - " while attempting to move files for jobId: " + jobId); - throw e; - } - synchronized (metaInfo) { - if (historyFile != null) { - Path toPath = doneDirFc.makeQualified(new Path(targetDir, - historyFile.getName())); - try { - moveToDoneNow(historyFile, toPath); - } catch (IOException e) { - LOG.warn("Failed to move file: " + historyFile + " for jobId: " - + jobId); - throw e; - } - metaInfo.setHistoryFile(toPath); - } - if (confFile != null) { - Path toPath = doneDirFc.makeQualified(new Path(targetDir, - confFile.getName())); - try { - moveToDoneNow(confFile, toPath); - } catch (IOException e) { - LOG.warn("Failed to move file: " + historyFile + " for jobId: " - + jobId); - throw e; - } - metaInfo.setConfFile(toPath); - } - } - addToJobListCache(jobId, metaInfo); - intermediateListCache.remove(jobId); - } - - private void moveToDoneNow(final Path src, final Path target) - throws IOException { - LOG.info("Moving " + src.toString() + " to " + target.toString()); - intermediateDoneDirFc.rename(src, target, Options.Rename.NONE); - // fc.util().copy(src, target); - //fc.delete(src, false); - //intermediateDoneDirFc.setPermission(target, new FsPermission( - //JobHistoryUtils.HISTORY_DONE_FILE_PERMISSION)); - } - - String getJobSummary(FileContext fc, Path path) throws IOException { - Path qPath = fc.makeQualified(path); - FSDataInputStream in = fc.open(qPath); - String jobSummaryString = in.readUTF(); - in.close(); - return jobSummaryString; - } - - private void maybeMakeSubdirectory(Path path) throws IOException { - boolean existsInExistingCache = false; - synchronized(existingDoneSubdirs) { - if (existingDoneSubdirs.contains(path)) existsInExistingCache = true; - } - try { - doneDirFc.getFileStatus(path); - if (!existsInExistingCache) { - existingDoneSubdirs.add(path); - if (debugMode) { - LOG.info("JobHistory.maybeMakeSubdirectory -- We believed " - + path + " already existed, but it didn't."); - } - } - } catch (FileNotFoundException fnfE) { - try { - FsPermission fsp = - new FsPermission(JobHistoryUtils.HISTORY_DONE_DIR_PERMISSION); - doneDirFc.mkdir(path, fsp, true); - FileStatus fsStatus = doneDirFc.getFileStatus(path); - LOG.info("Perms after creating " + fsStatus.getPermission().toShort() - + ", Expected: " + fsp.toShort()); - if (fsStatus.getPermission().toShort() != fsp.toShort()) { - LOG.info("Explicitly setting permissions to : " + fsp.toShort() - + ", " + fsp); - doneDirFc.setPermission(path, fsp); - } - synchronized(existingDoneSubdirs) { - existingDoneSubdirs.add(path); - } - } catch (FileAlreadyExistsException faeE) { //Nothing to do. - } - } - } - - private Path canonicalHistoryLogPath(JobId id, String timestampComponent) { - return new Path(doneDirPrefixPath, - JobHistoryUtils.historyLogSubdirectory(id, timestampComponent, serialNumberFormat)); - } - - private Path canonicalHistoryLogPath(JobId id, long millisecondTime) { - String timestampComponent = - JobHistoryUtils.timestampDirectoryComponent(millisecondTime, debugMode); - return new Path(doneDirPrefixPath, - JobHistoryUtils.historyLogSubdirectory(id, timestampComponent, serialNumberFormat)); - } - @Override - public synchronized Job getJob(JobId jobId) { - if(LOG.isDebugEnabled()) { - LOG.debug("Looking for Job "+jobId); - } - Job job = null; - try { - job = findJob(jobId); - //This could return a null job. - } catch (IOException e) { - throw new YarnException(e); - } - return job; + public Job getJob(JobId jobId) { + return storage.getFullJob(jobId); } @Override public Map getAllJobs(ApplicationId appID) { - if(LOG.isDebugEnabled()) { + if (LOG.isDebugEnabled()) { LOG.debug("Called getAllJobs(AppId): " + appID); } -// currently there is 1 to 1 mapping between app and job id + // currently there is 1 to 1 mapping between app and job id org.apache.hadoop.mapreduce.JobID oldJobID = TypeConverter.fromYarn(appID); Map jobs = new HashMap(); JobId jobID = TypeConverter.toYarn(oldJobID); jobs.put(jobID, getJob(jobID)); return jobs; -// return getAllJobs(); } - - /* (non-Javadoc) - * @see org.apache.hadoop.mapreduce.v2.hs.HistoryContext#getAllJobs() - * - * Returns a recent list of jobs. This may not be the complete set. - * If a previous jobId is known - it can be queries via the getJob(JobId) - * method. - * Size of this list is determined by the size of the job list cache. - * This can be fixed when pagination is implemented - return the first set of - * jobs via the cache, go to DFS only when an attempt is made to navigate - * past the cached list. - * This does involve a DFS oepration of scanning the intermediate directory. - */ + + @Override public Map getAllJobs() { - LOG.debug("Called getAllJobs()"); - return getAllJobsInternal(); + return storage.getAllPartialJobs(); } - static class MetaInfo { - private Path historyFile; - private Path confFile; - private Path summaryFile; - JobIndexInfo jobIndexInfo; - - MetaInfo(Path historyFile, Path confFile, Path summaryFile, - JobIndexInfo jobIndexInfo) { - this.historyFile = historyFile; - this.confFile = confFile; - this.summaryFile = summaryFile; - this.jobIndexInfo = jobIndexInfo; - } - - Path getHistoryFile() { return historyFile; } - Path getConfFile() { return confFile; } - Path getSummaryFile() { return summaryFile; } - JobIndexInfo getJobIndexInfo() { return jobIndexInfo; } - - void setHistoryFile(Path historyFile) { this.historyFile = historyFile; } - void setConfFile(Path confFile) {this.confFile = confFile; } - void setSummaryFile(Path summaryFile) { this.summaryFile = summaryFile; } + /** + * Look for a set of partial jobs. + * + * @param offset + * the offset into the list of jobs. + * @param count + * the maximum number of jobs to return. + * @param user + * only return jobs for the given user. + * @param queue + * only return jobs for in the given queue. + * @param sBegin + * only return Jobs that started on or after the given time. + * @param sEnd + * only return Jobs that started on or before the given time. + * @param fBegin + * only return Jobs that ended on or after the given time. + * @param fEnd + * only return Jobs that ended on or before the given time. + * @param jobState + * only return jobs that are in the give job state. + * @return The list of filtered jobs. + */ + @Override + public JobsInfo getPartialJobs(Long offset, Long count, String user, + String queue, Long sBegin, Long sEnd, Long fBegin, Long fEnd, + JobState jobState) { + return storage.getPartialJobs(offset, count, user, queue, sBegin, sEnd, + fBegin, fEnd, jobState); } - public class HistoryCleaner implements Runnable { - private long currentTime; - long maxAgeMillis; - long filesDeleted = 0; - long dirsDeleted = 0; - + public HistoryCleaner(long maxAge) { this.maxAgeMillis = maxAge; } - - @SuppressWarnings("unchecked") + public void run() { LOG.info("History Cleaner started"); - currentTime = System.currentTimeMillis(); - boolean halted = false; - //TODO Delete YYYY/MM/DD directories. + long cutoff = System.currentTimeMillis() - maxAgeMillis; try { - List serialDirList = findTimestampedDirectories(); - //Sort in ascending order. Relies on YYYY/MM/DD/Serial - Collections.sort(serialDirList); - for (FileStatus serialDir : serialDirList) { - List historyFileList = - scanDirectoryForHistoryFiles(serialDir.getPath(), doneDirFc); - for (FileStatus historyFile : historyFileList) { - JobIndexInfo jobIndexInfo = - FileNameIndexUtils.getIndexInfo(historyFile.getPath().getName()); - long effectiveTimestamp = - getEffectiveTimestamp(jobIndexInfo.getFinishTime(), historyFile); - if (shouldDelete(effectiveTimestamp)) { - String confFileName = - JobHistoryUtils.getIntermediateConfFileName(jobIndexInfo.getJobId()); - MetaInfo metaInfo = new MetaInfo(historyFile.getPath(), - new Path(historyFile.getPath().getParent(), confFileName), - null, jobIndexInfo); - delete(metaInfo); - } else { - halted = true; - break; - } - } - if (!halted) { - deleteDir(serialDir.getPath()); - removeDirectoryFromSerialNumberIndex(serialDir.getPath()); - synchronized (existingDoneSubdirs) { - existingDoneSubdirs.remove(serialDir.getPath()); - } - - } else { - break; //Don't scan any more directories. - } - } + hsManager.clean(cutoff, storage); } catch (IOException e) { - LOG.warn("Error in History cleaner run", e); + LOG.warn("Error trying to clean up ", e); } LOG.info("History Cleaner complete"); - LOG.info("FilesDeleted: " + filesDeleted); - LOG.info("Directories Deleted: " + dirsDeleted); } - - private boolean shouldDelete(long ts) { - return ((ts + maxAgeMillis) <= currentTime); - } - - private long getEffectiveTimestamp(long finishTime, FileStatus fileStatus) { - if (finishTime == 0) { - return fileStatus.getModificationTime(); - } - return finishTime; - } - - private void delete(MetaInfo metaInfo) throws IOException { - deleteFile(metaInfo.getHistoryFile()); - deleteFile(metaInfo.getConfFile()); - jobListCache.remove(metaInfo.getJobIndexInfo().getJobId()); - loadedJobCache.remove(metaInfo.getJobIndexInfo().getJobId()); - } - - private void deleteFile(final Path path) throws IOException { - doneDirFc.delete(doneDirFc.makeQualified(path), false); - filesDeleted++; - } - - private void deleteDir(Path path) throws IOException { - doneDirFc.delete(doneDirFc.makeQualified(path), true); - dirsDeleted++; - } - } - - - - //TODO AppContext - Not Required - private ApplicationAttemptId appAttemptID; + } + + // TODO AppContext - Not Required + private ApplicationAttemptId appAttemptID; + @Override public ApplicationAttemptId getApplicationAttemptId() { - //TODO fixme - bogus appAttemptID for now + // TODO fixme - bogus appAttemptID for now return appAttemptID; - } - - //TODO AppContext - Not Required + } + + // TODO AppContext - Not Required private ApplicationId appID; + @Override public ApplicationId getApplicationID() { - //TODO fixme - bogus appID for now + // TODO fixme - bogus appID for now return appID; } - - //TODO AppContext - Not Required + + // TODO AppContext - Not Required @Override public EventHandler getEventHandler() { // TODO Auto-generated method stub return null; } - - //TODO AppContext - Not Required + + // TODO AppContext - Not Required private String userName; + @Override public CharSequence getUser() { if (userName != null) { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/PartialJob.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/PartialJob.java index 83380ea5f87..f2acbe48a9c 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/PartialJob.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/PartialJob.java @@ -51,6 +51,7 @@ public PartialJob(JobIndexInfo jobIndexInfo, JobId jobId) { jobReport = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(JobReport.class); jobReport.setStartTime(jobIndexInfo.getSubmitTime()); jobReport.setFinishTime(jobIndexInfo.getFinishTime()); + jobReport.setJobState(getState()); } @Override diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsWebApp.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsWebApp.java index 71f1e30a280..76991a27cb9 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsWebApp.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsWebApp.java @@ -44,6 +44,7 @@ public void setup() { bind(JAXBContextResolver.class); bind(GenericExceptionHandler.class); bind(AppContext.class).toInstance(history); + bind(HistoryContext.class).toInstance(history); route("/", HsController.class); route("/app", HsController.class); route(pajoin("/job", JOB_ID), HsController.class, "job"); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsWebServices.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsWebServices.java index 404cfbb22cb..71ad89f028e 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsWebServices.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsWebServices.java @@ -32,10 +32,8 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.v2.api.records.AMInfo; import org.apache.hadoop.mapreduce.v2.api.records.JobState; -import org.apache.hadoop.mapreduce.v2.api.records.JobReport; import org.apache.hadoop.mapreduce.v2.api.records.TaskId; import org.apache.hadoop.mapreduce.v2.api.records.TaskType; -import org.apache.hadoop.mapreduce.v2.app.AppContext; import org.apache.hadoop.mapreduce.v2.app.job.Job; import org.apache.hadoop.mapreduce.v2.app.job.Task; import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt; @@ -49,6 +47,7 @@ import org.apache.hadoop.mapreduce.v2.app.webapp.dao.TaskAttemptsInfo; import org.apache.hadoop.mapreduce.v2.app.webapp.dao.TaskInfo; import org.apache.hadoop.mapreduce.v2.app.webapp.dao.TasksInfo; +import org.apache.hadoop.mapreduce.v2.hs.HistoryContext; import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.AMAttemptInfo; import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.AMAttemptsInfo; import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.HistoryInfo; @@ -64,7 +63,7 @@ @Path("/ws/v1/history") public class HsWebServices { - private final AppContext appCtx; + private final HistoryContext ctx; private WebApp webapp; private final Configuration conf; @@ -72,9 +71,9 @@ public class HsWebServices { UriInfo uriInfo; @Inject - public HsWebServices(final AppContext appCtx, final Configuration conf, + public HsWebServices(final HistoryContext ctx, final Configuration conf, final WebApp webapp) { - this.appCtx = appCtx; + this.ctx = ctx; this.conf = conf; this.webapp = webapp; } @@ -103,33 +102,22 @@ public JobsInfo getJobs(@QueryParam("user") String userQuery, @QueryParam("startedTimeEnd") String startedEnd, @QueryParam("finishedTimeBegin") String finishBegin, @QueryParam("finishedTimeEnd") String finishEnd) { - JobsInfo allJobs = new JobsInfo(); - long num = 0; - boolean checkCount = false; - boolean checkStart = false; - boolean checkEnd = false; - long countNum = 0; - - // set values suitable in case both of begin/end not specified - long sBegin = 0; - long sEnd = Long.MAX_VALUE; - long fBegin = 0; - long fEnd = Long.MAX_VALUE; + Long countParam = null; + if (count != null && !count.isEmpty()) { - checkCount = true; try { - countNum = Long.parseLong(count); + countParam = Long.parseLong(count); } catch (NumberFormatException e) { throw new BadRequestException(e.getMessage()); } - if (countNum <= 0) { + if (countParam <= 0) { throw new BadRequestException("limit value must be greater then 0"); } } + Long sBegin = null; if (startedBegin != null && !startedBegin.isEmpty()) { - checkStart = true; try { sBegin = Long.parseLong(startedBegin); } catch (NumberFormatException e) { @@ -139,8 +127,9 @@ public JobsInfo getJobs(@QueryParam("user") String userQuery, throw new BadRequestException("startedTimeBegin must be greater than 0"); } } + + Long sEnd = null; if (startedEnd != null && !startedEnd.isEmpty()) { - checkStart = true; try { sEnd = Long.parseLong(startedEnd); } catch (NumberFormatException e) { @@ -150,13 +139,13 @@ public JobsInfo getJobs(@QueryParam("user") String userQuery, throw new BadRequestException("startedTimeEnd must be greater than 0"); } } - if (sBegin > sEnd) { + if (sBegin != null && sEnd != null && sBegin > sEnd) { throw new BadRequestException( "startedTimeEnd must be greater than startTimeBegin"); } + Long fBegin = null; if (finishBegin != null && !finishBegin.isEmpty()) { - checkEnd = true; try { fBegin = Long.parseLong(finishBegin); } catch (NumberFormatException e) { @@ -166,8 +155,8 @@ public JobsInfo getJobs(@QueryParam("user") String userQuery, throw new BadRequestException("finishedTimeBegin must be greater than 0"); } } + Long fEnd = null; if (finishEnd != null && !finishEnd.isEmpty()) { - checkEnd = true; try { fEnd = Long.parseLong(finishEnd); } catch (NumberFormatException e) { @@ -177,53 +166,18 @@ public JobsInfo getJobs(@QueryParam("user") String userQuery, throw new BadRequestException("finishedTimeEnd must be greater than 0"); } } - if (fBegin > fEnd) { + if (fBegin != null && fEnd != null && fBegin > fEnd) { throw new BadRequestException( "finishedTimeEnd must be greater than finishedTimeBegin"); } - - for (Job job : appCtx.getAllJobs().values()) { - if (checkCount && num == countNum) { - break; - } - - if (stateQuery != null && !stateQuery.isEmpty()) { - JobState.valueOf(stateQuery); - if (!job.getState().toString().equalsIgnoreCase(stateQuery)) { - continue; - } - } - - // can't really validate queue is a valid one since queues could change - if (queueQuery != null && !queueQuery.isEmpty()) { - if (!job.getQueueName().equals(queueQuery)) { - continue; - } - } - - if (userQuery != null && !userQuery.isEmpty()) { - if (!job.getUserName().equals(userQuery)) { - continue; - } - } - - JobReport report = job.getReport(); - - if (checkStart - && (report.getStartTime() < sBegin || report.getStartTime() > sEnd)) { - continue; - } - if (checkEnd - && (report.getFinishTime() < fBegin || report.getFinishTime() > fEnd)) { - continue; - } - - JobInfo jobInfo = new JobInfo(job); - - allJobs.add(jobInfo); - num++; + + JobState jobState = null; + if (stateQuery != null) { + jobState = JobState.valueOf(stateQuery); } - return allJobs; + + return ctx.getPartialJobs(0l, countParam, userQuery, queueQuery, + sBegin, sEnd, fBegin, fEnd, jobState); } @GET @@ -231,7 +185,7 @@ public JobsInfo getJobs(@QueryParam("user") String userQuery, @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML }) public JobInfo getJob(@PathParam("jobid") String jid) { - Job job = AMWebServices.getJobFromJobIdString(jid, appCtx); + Job job = AMWebServices.getJobFromJobIdString(jid, ctx); return new JobInfo(job); } @@ -240,7 +194,7 @@ public JobInfo getJob(@PathParam("jobid") String jid) { @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML }) public AMAttemptsInfo getJobAttempts(@PathParam("jobid") String jid) { - Job job = AMWebServices.getJobFromJobIdString(jid, appCtx); + Job job = AMWebServices.getJobFromJobIdString(jid, ctx); AMAttemptsInfo amAttempts = new AMAttemptsInfo(); for (AMInfo amInfo : job.getAMInfos()) { AMAttemptInfo attempt = new AMAttemptInfo(amInfo, MRApps.toString(job @@ -256,8 +210,8 @@ public AMAttemptsInfo getJobAttempts(@PathParam("jobid") String jid) { @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML }) public JobCounterInfo getJobCounters(@PathParam("jobid") String jid) { - Job job = AMWebServices.getJobFromJobIdString(jid, appCtx); - return new JobCounterInfo(this.appCtx, job); + Job job = AMWebServices.getJobFromJobIdString(jid, ctx); + return new JobCounterInfo(this.ctx, job); } @GET @@ -265,7 +219,7 @@ public JobCounterInfo getJobCounters(@PathParam("jobid") String jid) { @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML }) public ConfInfo getJobConf(@PathParam("jobid") String jid) { - Job job = AMWebServices.getJobFromJobIdString(jid, appCtx); + Job job = AMWebServices.getJobFromJobIdString(jid, ctx); ConfInfo info; try { info = new ConfInfo(job, this.conf); @@ -282,7 +236,7 @@ public ConfInfo getJobConf(@PathParam("jobid") String jid) { public TasksInfo getJobTasks(@PathParam("jobid") String jid, @QueryParam("type") String type) { - Job job = AMWebServices.getJobFromJobIdString(jid, appCtx); + Job job = AMWebServices.getJobFromJobIdString(jid, ctx); TasksInfo allTasks = new TasksInfo(); for (Task task : job.getTasks().values()) { TaskType ttype = null; @@ -307,7 +261,7 @@ public TasksInfo getJobTasks(@PathParam("jobid") String jid, public TaskInfo getJobTask(@PathParam("jobid") String jid, @PathParam("taskid") String tid) { - Job job = AMWebServices.getJobFromJobIdString(jid, appCtx); + Job job = AMWebServices.getJobFromJobIdString(jid, ctx); Task task = AMWebServices.getTaskFromTaskIdString(tid, job); return new TaskInfo(task); @@ -319,7 +273,7 @@ public TaskInfo getJobTask(@PathParam("jobid") String jid, public JobTaskCounterInfo getSingleTaskCounters( @PathParam("jobid") String jid, @PathParam("taskid") String tid) { - Job job = AMWebServices.getJobFromJobIdString(jid, appCtx); + Job job = AMWebServices.getJobFromJobIdString(jid, ctx); TaskId taskID = MRApps.toTaskID(tid); if (taskID == null) { throw new NotFoundException("taskid " + tid + " not found or invalid"); @@ -338,7 +292,7 @@ public TaskAttemptsInfo getJobTaskAttempts(@PathParam("jobid") String jid, @PathParam("taskid") String tid) { TaskAttemptsInfo attempts = new TaskAttemptsInfo(); - Job job = AMWebServices.getJobFromJobIdString(jid, appCtx); + Job job = AMWebServices.getJobFromJobIdString(jid, ctx); Task task = AMWebServices.getTaskFromTaskIdString(tid, job); for (TaskAttempt ta : task.getAttempts().values()) { if (ta != null) { @@ -358,7 +312,7 @@ public TaskAttemptsInfo getJobTaskAttempts(@PathParam("jobid") String jid, public TaskAttemptInfo getJobTaskAttemptId(@PathParam("jobid") String jid, @PathParam("taskid") String tid, @PathParam("attemptid") String attId) { - Job job = AMWebServices.getJobFromJobIdString(jid, appCtx); + Job job = AMWebServices.getJobFromJobIdString(jid, ctx); Task task = AMWebServices.getTaskFromTaskIdString(tid, job); TaskAttempt ta = AMWebServices.getTaskAttemptFromTaskAttemptString(attId, task); @@ -376,7 +330,7 @@ public JobTaskAttemptCounterInfo getJobTaskAttemptIdCounters( @PathParam("jobid") String jid, @PathParam("taskid") String tid, @PathParam("attemptid") String attId) { - Job job = AMWebServices.getJobFromJobIdString(jid, appCtx); + Job job = AMWebServices.getJobFromJobIdString(jid, ctx); Task task = AMWebServices.getTaskFromTaskIdString(tid, job); TaskAttempt ta = AMWebServices.getTaskAttemptFromTaskAttemptString(attId, task); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java index d737cd23766..2b0d8965ec8 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java @@ -92,6 +92,14 @@ public void testHistoryParsingWithParseErrors() throws Exception { checkHistoryParsing(3, 0, 2); } + private static String getJobSummary(FileContext fc, Path path) throws IOException { + Path qPath = fc.makeQualified(path); + FSDataInputStream in = fc.open(qPath); + String jobSummaryString = in.readUTF(); + in.close(); + return jobSummaryString; + } + private void checkHistoryParsing(final int numMaps, final int numReduces, final int numSuccessfulMaps) throws Exception { @@ -244,7 +252,7 @@ public HistoryEvent answer(InvocationOnMock invocation) String summaryFileName = JobHistoryUtils .getIntermediateSummaryFileName(jobId); Path summaryFile = new Path(jobhistoryDir, summaryFileName); - String jobSummaryString = jobHistory.getJobSummary(fc, summaryFile); + String jobSummaryString = getJobSummary(fc, summaryFile); Assert.assertTrue(jobSummaryString.contains("resourcesPerMap=100")); Assert.assertTrue(jobSummaryString.contains("resourcesPerReduce=100")); Assert.assertNotNull(jobSummaryString); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServices.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServices.java index e14b28c993b..ce6a31bc727 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServices.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServices.java @@ -30,11 +30,13 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.v2.api.records.JobId; +import org.apache.hadoop.mapreduce.v2.api.records.JobState; import org.apache.hadoop.mapreduce.v2.app.AppContext; import org.apache.hadoop.mapreduce.v2.app.MockJobs; import org.apache.hadoop.mapreduce.v2.app.job.Job; import org.apache.hadoop.mapreduce.v2.hs.HistoryContext; import org.apache.hadoop.mapreduce.v2.hs.JobHistory; +import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.JobsInfo; import org.apache.hadoop.util.VersionInfo; import org.apache.hadoop.yarn.Clock; import org.apache.hadoop.yarn.ClusterInfo; @@ -77,7 +79,7 @@ public class TestHsWebServices extends JerseyTest { private static TestAppContext appContext; private static HsWebApp webApp; - static class TestAppContext implements AppContext { + static class TestAppContext implements HistoryContext { final ApplicationAttemptId appAttemptID; final ApplicationId appID; final String user = MockJobs.newUserName(); @@ -144,6 +146,20 @@ public long getStartTime() { public ClusterInfo getClusterInfo() { return null; } + + @Override + public Map getAllJobs(ApplicationId appID) { + // TODO Auto-generated method stub + return null; + } + + @Override + public JobsInfo getPartialJobs(Long offset, Long count, String user, + String queue, Long sBegin, Long sEnd, Long fBegin, Long fEnd, + JobState jobState) { + // TODO Auto-generated method stub + return null; + } } private Injector injector = Guice.createInjector(new ServletModule() { @@ -160,6 +176,7 @@ protected void configureServlets() { bind(GenericExceptionHandler.class); bind(WebApp.class).toInstance(webApp); bind(AppContext.class).toInstance(appContext); + bind(HistoryContext.class).toInstance(appContext); bind(Configuration.class).toInstance(conf); serve("/*").with(GuiceContainer.class); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesAttempts.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesAttempts.java index 79e66af7245..a584987d427 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesAttempts.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesAttempts.java @@ -35,6 +35,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.v2.api.records.JobId; +import org.apache.hadoop.mapreduce.v2.api.records.JobState; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; import org.apache.hadoop.mapreduce.v2.api.records.TaskType; import org.apache.hadoop.mapreduce.v2.app.AppContext; @@ -42,6 +43,8 @@ import org.apache.hadoop.mapreduce.v2.app.job.Job; import org.apache.hadoop.mapreduce.v2.app.job.Task; import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt; +import org.apache.hadoop.mapreduce.v2.hs.HistoryContext; +import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.JobsInfo; import org.apache.hadoop.mapreduce.v2.util.MRApps; import org.apache.hadoop.yarn.Clock; import org.apache.hadoop.yarn.ClusterInfo; @@ -89,7 +92,7 @@ public class TestHsWebServicesAttempts extends JerseyTest { private static TestAppContext appContext; private static HsWebApp webApp; - static class TestAppContext implements AppContext { + static class TestAppContext implements HistoryContext { final ApplicationAttemptId appAttemptID; final ApplicationId appID; final String user = MockJobs.newUserName(); @@ -156,6 +159,20 @@ public long getStartTime() { public ClusterInfo getClusterInfo() { return null; } + + @Override + public Map getAllJobs(ApplicationId appID) { + // TODO Auto-generated method stub + return null; + } + + @Override + public JobsInfo getPartialJobs(Long offset, Long count, String user, + String queue, Long sBegin, Long sEnd, Long fBegin, Long fEnd, + JobState jobState) { + // TODO Auto-generated method stub + return null; + } } private Injector injector = Guice.createInjector(new ServletModule() { @@ -171,6 +188,7 @@ protected void configureServlets() { bind(GenericExceptionHandler.class); bind(WebApp.class).toInstance(webApp); bind(AppContext.class).toInstance(appContext); + bind(HistoryContext.class).toInstance(appContext); bind(Configuration.class).toInstance(conf); serve("/*").with(GuiceContainer.class); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesJobConf.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesJobConf.java index 57999658edc..d19a6468a0b 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesJobConf.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesJobConf.java @@ -41,9 +41,12 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.v2.api.records.JobId; +import org.apache.hadoop.mapreduce.v2.api.records.JobState; import org.apache.hadoop.mapreduce.v2.app.AppContext; import org.apache.hadoop.mapreduce.v2.app.MockJobs; import org.apache.hadoop.mapreduce.v2.app.job.Job; +import org.apache.hadoop.mapreduce.v2.hs.HistoryContext; +import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.JobsInfo; import org.apache.hadoop.mapreduce.v2.util.MRApps; import org.apache.hadoop.yarn.Clock; import org.apache.hadoop.yarn.ClusterInfo; @@ -90,7 +93,7 @@ public class TestHsWebServicesJobConf extends JerseyTest { private static File testConfDir = new File("target", TestHsWebServicesJobConf.class.getSimpleName() + "confDir"); - static class TestAppContext implements AppContext { + static class TestAppContext implements HistoryContext { final ApplicationAttemptId appAttemptID; final ApplicationId appID; final String user = MockJobs.newUserName(); @@ -156,6 +159,20 @@ public long getStartTime() { public ClusterInfo getClusterInfo() { return null; } + + @Override + public Map getAllJobs(ApplicationId appID) { + // TODO Auto-generated method stub + return null; + } + + @Override + public JobsInfo getPartialJobs(Long offset, Long count, String user, + String queue, Long sBegin, Long sEnd, Long fBegin, Long fEnd, + JobState jobState) { + // TODO Auto-generated method stub + return null; + } } private Injector injector = Guice.createInjector(new ServletModule() { @@ -195,6 +212,7 @@ protected void configureServlets() { bind(GenericExceptionHandler.class); bind(WebApp.class).toInstance(webApp); bind(AppContext.class).toInstance(appContext); + bind(HistoryContext.class).toInstance(appContext); bind(Configuration.class).toInstance(conf); serve("/*").with(GuiceContainer.class); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesJobs.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesJobs.java index 3404e71e539..04524062317 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesJobs.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesJobs.java @@ -38,11 +38,15 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.v2.api.records.AMInfo; import org.apache.hadoop.mapreduce.v2.api.records.JobId; +import org.apache.hadoop.mapreduce.v2.api.records.JobState; import org.apache.hadoop.mapreduce.v2.app.AppContext; import org.apache.hadoop.mapreduce.v2.app.MockJobs; import org.apache.hadoop.mapreduce.v2.app.job.Job; +import org.apache.hadoop.mapreduce.v2.hs.CachedHistoryStorage; +import org.apache.hadoop.mapreduce.v2.hs.HistoryContext; import org.apache.hadoop.mapreduce.v2.hs.MockHistoryJobs; import org.apache.hadoop.mapreduce.v2.hs.MockHistoryJobs.JobsPair; +import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.JobsInfo; import org.apache.hadoop.mapreduce.v2.util.MRApps; import org.apache.hadoop.yarn.Clock; import org.apache.hadoop.yarn.ClusterInfo; @@ -90,7 +94,7 @@ public class TestHsWebServicesJobs extends JerseyTest { private static TestAppContext appContext; private static HsWebApp webApp; - static class TestAppContext implements AppContext { + static class TestAppContext implements HistoryContext { final ApplicationAttemptId appAttemptID; final ApplicationId appID; final String user = MockJobs.newUserName(); @@ -169,6 +173,20 @@ public long getStartTime() { public ClusterInfo getClusterInfo() { return null; } + + @Override + public Map getAllJobs(ApplicationId appID) { + // TODO Auto-generated method stub + return null; + } + + @Override + public JobsInfo getPartialJobs(Long offset, Long count, String user, + String queue, Long sBegin, Long sEnd, Long fBegin, Long fEnd, + JobState jobState) { + return CachedHistoryStorage.getPartialJobs(this.partialJobs.values(), + offset, count, user, queue, sBegin, sEnd, fBegin, fEnd, jobState); + } } private Injector injector = Guice.createInjector(new ServletModule() { @@ -184,6 +202,7 @@ protected void configureServlets() { bind(GenericExceptionHandler.class); bind(WebApp.class).toInstance(webApp); bind(AppContext.class).toInstance(appContext); + bind(HistoryContext.class).toInstance(appContext); bind(Configuration.class).toInstance(conf); serve("/*").with(GuiceContainer.class); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesJobsQuery.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesJobsQuery.java index c0110dcd087..5d5da9d551d 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesJobsQuery.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesJobsQuery.java @@ -36,8 +36,11 @@ import org.apache.hadoop.mapreduce.v2.app.AppContext; import org.apache.hadoop.mapreduce.v2.app.MockJobs; import org.apache.hadoop.mapreduce.v2.app.job.Job; +import org.apache.hadoop.mapreduce.v2.hs.CachedHistoryStorage; +import org.apache.hadoop.mapreduce.v2.hs.HistoryContext; import org.apache.hadoop.mapreduce.v2.hs.MockHistoryJobs; import org.apache.hadoop.mapreduce.v2.hs.MockHistoryJobs.JobsPair; +import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.JobsInfo; import org.apache.hadoop.mapreduce.v2.util.MRApps; import org.apache.hadoop.yarn.Clock; import org.apache.hadoop.yarn.ClusterInfo; @@ -77,7 +80,7 @@ public class TestHsWebServicesJobsQuery extends JerseyTest { private static TestAppContext appContext; private static HsWebApp webApp; - static class TestAppContext implements AppContext { + static class TestAppContext implements HistoryContext { final String user = MockJobs.newUserName(); final Map fullJobs; final Map partialJobs; @@ -152,6 +155,20 @@ public long getStartTime() { public ClusterInfo getClusterInfo() { return null; } + + @Override + public Map getAllJobs(ApplicationId appID) { + // TODO Auto-generated method stub + return null; + } + + @Override + public JobsInfo getPartialJobs(Long offset, Long count, String user, + String queue, Long sBegin, Long sEnd, Long fBegin, Long fEnd, + JobState jobState) { + return CachedHistoryStorage.getPartialJobs(this.partialJobs.values(), + offset, count, user, queue, sBegin, sEnd, fBegin, fEnd, jobState); + } } private Injector injector = Guice.createInjector(new ServletModule() { @@ -167,6 +184,7 @@ protected void configureServlets() { bind(GenericExceptionHandler.class); bind(WebApp.class).toInstance(webApp); bind(AppContext.class).toInstance(appContext); + bind(HistoryContext.class).toInstance(appContext); bind(Configuration.class).toInstance(conf); serve("/*").with(GuiceContainer.class); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesTasks.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesTasks.java index 471acb5b289..c8cdb7c22e3 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesTasks.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesTasks.java @@ -34,12 +34,15 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.v2.api.records.JobId; +import org.apache.hadoop.mapreduce.v2.api.records.JobState; import org.apache.hadoop.mapreduce.v2.api.records.TaskId; import org.apache.hadoop.mapreduce.v2.api.records.TaskReport; import org.apache.hadoop.mapreduce.v2.app.AppContext; import org.apache.hadoop.mapreduce.v2.app.MockJobs; import org.apache.hadoop.mapreduce.v2.app.job.Job; import org.apache.hadoop.mapreduce.v2.app.job.Task; +import org.apache.hadoop.mapreduce.v2.hs.HistoryContext; +import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.JobsInfo; import org.apache.hadoop.mapreduce.v2.util.MRApps; import org.apache.hadoop.yarn.Clock; import org.apache.hadoop.yarn.ClusterInfo; @@ -85,7 +88,7 @@ public class TestHsWebServicesTasks extends JerseyTest { private static TestAppContext appContext; private static HsWebApp webApp; - static class TestAppContext implements AppContext { + static class TestAppContext implements HistoryContext { final ApplicationAttemptId appAttemptID; final ApplicationId appID; final String user = MockJobs.newUserName(); @@ -152,6 +155,20 @@ public long getStartTime() { public ClusterInfo getClusterInfo() { return null; } + + @Override + public Map getAllJobs(ApplicationId appID) { + // TODO Auto-generated method stub + return null; + } + + @Override + public JobsInfo getPartialJobs(Long offset, Long count, String user, + String queue, Long sBegin, Long sEnd, Long fBegin, Long fEnd, + JobState jobState) { + // TODO Auto-generated method stub + return null; + } } private Injector injector = Guice.createInjector(new ServletModule() { @@ -167,6 +184,7 @@ protected void configureServlets() { bind(GenericExceptionHandler.class); bind(WebApp.class).toInstance(webApp); bind(AppContext.class).toInstance(appContext); + bind(HistoryContext.class).toInstance(appContext); bind(Configuration.class).toInstance(conf); serve("/*").with(GuiceContainer.class);