Merge MAPREDUCE-3972 from trunk. Fix locking and exception issues in JobHistory server. (Contributed by Robert Joseph Evans)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1327355 13f79535-47bb-0310-9956-ffa450edef68
Siddharth Seth 2012-04-18 02:01:02 +00:00
parent 055e2cdd22
commit be7e487ef3
17 changed files with 630 additions and 547 deletions


@@ -167,6 +167,9 @@ Release 0.23.3 - UNRELEASED
MAPREDUCE-4134. Remove references of mapred.child.ulimit etc. since they
are not being used any more (Ravi Prakash via bobby)
+MAPREDUCE-3972. Fix locking and exception issues in JobHistory server.
+(Robert Joseph Evans via sseth)
OPTIMIZATIONS
BUG FIXES


@@ -18,9 +18,11 @@
package org.apache.hadoop.mapreduce.v2.app.job;
+import java.io.IOException;
import java.util.List;
import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.JobACL;
@@ -71,6 +73,13 @@ public interface Job {
*/
Path getConfFile();
+/**
+* @return a parsed version of the config files pointed to by
+* {@link #getConfFile()}.
+* @throws IOException on any error trying to load the conf file.
+*/
+Configuration loadConfFile() throws IOException;
/**
* @return the ACLs for this job for each type of JobACL given.
*/

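The new Job.loadConfFile() gives callers a parsed Configuration without their having to know where the underlying file lives or how to open it. A minimal usage sketch, assuming a Job obtained elsewhere (for example from an AppContext) and an illustrative property name:

```java
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.v2.app.job.Job;

public class JobConfPrinter {
  // Hypothetical helper: reads one property out of a job's configuration.
  // The Job instance is assumed to come from something like AppContext#getJob(JobId).
  public static String queueNameOf(Job job) throws IOException {
    Configuration jobConf = job.loadConfFile(); // parses the file behind getConfFile()
    return jobConf.get("mapreduce.job.queuename", "default");
  }
}
```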

@@ -37,6 +37,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
@@ -1472,4 +1473,13 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
job.finished(JobState.ERROR);
}
}
+@Override
+public Configuration loadConfFile() throws IOException {
+Path confPath = getConfFile();
+FileContext fc = FileContext.getFileContext(confPath.toUri(), conf);
+Configuration jobConf = new Configuration(false);
+jobConf.addResource(fc.open(confPath));
+return jobConf;
+}
}


@@ -31,7 +31,6 @@ import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response.Status;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.JobACL;
import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
@@ -68,14 +67,11 @@ import com.google.inject.Inject;
public class AMWebServices {
private final AppContext appCtx;
private final App app;
-private final Configuration conf;
@Inject
-public AMWebServices(final App app, final AppContext context,
-final Configuration conf) {
+public AMWebServices(final App app, final AppContext context) {
this.appCtx = context;
this.app = app;
-this.conf = conf;
}
Boolean hasAccess(Job job, HttpServletRequest request) {
@@ -272,7 +268,7 @@ public class AMWebServices {
checkAccess(job, hsr);
ConfInfo info;
try {
-info = new ConfInfo(job, this.conf);
+info = new ConfInfo(job);
} catch (IOException e) {
throw new NotFoundException("unable to load configuration for job: "
+ jid);


@@ -23,7 +23,6 @@ import static org.apache.hadoop.yarn.webapp.view.JQueryUI._TH;
import java.io.IOException;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
@@ -44,11 +43,9 @@ import com.google.inject.Inject;
*/
public class ConfBlock extends HtmlBlock {
final AppContext appContext;
-final Configuration conf;
-@Inject ConfBlock(AppContext appctx, Configuration conf) {
+@Inject ConfBlock(AppContext appctx) {
appContext = appctx;
-this.conf = conf;
}
/*
@@ -71,7 +68,7 @@ public class ConfBlock extends HtmlBlock {
}
Path confPath = job.getConfFile();
try {
-ConfInfo info = new ConfInfo(job, this.conf);
+ConfInfo info = new ConfInfo(job);
html.div().h3(confPath.toString())._();
TBODY<TABLE<Hamlet>> tbody = html.


@@ -40,15 +40,11 @@ public class ConfInfo {
public ConfInfo() {
}
-public ConfInfo(Job job, Configuration conf) throws IOException {
+public ConfInfo(Job job) throws IOException {
-Path confPath = job.getConfFile();
this.property = new ArrayList<ConfEntryInfo>();
-// Read in the configuration file and put it in a key/value table.
-FileContext fc = FileContext.getFileContext(confPath.toUri(), conf);
-Configuration jobConf = new Configuration(false);
-jobConf.addResource(fc.open(confPath));
-this.path = confPath.toString();
+Configuration jobConf = job.loadConfFile();
+this.path = job.getConfFile().toString();
for (Map.Entry<String, String> entry : jobConf) {
this.property.add(new ConfEntryInfo(entry.getKey(), entry.getValue()));
}

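ConfInfo can now build its key/value list directly from the object returned by loadConfFile(), because a Hadoop Configuration is iterable as Map.Entry<String, String> pairs. A small standalone sketch of that iteration, with a set() call standing in for the addResource() done by the real code:

```java
import java.util.Map;
import org.apache.hadoop.conf.Configuration;

public class ConfDump {
  public static void main(String[] args) {
    Configuration conf = new Configuration(false); // no default resources loaded
    conf.set("example.key", "example.value");      // stand-in for addResource(stream)
    // Configuration implements Iterable<Map.Entry<String, String>>
    for (Map.Entry<String, String> entry : conf) {
      System.out.println(entry.getKey() + " = " + entry.getValue());
    }
  }
}
```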

@@ -18,6 +18,7 @@
package org.apache.hadoop.mapreduce.v2.app;
+import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -27,6 +28,7 @@ import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobACLsManager;
import org.apache.hadoop.mapred.ShuffleHandler;
@@ -442,7 +444,7 @@ public class MockJobs extends MockApps {
final Path configFile = confFile;
Map<JobACL, AccessControlList> tmpJobACLs = new HashMap<JobACL, AccessControlList>();
-Configuration conf = new Configuration();
+final Configuration conf = new Configuration();
conf.set(JobACL.VIEW_JOB.getAclName(), "testuser");
conf.setBoolean(MRConfig.MR_ACLS_ENABLED, true);
@@ -564,6 +566,14 @@
amInfoList.add(createAMInfo(2));
return amInfoList;
}
+@Override
+public Configuration loadConfFile() throws IOException {
+FileContext fc = FileContext.getFileContext(configFile.toUri(), conf);
+Configuration jobConf = new Configuration(false);
+jobConf.addResource(fc.open(configFile));
+return jobConf;
+}
};
}


@@ -489,6 +489,11 @@ public class TestRuntimeEstimators {
public List<AMInfo> getAMInfos() {
throw new UnsupportedOperationException("Not supported yet.");
}
+@Override
+public Configuration loadConfFile() {
+throw new UnsupportedOperationException();
+}
}
/*


@@ -34,7 +34,7 @@ import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.JobsInfo;
-import org.apache.hadoop.mapreduce.v2.hs.HistoryFileManager.MetaInfo;
+import org.apache.hadoop.mapreduce.v2.hs.HistoryFileManager.HistoryFileInfo;
import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.JobInfo;
import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
import org.apache.hadoop.yarn.YarnException;
@@ -82,32 +82,41 @@ public class CachedHistoryStorage extends AbstractService implements
super(CachedHistoryStorage.class.getName());
}
-private Job loadJob(MetaInfo metaInfo) {
+private Job loadJob(HistoryFileInfo fileInfo) {
try {
-Job job = hsManager.loadJob(metaInfo);
+Job job = fileInfo.loadJob();
if (LOG.isDebugEnabled()) {
LOG.debug("Adding " + job.getID() + " to loaded job cache");
}
+// We can clobber results here, but that should be OK, because it only
+// means that we may have two identical copies of the same job floating
+// around for a while.
loadedJobCache.put(job.getID(), job);
return job;
} catch (IOException e) {
throw new YarnException(
-"Could not find/load job: " + metaInfo.getJobId(), e);
+"Could not find/load job: " + fileInfo.getJobId(), e);
}
}
@Override
-public synchronized Job getFullJob(JobId jobId) {
+public Job getFullJob(JobId jobId) {
if (LOG.isDebugEnabled()) {
LOG.debug("Looking for Job " + jobId);
}
try {
-Job result = loadedJobCache.get(jobId);
-if (result == null) {
-MetaInfo metaInfo = hsManager.getMetaInfo(jobId);
-if (metaInfo != null) {
-result = loadJob(metaInfo);
+HistoryFileInfo fileInfo = hsManager.getFileInfo(jobId);
+Job result = null;
+if (fileInfo != null) {
+result = loadedJobCache.get(jobId);
+if (result == null) {
+result = loadJob(fileInfo);
+} else if(fileInfo.isDeleted()) {
+loadedJobCache.remove(jobId);
+result = null;
}
+} else {
+loadedJobCache.remove(jobId);
}
return result;
} catch (IOException e) {
@@ -120,24 +129,19 @@
LOG.debug("Called getAllPartialJobs()");
SortedMap<JobId, Job> result = new TreeMap<JobId, Job>();
try {
-for (MetaInfo mi : hsManager.getAllMetaInfo()) {
+for (HistoryFileInfo mi : hsManager.getAllFileInfo()) {
if (mi != null) {
JobId id = mi.getJobId();
result.put(id, new PartialJob(mi.getJobIndexInfo(), id));
}
}
} catch (IOException e) {
-LOG.warn("Error trying to scan for all MetaInfos", e);
+LOG.warn("Error trying to scan for all FileInfos", e);
throw new YarnException(e);
}
return result;
}
-@Override
-public void jobRemovedFromHDFS(JobId jobId) {
-loadedJobCache.remove(jobId);
-}
@Override
public JobsInfo getPartialJobs(Long offset, Long count, String user,
String queue, Long sBegin, Long sEnd, Long fBegin, Long fEnd,
@@ -173,6 +177,7 @@
if (end < 0) { // due to overflow
end = Long.MAX_VALUE;
}
for (Job job : jobs) {
if (at > end) {
break;

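getFullJob() drops its synchronized keyword: two threads may now race to load the same job, and the later put() into the cache simply overwrites an identical entry, as the new comment in loadJob() notes. A simplified sketch of that trade-off using a plain ConcurrentHashMap (the Loader interface and type parameters are illustrative, not part of the history server code):

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class LooseCache<K, V> {
  interface Loader<A, B> {
    B load(A key) throws Exception;
  }

  private final ConcurrentMap<K, V> cache = new ConcurrentHashMap<K, V>();

  // Two callers can race past the null check and both run load();
  // the second put() just overwrites an identical value. The duplicate
  // work is accepted in exchange for never holding a lock around the
  // expensive load, which is the same trade-off the loaded-job cache makes.
  V get(K key, Loader<K, V> loader) throws Exception {
    V value = cache.get(key);
    if (value == null) {
      value = loader.load(key);
      cache.put(key, value);
    }
    return value;
  }
}
```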

@@ -53,6 +53,7 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.hs.HistoryFileManager.HistoryFileInfo;
import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
import org.apache.hadoop.security.UserGroupInformation;
@@ -71,7 +72,7 @@ public class CompletedJob implements org.apache.hadoop.mapreduce.v2.app.job.Job
private final Configuration conf;
private final JobId jobId; //Can be picked from JobInfo with a conversion.
private final String user; //Can be picked up from JobInfo
-private final Path confFile;
+private final HistoryFileInfo info;
private JobInfo jobInfo;
private JobReport report;
AtomicBoolean tasksLoaded = new AtomicBoolean(false);
@@ -84,13 +85,14 @@ public class CompletedJob implements org.apache.hadoop.mapreduce.v2.app.job.Job
public CompletedJob(Configuration conf, JobId jobId, Path historyFile,
-boolean loadTasks, String userName, Path confFile, JobACLsManager aclsMgr)
+boolean loadTasks, String userName, HistoryFileInfo info,
+JobACLsManager aclsMgr)
throws IOException {
LOG.info("Loading job: " + jobId + " from file: " + historyFile);
this.conf = conf;
this.jobId = jobId;
this.user = userName;
-this.confFile = confFile;
+this.info = info;
this.aclsMgr = aclsMgr;
loadFullHistoryData(loadTasks, historyFile);
}
@@ -134,7 +136,7 @@ public class CompletedJob implements org.apache.hadoop.mapreduce.v2.app.job.Job
report.setUser(jobInfo.getUsername());
report.setMapProgress((float) getCompletedMaps() / getTotalMaps());
report.setReduceProgress((float) getCompletedReduces() / getTotalReduces());
-report.setJobFile(confFile.toString());
+report.setJobFile(getConfFile().toString());
String historyUrl = "N/A";
try {
historyUrl = JobHistoryUtils.getHistoryUrl(conf, jobId.getAppId());
@@ -392,7 +394,16 @@ public class CompletedJob implements org.apache.hadoop.mapreduce.v2.app.job.Job
*/
@Override
public Path getConfFile() {
-return confFile;
+return info.getConfFile();
+}
+/*
+* (non-Javadoc)
+* @see org.apache.hadoop.mapreduce.v2.app.job.Job#loadConfFile()
+*/
+@Override
+public Configuration loadConfFile() throws IOException {
+return info.loadConfFile();
}
@Override


@@ -25,12 +25,17 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -57,6 +62,8 @@ import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.service.AbstractService;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
* This class provides a way to interact with history files in a thread safe
* manor.
@@ -67,33 +74,251 @@ public class HistoryFileManager extends AbstractService {
private static final Log LOG = LogFactory.getLog(HistoryFileManager.class);
private static final Log SUMMARY_LOG = LogFactory.getLog(JobSummary.class);
+private static enum HistoryInfoState {
+IN_INTERMEDIATE, IN_DONE, DELETED, MOVE_FAILED
+};
private static String DONE_BEFORE_SERIAL_TAIL = JobHistoryUtils
.doneSubdirsBeforeSerialTail();
-public static class MetaInfo {
+/**
* Maps between a serial number (generated based on jobId) and the timestamp
* component(s) to which it belongs. Facilitates jobId based searches. If a
* jobId is not found in this list - it will not be found.
*/
private static class SerialNumberIndex {
private SortedMap<String, Set<String>> cache;
private int maxSize;
public SerialNumberIndex(int maxSize) {
this.cache = new TreeMap<String, Set<String>>();
this.maxSize = maxSize;
}
public synchronized void add(String serialPart, String timestampPart) {
if (!cache.containsKey(serialPart)) {
cache.put(serialPart, new HashSet<String>());
if (cache.size() > maxSize) {
String key = cache.firstKey();
LOG.error("Dropping " + key
+ " from the SerialNumberIndex. We will no "
+ "longer be able to see jobs that are in that serial index for "
+ cache.get(key));
cache.remove(key);
}
}
Set<String> datePartSet = cache.get(serialPart);
datePartSet.add(timestampPart);
}
public synchronized void remove(String serialPart, String timeStampPart) {
if (cache.containsKey(serialPart)) {
Set<String> set = cache.get(serialPart);
set.remove(timeStampPart);
if (set.isEmpty()) {
cache.remove(serialPart);
}
}
}
public synchronized Set<String> get(String serialPart) {
Set<String> found = cache.get(serialPart);
if (found != null) {
return new HashSet<String>(found);
}
return null;
}
}
private static class JobListCache {
private ConcurrentSkipListMap<JobId, HistoryFileInfo> cache;
private int maxSize;
private long maxAge;
public JobListCache(int maxSize, long maxAge) {
this.maxSize = maxSize;
this.maxAge = maxAge;
this.cache = new ConcurrentSkipListMap<JobId, HistoryFileInfo>();
}
public HistoryFileInfo addIfAbsent(HistoryFileInfo fileInfo) {
JobId jobId = fileInfo.getJobIndexInfo().getJobId();
if (LOG.isDebugEnabled()) {
LOG.debug("Adding " + jobId + " to job list cache with "
+ fileInfo.getJobIndexInfo());
}
HistoryFileInfo old = cache.putIfAbsent(jobId, fileInfo);
if (cache.size() > maxSize) {
//There is a race here, where more then one thread could be trying to
// remove entries. This could result in too many entries being removed
// from the cache. This is considered OK as the size of the cache
// should be rather large, and we would rather have performance over
// keeping the cache size exactly at the maximum.
Iterator<JobId> keys = cache.navigableKeySet().iterator();
long cutoff = System.currentTimeMillis() - maxAge;
while(cache.size() > maxSize && keys.hasNext()) {
JobId key = keys.next();
HistoryFileInfo firstValue = cache.get(key);
if(firstValue != null) {
synchronized(firstValue) {
if (firstValue.isMovePending()) {
if(firstValue.didMoveFail() &&
firstValue.jobIndexInfo.getFinishTime() <= cutoff) {
cache.remove(key);
//Now lets try to delete it
try {
firstValue.delete();
} catch (IOException e) {
LOG.error("Error while trying to delete history files" +
" that could not be moved to done.", e);
}
} else {
LOG.warn("Waiting to remove " + key
+ " from JobListCache because it is not in done yet.");
}
} else {
cache.remove(key);
}
}
}
}
}
return old;
}
public void delete(HistoryFileInfo fileInfo) {
cache.remove(fileInfo.getJobId());
}
public Collection<HistoryFileInfo> values() {
return new ArrayList<HistoryFileInfo>(cache.values());
}
public HistoryFileInfo get(JobId jobId) {
return cache.get(jobId);
}
}
public class HistoryFileInfo {
private Path historyFile;
private Path confFile;
private Path summaryFile;
private JobIndexInfo jobIndexInfo;
+private HistoryInfoState state;
-public MetaInfo(Path historyFile, Path confFile, Path summaryFile,
-JobIndexInfo jobIndexInfo) {
+private HistoryFileInfo(Path historyFile, Path confFile, Path summaryFile,
+JobIndexInfo jobIndexInfo, boolean isInDone) {
this.historyFile = historyFile;
this.confFile = confFile;
this.summaryFile = summaryFile;
this.jobIndexInfo = jobIndexInfo;
+state = isInDone ? HistoryInfoState.IN_DONE
+: HistoryInfoState.IN_INTERMEDIATE;
}
-private Path getHistoryFile() {
+private synchronized boolean isMovePending() {
return state == HistoryInfoState.IN_INTERMEDIATE
|| state == HistoryInfoState.MOVE_FAILED;
}
private synchronized boolean didMoveFail() {
return state == HistoryInfoState.MOVE_FAILED;
}
/**
* @return true if the files backed by this were deleted.
*/
public synchronized boolean isDeleted() {
return state == HistoryInfoState.DELETED;
}
private synchronized void moveToDone() throws IOException {
if (!isMovePending()) {
// It was either deleted or is already in done. Either way do nothing
return;
}
try {
long completeTime = jobIndexInfo.getFinishTime();
if (completeTime == 0) {
completeTime = System.currentTimeMillis();
}
JobId jobId = jobIndexInfo.getJobId();
List<Path> paths = new ArrayList<Path>(2);
if (historyFile == null) {
LOG.info("No file for job-history with " + jobId + " found in cache!");
} else {
paths.add(historyFile);
}
if (confFile == null) {
LOG.info("No file for jobConf with " + jobId + " found in cache!");
} else {
paths.add(confFile);
}
if (summaryFile == null) {
LOG.info("No summary file for job: " + jobId);
} else {
String jobSummaryString = getJobSummary(intermediateDoneDirFc,
summaryFile);
SUMMARY_LOG.info(jobSummaryString);
LOG.info("Deleting JobSummary file: [" + summaryFile + "]");
intermediateDoneDirFc.delete(summaryFile, false);
summaryFile = null;
}
Path targetDir = canonicalHistoryLogPath(jobId, completeTime);
addDirectoryToSerialNumberIndex(targetDir);
makeDoneSubdir(targetDir);
if (historyFile != null) {
Path toPath = doneDirFc.makeQualified(new Path(targetDir, historyFile
.getName()));
if (!toPath.equals(historyFile)) {
moveToDoneNow(historyFile, toPath);
historyFile = toPath;
}
}
if (confFile != null) {
Path toPath = doneDirFc.makeQualified(new Path(targetDir, confFile
.getName()));
if (!toPath.equals(confFile)) {
moveToDoneNow(confFile, toPath);
confFile = toPath;
}
}
state = HistoryInfoState.IN_DONE;
} catch (Throwable t) {
LOG.error("Error while trying to move a job to done", t);
this.state = HistoryInfoState.MOVE_FAILED;
}
}
/**
* Parse a job from the JobHistoryFile, if the underlying file is not going
* to be deleted.
*
* @return the Job or null if the underlying file was deleted.
* @throws IOException
* if there is an error trying to read the file.
*/
public synchronized Job loadJob() throws IOException {
return new CompletedJob(conf, jobIndexInfo.getJobId(), historyFile,
false, jobIndexInfo.getUser(), this, aclsMgr);
}
/**
* Return the history file. This should only be used for testing.
* @return the history file.
*/
synchronized Path getHistoryFile() {
return historyFile;
}
-private Path getConfFile() {
-return confFile;
-}
-private Path getSummaryFile() {
-return summaryFile;
-}
+private synchronized void delete() throws IOException {
+state = HistoryInfoState.DELETED;
+doneDirFc.delete(doneDirFc.makeQualified(historyFile), false);
+doneDirFc.delete(doneDirFc.makeQualified(confFile), false);
+}
public JobIndexInfo getJobIndexInfo() {
@@ -104,57 +329,35 @@ public class HistoryFileManager extends AbstractService {
return jobIndexInfo.getJobId();
}
-private void setHistoryFile(Path historyFile) {
-this.historyFile = historyFile;
-}
-private void setConfFile(Path confFile) {
-this.confFile = confFile;
-}
-private void setSummaryFile(Path summaryFile) {
-this.summaryFile = summaryFile;
-}
-}
+public synchronized Path getConfFile() {
+return confFile;
+}
+public synchronized Configuration loadConfFile() throws IOException {
+FileContext fc = FileContext.getFileContext(confFile.toUri(), conf);
+Configuration jobConf = new Configuration(false);
+jobConf.addResource(fc.open(confFile));
+return jobConf;
+}
+}
-/**
-* Maps between a serial number (generated based on jobId) and the timestamp
-* component(s) to which it belongs. Facilitates jobId based searches. If a
-* jobId is not found in this list - it will not be found.
-*/
-private final SortedMap<String, Set<String>> idToDateString =
-new TreeMap<String, Set<String>>();
-// The number of entries in idToDateString
-private int dateStringCacheSize;
-// Maintains minimal details for recent jobs (parsed from history file name).
-// Sorted on Job Completion Time.
-private final SortedMap<JobId, MetaInfo> jobListCache =
-new ConcurrentSkipListMap<JobId, MetaInfo>();
-// The number of jobs to maintain in the job list cache.
-private int jobListCacheSize;
-// Re-use existing MetaInfo objects if they exist for the specific JobId.
-// (synchronization on MetaInfo)
-// Check for existence of the object when using iterators.
-private final SortedMap<JobId, MetaInfo> intermediateListCache =
-new ConcurrentSkipListMap<JobId, MetaInfo>();
+private SerialNumberIndex serialNumberIndex = null;
+private JobListCache jobListCache = null;
// Maintains a list of known done subdirectories.
-private final Set<Path> existingDoneSubdirs = new HashSet<Path>();
+private final Set<Path> existingDoneSubdirs = Collections
+.synchronizedSet(new HashSet<Path>());
/**
* Maintains a mapping between intermediate user directories and the last
* known modification time.
*/
-private Map<String, Long> userDirModificationTimeMap =
-new HashMap<String, Long>();
+private Map<String, Long> userDirModificationTimeMap = new HashMap<String, Long>();
private JobACLsManager aclsMgr;
private Configuration conf;
-// TODO Remove me!!!!
private boolean debugMode;
private String serialNumberFormat;
@@ -165,6 +368,9 @@
private FileContext intermediateDoneDirFc; // Intermediate Done Dir
// FileContext
+private ThreadPoolExecutor moveToDoneExecutor = null;
+private long maxHistoryAge = 0;
public HistoryFileManager() {
super(HistoryFileManager.class.getName());
}
@@ -211,12 +417,25 @@
this.aclsMgr = new JobACLsManager(conf);
-jobListCacheSize = conf.getInt(JHAdminConfig.MR_HISTORY_JOBLIST_CACHE_SIZE,
-JHAdminConfig.DEFAULT_MR_HISTORY_JOBLIST_CACHE_SIZE);
+maxHistoryAge = conf.getLong(JHAdminConfig.MR_HISTORY_MAX_AGE_MS,
+JHAdminConfig.DEFAULT_MR_HISTORY_MAX_AGE);
+jobListCache = new JobListCache(conf.getInt(
+JHAdminConfig.MR_HISTORY_JOBLIST_CACHE_SIZE,
+JHAdminConfig.DEFAULT_MR_HISTORY_JOBLIST_CACHE_SIZE),
+maxHistoryAge);
-dateStringCacheSize = conf.getInt(
+serialNumberIndex = new SerialNumberIndex(conf.getInt(
JHAdminConfig.MR_HISTORY_DATESTRING_CACHE_SIZE,
-JHAdminConfig.DEFAULT_MR_HISTORY_DATESTRING_CACHE_SIZE);
+JHAdminConfig.DEFAULT_MR_HISTORY_DATESTRING_CACHE_SIZE));
+int numMoveThreads = conf.getInt(
+JHAdminConfig.MR_HISTORY_MOVE_THREAD_COUNT,
+JHAdminConfig.DEFAULT_MR_HISTORY_MOVE_THREAD_COUNT);
+ThreadFactory tf = new ThreadFactoryBuilder().setNameFormat(
+"MoveIntermediateToDone Thread #%d").build();
+moveToDoneExecutor = new ThreadPoolExecutor(numMoveThreads, numMoveThreads,
+1, TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>(), tf);
super.init(conf);
}
@@ -249,6 +468,7 @@
void initExisting() throws IOException {
LOG.info("Initializing Existing Jobs...");
List<FileStatus> timestampedDirList = findTimestampedDirectories();
+// Sort first just so insertion is in a consistent order
Collections.sort(timestampedDirList);
for (FileStatus fs : timestampedDirList) {
// TODO Could verify the correct format for these directories.
@@ -271,16 +491,7 @@
+ serialDirPath.toString() + ". Continuing with next");
return;
}
-synchronized (idToDateString) {
-// TODO make this thread safe without the synchronize
-if (idToDateString.containsKey(serialPart)) {
-Set<String> set = idToDateString.get(serialPart);
-set.remove(timeStampPart);
-if (set.isEmpty()) {
-idToDateString.remove(serialPart);
-}
-}
-}
+serialNumberIndex.remove(serialPart, timeStampPart);
} }
private void addDirectoryToSerialNumberIndex(Path serialDirPath) {
@@ -299,21 +510,7 @@
LOG.warn("Could not find serial portion from path: "
+ serialDirPath.toString() + ". Continuing with next");
}
-addToSerialNumberIndex(serialPart, timestampPart);
+serialNumberIndex.add(serialPart, timestampPart);
}
private void addToSerialNumberIndex(String serialPart, String timestampPart) {
synchronized (idToDateString) {
// TODO make this thread safe without the synchronize
if (!idToDateString.containsKey(serialPart)) {
idToDateString.put(serialPart, new HashSet<String>());
if (idToDateString.size() > dateStringCacheSize) {
idToDateString.remove(idToDateString.firstKey());
}
Set<String> datePartSet = idToDateString.get(serialPart);
datePartSet.add(timestampPart);
}
}
} }
private void addDirectoryToJobListCache(Path path) throws IOException {
@@ -332,10 +529,10 @@
.getIntermediateConfFileName(jobIndexInfo.getJobId());
String summaryFileName = JobHistoryUtils
.getIntermediateSummaryFileName(jobIndexInfo.getJobId());
-MetaInfo metaInfo = new MetaInfo(fs.getPath(), new Path(fs.getPath()
-.getParent(), confFileName), new Path(fs.getPath().getParent(),
-summaryFileName), jobIndexInfo);
-addToJobListCache(metaInfo);
+HistoryFileInfo fileInfo = new HistoryFileInfo(fs.getPath(), new Path(fs
+.getPath().getParent(), confFileName), new Path(fs.getPath()
+.getParent(), summaryFileName), jobIndexInfo, true);
+jobListCache.addIfAbsent(fileInfo);
}
}
@@ -371,25 +568,18 @@
return fsList;
}
private void addToJobListCache(MetaInfo metaInfo) {
JobId jobId = metaInfo.getJobIndexInfo().getJobId();
if (LOG.isDebugEnabled()) {
LOG.debug("Adding " + jobId + " to job list cache with "
+ metaInfo.getJobIndexInfo());
}
jobListCache.put(jobId, metaInfo);
if (jobListCache.size() > jobListCacheSize) {
jobListCache.remove(jobListCache.firstKey());
}
}
/**
* Scans the intermediate directory to find user directories. Scans these for
-* history files if the modification time for the directory has changed.
+* history files if the modification time for the directory has changed. Once
+* it finds history files it starts the process of moving them to the done
+* directory.
*
* @throws IOException
+* if there was a error while scanning
*/
-private void scanIntermediateDirectory() throws IOException {
-// TODO it would be great to limit how often this happens, except in the
-// case where we are looking for a particular job.
+void scanIntermediateDirectory() throws IOException {
List<FileStatus> userDirList = JobHistoryUtils.localGlobber(
intermediateDoneDirFc, intermediateDoneDirPath, "");
@@ -405,7 +595,12 @@
}
}
if (shouldScan) {
-scanIntermediateDirectory(userDir.getPath());
+try {
+scanIntermediateDirectory(userDir.getPath());
+} catch (IOException e) {
+LOG.error("Error while trying to scan the directory "
+    + userDir.getPath(), e);
+}
}
}
}
@@ -426,11 +621,33 @@
.getIntermediateConfFileName(jobIndexInfo.getJobId());
String summaryFileName = JobHistoryUtils
.getIntermediateSummaryFileName(jobIndexInfo.getJobId());
-MetaInfo metaInfo = new MetaInfo(fs.getPath(), new Path(fs.getPath()
-.getParent(), confFileName), new Path(fs.getPath().getParent(),
-summaryFileName), jobIndexInfo);
-if (!intermediateListCache.containsKey(jobIndexInfo.getJobId())) {
-intermediateListCache.put(jobIndexInfo.getJobId(), metaInfo);
+HistoryFileInfo fileInfo = new HistoryFileInfo(fs.getPath(), new Path(fs
+.getPath().getParent(), confFileName), new Path(fs.getPath()
+.getParent(), summaryFileName), jobIndexInfo, false);
+final HistoryFileInfo old = jobListCache.addIfAbsent(fileInfo);
+if (old == null || old.didMoveFail()) {
+final HistoryFileInfo found = (old == null) ? fileInfo : old;
+long cutoff = System.currentTimeMillis() - maxHistoryAge;
+if(found.getJobIndexInfo().getFinishTime() <= cutoff) {
+try {
+found.delete();
+} catch (IOException e) {
+LOG.warn("Error cleaning up a HistoryFile that is out of date.", e);
+}
+} else {
+moveToDoneExecutor.execute(new Runnable() {
+@Override
+public void run() {
+try {
+found.moveToDone();
+} catch (IOException e) {
+LOG.info("Failed to process fileInfo for job: " +
+found.getJobId(), e);
+}
+}
+});
+}
}
}
}
@@ -442,11 +659,11 @@
* fileStatus list of Job History Files.
* @param jobId
* The JobId to find.
-* @return A MetaInfo object for the jobId, null if not found.
+* @return A FileInfo object for the jobId, null if not found.
* @throws IOException
*/
-private MetaInfo getJobMetaInfo(List<FileStatus> fileStatusList, JobId jobId)
-throws IOException {
+private HistoryFileInfo getJobFileInfo(List<FileStatus> fileStatusList,
+JobId jobId) throws IOException {
for (FileStatus fs : fileStatusList) {
JobIndexInfo jobIndexInfo = FileNameIndexUtils.getIndexInfo(fs.getPath()
.getName());
@@ -455,10 +672,10 @@
.getIntermediateConfFileName(jobIndexInfo.getJobId());
String summaryFileName = JobHistoryUtils
.getIntermediateSummaryFileName(jobIndexInfo.getJobId());
-MetaInfo metaInfo = new MetaInfo(fs.getPath(), new Path(fs.getPath()
-.getParent(), confFileName), new Path(fs.getPath().getParent(),
-summaryFileName), jobIndexInfo);
-return metaInfo;
+HistoryFileInfo fileInfo = new HistoryFileInfo(fs.getPath(), new Path(
+fs.getPath().getParent(), confFileName), new Path(fs.getPath()
+.getParent(), summaryFileName), jobIndexInfo, true);
+return fileInfo;
}
}
return null;
@@ -474,175 +691,51 @@
* @return
* @throws IOException
*/
-private MetaInfo scanOldDirsForJob(JobId jobId) throws IOException {
+private HistoryFileInfo scanOldDirsForJob(JobId jobId) throws IOException {
int jobSerialNumber = JobHistoryUtils.jobSerialNumber(jobId);
String boxedSerialNumber = String.valueOf(jobSerialNumber);
-Set<String> dateStringSet;
-synchronized (idToDateString) {
-Set<String> found = idToDateString.get(boxedSerialNumber);
-if (found == null) {
-return null;
-} else {
-dateStringSet = new HashSet<String>(found);
-}
-}
+Set<String> dateStringSet = serialNumberIndex.get(boxedSerialNumber);
+if (dateStringSet == null) {
+return null;
+}
for (String timestampPart : dateStringSet) {
Path logDir = canonicalHistoryLogPath(jobId, timestampPart);
List<FileStatus> fileStatusList = scanDirectoryForHistoryFiles(logDir,
doneDirFc);
-MetaInfo metaInfo = getJobMetaInfo(fileStatusList, jobId);
-if (metaInfo != null) {
-return metaInfo;
+HistoryFileInfo fileInfo = getJobFileInfo(fileStatusList, jobId);
+if (fileInfo != null) {
+return fileInfo;
}
}
return null;
}
/** public Collection<HistoryFileInfo> getAllFileInfo() throws IOException {
* Checks for the existence of the job history file in the intermediate
* directory.
*
* @param jobId
* @return
* @throws IOException
*/
private MetaInfo scanIntermediateForJob(JobId jobId) throws IOException {
scanIntermediateDirectory(); scanIntermediateDirectory();
return intermediateListCache.get(jobId); return jobListCache.values();
} }
/** public HistoryFileInfo getFileInfo(JobId jobId) throws IOException {
* Parse a job from the JobHistoryFile, if the underlying file is not going to // FileInfo available in cache.
* be deleted. HistoryFileInfo fileInfo = jobListCache.get(jobId);
* if (fileInfo != null) {
* @param metaInfo return fileInfo;
* the where the JobHistory is stored.
* @return the Job or null if the underlying file was deleted.
* @throws IOException
* if there is an error trying to read the file.
*/
public Job loadJob(MetaInfo metaInfo) throws IOException {
return new CompletedJob(conf, metaInfo.getJobIndexInfo().getJobId(),
metaInfo.getHistoryFile(), false, metaInfo.getJobIndexInfo().getUser(),
metaInfo.getConfFile(), aclsMgr);
}
public Collection<MetaInfo> getAllMetaInfo() throws IOException {
scanIntermediateDirectory();
ArrayList<MetaInfo> result = new ArrayList<MetaInfo>();
result.addAll(intermediateListCache.values());
result.addAll(jobListCache.values());
return result;
}
Collection<MetaInfo> getIntermediateMetaInfos() throws IOException {
scanIntermediateDirectory();
return intermediateListCache.values();
}
public MetaInfo getMetaInfo(JobId jobId) throws IOException {
// MetaInfo available in cache.
MetaInfo metaInfo = null;
if (jobListCache.containsKey(jobId)) {
metaInfo = jobListCache.get(jobId);
} }
// OK so scan the intermediate to be sure we did not lose it that way
if (metaInfo != null) { scanIntermediateDirectory();
return metaInfo; fileInfo = jobListCache.get(jobId);
} if (fileInfo != null) {
return fileInfo;
// MetaInfo not available. Check intermediate directory for meta info.
metaInfo = scanIntermediateForJob(jobId);
if (metaInfo != null) {
return metaInfo;
} }
// Intermediate directory does not contain job. Search through older ones. // Intermediate directory does not contain job. Search through older ones.
metaInfo = scanOldDirsForJob(jobId); fileInfo = scanOldDirsForJob(jobId);
if (metaInfo != null) { if (fileInfo != null) {
return metaInfo; return fileInfo;
} }
return null; return null;
} }
void moveToDone(MetaInfo metaInfo) throws IOException {
long completeTime = metaInfo.getJobIndexInfo().getFinishTime();
if (completeTime == 0)
completeTime = System.currentTimeMillis();
JobId jobId = metaInfo.getJobIndexInfo().getJobId();
List<Path> paths = new ArrayList<Path>();
Path historyFile = metaInfo.getHistoryFile();
if (historyFile == null) {
LOG.info("No file for job-history with " + jobId + " found in cache!");
} else {
paths.add(historyFile);
}
Path confFile = metaInfo.getConfFile();
if (confFile == null) {
LOG.info("No file for jobConf with " + jobId + " found in cache!");
} else {
paths.add(confFile);
}
// TODO Check all mi getters and setters for the conf path
Path summaryFile = metaInfo.getSummaryFile();
if (summaryFile == null) {
LOG.info("No summary file for job: " + jobId);
} else {
try {
String jobSummaryString = getJobSummary(intermediateDoneDirFc,
summaryFile);
SUMMARY_LOG.info(jobSummaryString);
LOG.info("Deleting JobSummary file: [" + summaryFile + "]");
intermediateDoneDirFc.delete(summaryFile, false);
metaInfo.setSummaryFile(null);
} catch (IOException e) {
LOG.warn("Failed to process summary file: [" + summaryFile + "]");
throw e;
}
}
Path targetDir = canonicalHistoryLogPath(jobId, completeTime);
addDirectoryToSerialNumberIndex(targetDir);
try {
makeDoneSubdir(targetDir);
} catch (IOException e) {
LOG.warn("Failed creating subdirectory: " + targetDir
+ " while attempting to move files for jobId: " + jobId);
throw e;
}
synchronized (metaInfo) {
if (historyFile != null) {
Path toPath = doneDirFc.makeQualified(new Path(targetDir, historyFile
.getName()));
try {
moveToDoneNow(historyFile, toPath);
} catch (IOException e) {
LOG.warn("Failed to move file: " + historyFile + " for jobId: "
+ jobId);
throw e;
}
metaInfo.setHistoryFile(toPath);
}
if (confFile != null) {
Path toPath = doneDirFc.makeQualified(new Path(targetDir, confFile
.getName()));
try {
moveToDoneNow(confFile, toPath);
} catch (IOException e) {
LOG.warn("Failed to move file: " + historyFile + " for jobId: "
+ jobId);
throw e;
}
metaInfo.setConfFile(toPath);
}
}
addToJobListCache(metaInfo);
intermediateListCache.remove(jobId);
}
private void moveToDoneNow(final Path src, final Path target)
throws IOException {
LOG.info("Moving " + src.toString() + " to " + target.toString());
@@ -658,20 +751,9 @@
}
private void makeDoneSubdir(Path path) throws IOException {
-boolean existsInExistingCache = false;
-synchronized (existingDoneSubdirs) {
-if (existingDoneSubdirs.contains(path))
-existsInExistingCache = true;
-}
try {
doneDirFc.getFileStatus(path);
-if (!existsInExistingCache) {
-existingDoneSubdirs.add(path);
-if (LOG.isDebugEnabled()) {
-LOG.debug("JobHistory.maybeMakeSubdirectory -- We believed " + path
-+ " already existed, but it didn't.");
-}
-}
+existingDoneSubdirs.add(path);
} catch (FileNotFoundException fnfE) {
try {
FsPermission fsp = new FsPermission(
@@ -685,11 +767,8 @@
+ ", " + fsp);
doneDirFc.setPermission(path, fsp);
}
-synchronized (existingDoneSubdirs) {
-existingDoneSubdirs.add(path);
-}
-} catch (FileAlreadyExistsException faeE) {
-// Nothing to do.
+existingDoneSubdirs.add(path);
+} catch (FileAlreadyExistsException faeE) { // Nothing to do.
}
}
}
@@ -713,16 +792,22 @@
return finishTime;
}
-private void deleteJobFromDone(MetaInfo metaInfo) throws IOException {
-jobListCache.remove(metaInfo.getJobId());
-doneDirFc.delete(doneDirFc.makeQualified(metaInfo.getHistoryFile()), false);
-doneDirFc.delete(doneDirFc.makeQualified(metaInfo.getConfFile()), false);
+private void deleteJobFromDone(HistoryFileInfo fileInfo) throws IOException {
+jobListCache.delete(fileInfo);
+fileInfo.delete();
}
+/**
+* Clean up older history files.
+*
+* @throws IOException
+* on any error trying to remove the entries.
+*/
@SuppressWarnings("unchecked")
-void clean(long cutoff, HistoryStorage storage) throws IOException {
+void clean() throws IOException {
// TODO this should be replaced by something that knows about the directory
// structure and will put less of a load on HDFS.
+long cutoff = System.currentTimeMillis() - maxHistoryAge;
boolean halted = false;
// TODO Delete YYYY/MM/DD directories.
List<FileStatus> serialDirList = findTimestampedDirectories();
@@ -737,13 +822,17 @@
long effectiveTimestamp = getEffectiveTimestamp(
jobIndexInfo.getFinishTime(), historyFile);
if (effectiveTimestamp <= cutoff) {
-String confFileName = JobHistoryUtils
-.getIntermediateConfFileName(jobIndexInfo.getJobId());
-MetaInfo metaInfo = new MetaInfo(historyFile.getPath(), new Path(
-historyFile.getPath().getParent(), confFileName), null,
-jobIndexInfo);
-storage.jobRemovedFromHDFS(metaInfo.getJobId());
-deleteJobFromDone(metaInfo);
+HistoryFileInfo fileInfo = this.jobListCache.get(jobIndexInfo
+.getJobId());
+if (fileInfo == null) {
+String confFileName = JobHistoryUtils
+.getIntermediateConfFileName(jobIndexInfo.getJobId());
+fileInfo = new HistoryFileInfo(historyFile.getPath(), new Path(
+historyFile.getPath().getParent(), confFileName), null,
+jobIndexInfo, true);
+}
+deleteJobFromDone(fileInfo);
} else {
halted = true;
break;
@@ -752,9 +841,7 @@
if (!halted) {
doneDirFc.delete(doneDirFc.makeQualified(serialDir.getPath()), true);
removeDirectoryFromSerialNumberIndex(serialDir.getPath());
-synchronized (existingDoneSubdirs) {
-existingDoneSubdirs.remove(serialDir.getPath());
-}
+existingDoneSubdirs.remove(serialDir.getPath());
} else {
break; // Don't scan any more directories.
}

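The move-to-done thread pool now lives inside HistoryFileManager and is handed each HistoryFileInfo as a Runnable. A reduced sketch of that executor setup, using the same Guava ThreadFactoryBuilder pattern seen in the diff; the Runnable body is only a placeholder for fileInfo.moveToDone():

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import com.google.common.util.concurrent.ThreadFactoryBuilder;

public class MoveExecutorSketch {
  public static ThreadPoolExecutor newMovePool(int numMoveThreads) {
    ThreadFactory tf = new ThreadFactoryBuilder()
        .setNameFormat("MoveIntermediateToDone Thread #%d").build();
    // Fixed-size pool; idle threads linger for an hour, work queues up unbounded.
    return new ThreadPoolExecutor(numMoveThreads, numMoveThreads,
        1, TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>(), tf);
  }

  public static void main(String[] args) {
    ThreadPoolExecutor pool = newMovePool(3);
    pool.execute(new Runnable() {
      @Override
      public void run() {
        // placeholder for fileInfo.moveToDone() with its own try/catch
        System.out.println("moving one job's files to the done directory");
      }
    });
    pool.shutdown();
  }
}
```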

@@ -28,7 +28,12 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/**
* Provides an API to query jobs that have finished.
+*
+* For those implementing this API be aware that there is no feedback when
+* files are removed from HDFS. You may rely on HistoryFileManager to help
+* you know when that has happened if you have not made a complete backup of
+* the data stored on HDFS.
*/
@InterfaceAudience.Public
@InterfaceStability.Unstable
@@ -71,10 +76,4 @@
* @return the job, or null if it is not found.
*/
Job getFullJob(JobId jobId);
-/**
-* Informs the Storage that a job has been removed from HDFS
-* @param jobId the ID of the job that was removed.
-*/
-void jobRemovedFromHDFS(JobId jobId);
}

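With jobRemovedFromHDFS() gone from the interface, a HistoryStorage implementation or caller only learns that history files have disappeared when a lookup comes back null. A hedged sketch of defensive calling code; the wrapper class and the exception choice are illustrative, not part of the API:

```java
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.hs.HistoryStorage;

public class StorageLookup {
  private final HistoryStorage storage;

  public StorageLookup(HistoryStorage storage) {
    this.storage = storage;
  }

  // getFullJob() returns null when no history can be found, which after this
  // change also covers files that have been cleaned out of HDFS.
  public Job requireJob(JobId jobId) {
    Job job = storage.getFullJob(jobId);
    if (job == null) {
      throw new IllegalStateException("history for " + jobId + " is unavailable");
    }
    return job;
  }
}
```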

@@ -21,10 +21,7 @@ package org.apache.hadoop.mapreduce.v2.hs;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
-import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
@@ -37,7 +34,7 @@ import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
-import org.apache.hadoop.mapreduce.v2.hs.HistoryFileManager.MetaInfo;
+import org.apache.hadoop.mapreduce.v2.hs.HistoryFileManager.HistoryFileInfo;
import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.JobsInfo;
import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
import org.apache.hadoop.util.ReflectionUtils;
@@ -66,15 +63,9 @@ public class JobHistory extends AbstractService implements HistoryContext {
// Time interval for the move thread.
private long moveThreadInterval;
-// Number of move threads.
-private int numMoveThreads;
private Configuration conf;
-private Thread moveIntermediateToDoneThread = null;
-private MoveIntermediateToDoneRunnable moveIntermediateToDoneRunnable = null;
-private ScheduledThreadPoolExecutor cleanerScheduledExecutor = null;
+private ScheduledThreadPoolExecutor scheduledExecutor = null;
private HistoryStorage storage = null;
private HistoryFileManager hsManager = null;
@@ -91,8 +82,6 @@
moveThreadInterval = conf.getLong(
JHAdminConfig.MR_HISTORY_MOVE_INTERVAL_MS,
JHAdminConfig.DEFAULT_MR_HISTORY_MOVE_INTERVAL_MS);
-numMoveThreads = conf.getInt(JHAdminConfig.MR_HISTORY_MOVE_THREAD_COUNT,
-JHAdminConfig.DEFAULT_MR_HISTORY_MOVE_THREAD_COUNT);
hsManager = new HistoryFileManager();
hsManager.init(conf);
@@ -120,27 +109,22 @@
((Service) storage).start();
}
-// Start moveIntermediatToDoneThread
-moveIntermediateToDoneRunnable = new MoveIntermediateToDoneRunnable(
-moveThreadInterval, numMoveThreads);
-moveIntermediateToDoneThread = new Thread(moveIntermediateToDoneRunnable);
-moveIntermediateToDoneThread.setName("MoveIntermediateToDoneScanner");
-moveIntermediateToDoneThread.start();
+scheduledExecutor = new ScheduledThreadPoolExecutor(2,
+new ThreadFactoryBuilder().setNameFormat("Log Scanner/Cleaner #%d")
+.build());
+scheduledExecutor.scheduleAtFixedRate(new MoveIntermediateToDoneRunnable(),
+moveThreadInterval, moveThreadInterval, TimeUnit.MILLISECONDS);
// Start historyCleaner
boolean startCleanerService = conf.getBoolean(
JHAdminConfig.MR_HISTORY_CLEANER_ENABLE, true);
if (startCleanerService) {
-long maxAgeOfHistoryFiles = conf.getLong(
-JHAdminConfig.MR_HISTORY_MAX_AGE_MS,
-JHAdminConfig.DEFAULT_MR_HISTORY_MAX_AGE);
-cleanerScheduledExecutor = new ScheduledThreadPoolExecutor(1,
-new ThreadFactoryBuilder().setNameFormat("LogCleaner").build());
long runInterval = conf.getLong(
JHAdminConfig.MR_HISTORY_CLEANER_INTERVAL_MS,
JHAdminConfig.DEFAULT_MR_HISTORY_CLEANER_INTERVAL_MS);
-cleanerScheduledExecutor
-.scheduleAtFixedRate(new HistoryCleaner(maxAgeOfHistoryFiles),
+scheduledExecutor
+.scheduleAtFixedRate(new HistoryCleaner(),
30 * 1000l, runInterval, TimeUnit.MILLISECONDS);
}
super.start();
@@ -149,24 +133,12 @@
@Override
public void stop() {
LOG.info("Stopping JobHistory");
-if (moveIntermediateToDoneThread != null) {
-LOG.info("Stopping move thread");
-moveIntermediateToDoneRunnable.stop();
-moveIntermediateToDoneThread.interrupt();
-try {
-LOG.info("Joining on move thread");
-moveIntermediateToDoneThread.join();
-} catch (InterruptedException e) {
-LOG.info("Interrupted while stopping move thread");
-}
-}
-if (cleanerScheduledExecutor != null) {
-LOG.info("Stopping History Cleaner");
-cleanerScheduledExecutor.shutdown();
+if (scheduledExecutor != null) {
+LOG.info("Stopping History Cleaner/Move To Done");
+scheduledExecutor.shutdown();
boolean interrupted = false;
long currentTime = System.currentTimeMillis();
-while (!cleanerScheduledExecutor.isShutdown()
+while (!scheduledExecutor.isShutdown()
&& System.currentTimeMillis() > currentTime + 1000l && !interrupted) {
try {
Thread.sleep(20);
@@ -174,8 +146,10 @@
interrupted = true;
}
}
-if (!cleanerScheduledExecutor.isShutdown()) {
-LOG.warn("HistoryCleanerService shutdown may not have succeeded");
+if (!scheduledExecutor.isShutdown()) {
+LOG.warn("HistoryCleanerService/move to done shutdown may not have " +
+"succeeded, Forcing a shutdown");
+scheduledExecutor.shutdownNow();
}
}
if (storage instanceof Service) {
@@ -195,68 +169,34 @@ public class JobHistory extends AbstractService implements HistoryContext {
   }
 
   private class MoveIntermediateToDoneRunnable implements Runnable {
-    private long sleepTime;
-    private ThreadPoolExecutor moveToDoneExecutor = null;
-    private boolean running = false;
-
-    public synchronized void stop() {
-      running = false;
-      notify();
-    }
-
-    MoveIntermediateToDoneRunnable(long sleepTime, int numMoveThreads) {
-      this.sleepTime = sleepTime;
-      ThreadFactory tf = new ThreadFactoryBuilder().setNameFormat(
-          "MoveIntermediateToDone Thread #%d").build();
-      moveToDoneExecutor = new ThreadPoolExecutor(1, numMoveThreads, 1,
-          TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>(), tf);
-      running = true;
-    }
-
     @Override
     public void run() {
-      Thread.currentThread().setName("IntermediateHistoryScanner");
       try {
-        while (true) {
-          LOG.info("Starting scan to move intermediate done files");
-          for (final MetaInfo metaInfo : hsManager.getIntermediateMetaInfos()) {
-            moveToDoneExecutor.execute(new Runnable() {
-              @Override
-              public void run() {
-                try {
-                  hsManager.moveToDone(metaInfo);
-                } catch (IOException e) {
-                  LOG.info(
-                      "Failed to process metaInfo for job: "
-                          + metaInfo.getJobId(), e);
-                }
-              }
-            });
-          }
-          synchronized (this) {
-            try {
-              this.wait(sleepTime);
-            } catch (InterruptedException e) {
-              LOG.info("IntermediateHistoryScannerThread interrupted");
-            }
-            if (!running) {
-              break;
-            }
-          }
-        }
+        LOG.info("Starting scan to move intermediate done files");
+        hsManager.scanIntermediateDirectory();
       } catch (IOException e) {
-        LOG.warn("Unable to get a list of intermediate files to be moved");
-        // TODO Shut down the entire process!!!!
+        LOG.error("Error while scanning intermediate done dir ", e);
       }
     }
   }
 
+  private class HistoryCleaner implements Runnable {
+    public void run() {
+      LOG.info("History Cleaner started");
+      try {
+        hsManager.clean();
+      } catch (IOException e) {
+        LOG.warn("Error trying to clean up ", e);
+      }
+      LOG.info("History Cleaner complete");
+    }
+  }
+
   /**
    * Helper method for test cases.
    */
-  MetaInfo getJobMetaInfo(JobId jobId) throws IOException {
-    return hsManager.getMetaInfo(jobId);
+  HistoryFileInfo getJobFileInfo(JobId jobId) throws IOException {
+    return hsManager.getFileInfo(jobId);
   }
 
   @Override
@@ -313,25 +253,6 @@ public class JobHistory extends AbstractService implements HistoryContext {
         fBegin, fEnd, jobState);
   }
 
-  public class HistoryCleaner implements Runnable {
-    long maxAgeMillis;
-
-    public HistoryCleaner(long maxAge) {
-      this.maxAgeMillis = maxAge;
-    }
-
-    public void run() {
-      LOG.info("History Cleaner started");
-      long cutoff = System.currentTimeMillis() - maxAgeMillis;
-      try {
-        hsManager.clean(cutoff, storage);
-      } catch (IOException e) {
-        LOG.warn("Error trying to clean up ", e);
-      }
-      LOG.info("History Cleaner complete");
-    }
-  }
-
   // TODO AppContext - Not Required
   private ApplicationAttemptId appAttemptID;
View File
@@ -23,6 +23,7 @@ import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.JobACL;
@@ -166,6 +167,11 @@ public class PartialJob implements org.apache.hadoop.mapreduce.v2.app.job.Job {
   public Path getConfFile() {
     throw new IllegalStateException("Not implemented yet");
   }
+
+  @Override
+  public Configuration loadConfFile() {
+    throw new IllegalStateException("Not implemented yet");
+  }
 
   @Override
   public Map<JobACL, AccessControlList> getJobACLs() {
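PartialJob backs the job-listing view, so the new loadConfFile() contract is deliberately left unimplemented here. For a job implementation that does support it, a caller might use the parsed Configuration as sketched below; the helper class, method name, and the property read are illustrative only.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.v2.app.job.Job;

public class LoadConfSketch {
  // Illustrative helper: read one property from a job's parsed configuration.
  // Works against any Job implementation whose loadConfFile() is supported
  // (e.g. a fully loaded completed job); PartialJob intentionally throws.
  static String queueOf(Job job) throws IOException {
    Configuration jobConf = job.loadConfFile();
    return jobConf.get("mapreduce.job.queuename", "default");
  }
}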
View File
@@ -65,7 +65,6 @@ import com.google.inject.Inject;
 public class HsWebServices {
 
   private final HistoryContext ctx;
   private WebApp webapp;
-  private final Configuration conf;
 
   @Context
   UriInfo uriInfo;
@@ -74,7 +73,6 @@ public class HsWebServices {
   public HsWebServices(final HistoryContext ctx, final Configuration conf,
       final WebApp webapp) {
     this.ctx = ctx;
-    this.conf = conf;
     this.webapp = webapp;
   }
 
@@ -222,7 +220,7 @@ public class HsWebServices {
     Job job = AMWebServices.getJobFromJobIdString(jid, ctx);
     ConfInfo info;
     try {
-      info = new ConfInfo(job, this.conf);
+      info = new ConfInfo(job);
     } catch (IOException e) {
       throw new NotFoundException("unable to load configuration for job: "
           + jid);
View File
@@ -22,12 +22,15 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
 import org.apache.hadoop.mapreduce.v2.app.job.Task;
 import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.hs.HistoryFileManager.HistoryFileInfo;
 import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameters;
 
+import static org.mockito.Mockito.*;
+
 @RunWith(value = Parameterized.class)
 public class TestJobHistoryEntities {
@@ -61,10 +64,12 @@ public class TestJobHistoryEntities {
   /* Verify some expected values based on the history file */
   @Test
   public void testCompletedJob() throws Exception {
+    HistoryFileInfo info = mock(HistoryFileInfo.class);
+    when(info.getConfFile()).thenReturn(fullConfPath);
     //Re-initialize to verify the delayed load.
     completedJob =
       new CompletedJob(conf, jobId, fulleHistoryPath, loadTasks, "user",
-          fullConfPath, jobAclsManager);
+          info, jobAclsManager);
     //Verify tasks loaded based on loadTask parameter.
     assertEquals(loadTasks, completedJob.tasksLoaded.get());
     assertEquals(1, completedJob.getAMInfos().size());
@@ -84,9 +89,11 @@ public class TestJobHistoryEntities {
 
   @Test
   public void testCompletedTask() throws Exception {
+    HistoryFileInfo info = mock(HistoryFileInfo.class);
+    when(info.getConfFile()).thenReturn(fullConfPath);
     completedJob =
       new CompletedJob(conf, jobId, fulleHistoryPath, loadTasks, "user",
-          fullConfPath, jobAclsManager);
+          info, jobAclsManager);
     TaskId mt1Id = MRBuilderUtils.newTaskId(jobId, 0, TaskType.MAP);
     TaskId rt1Id = MRBuilderUtils.newTaskId(jobId, 0, TaskType.REDUCE);
@@ -111,9 +118,11 @@ public class TestJobHistoryEntities {
 
   @Test
   public void testCompletedTaskAttempt() throws Exception {
+    HistoryFileInfo info = mock(HistoryFileInfo.class);
+    when(info.getConfFile()).thenReturn(fullConfPath);
     completedJob =
       new CompletedJob(conf, jobId, fulleHistoryPath, loadTasks, "user",
-          fullConfPath, jobAclsManager);
+          info, jobAclsManager);
     TaskId mt1Id = MRBuilderUtils.newTaskId(jobId, 0, TaskType.MAP);
     TaskId rt1Id = MRBuilderUtils.newTaskId(jobId, 0, TaskType.REDUCE);
     TaskAttemptId mta1Id = MRBuilderUtils.newTaskAttemptId(mt1Id, 0);
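The three tests above stub HistoryFileInfo with Mockito so that CompletedJob only ever sees a canned conf-file path. A self-contained sketch of that stub-and-verify pattern is below; it uses a made-up nested interface and values in place of the real HistoryFileInfo.

import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import org.junit.Test;

public class MockitoStubSketch {
  interface FileInfo {              // stand-in for the real HistoryFileInfo
    String getConfFile();
  }

  @Test
  public void stubReturnsCannedValue() {
    FileInfo info = mock(FileInfo.class);
    when(info.getConfFile()).thenReturn("/tmp/job_conf.xml");

    // The class under test would call getConfFile(); here we call it directly.
    assertEquals("/tmp/job_conf.xml", info.getConfFile());
    verify(info).getConfFile();     // the collaborator was actually consulted
  }
}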
View File
@@ -56,6 +56,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.Task;
 import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
+import org.apache.hadoop.mapreduce.v2.hs.HistoryFileManager.HistoryFileInfo;
 import org.apache.hadoop.mapreduce.v2.hs.TestJobHistoryEvents.MRAppWithHistory;
 import org.apache.hadoop.mapreduce.v2.jobhistory.FileNameIndexUtils;
 import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
@@ -84,12 +85,22 @@ public class TestJobHistoryParsing {
 
   @Test
   public void testHistoryParsing() throws Exception {
-    checkHistoryParsing(2, 1, 2);
+    LOG.info("STARTING testHistoryParsing()");
+    try {
+      checkHistoryParsing(2, 1, 2);
+    } finally {
+      LOG.info("FINISHED testHistoryParsing()");
+    }
   }
 
   @Test
   public void testHistoryParsingWithParseErrors() throws Exception {
-    checkHistoryParsing(3, 0, 2);
+    LOG.info("STARTING testHistoryParsingWithParseErrors()");
+    try {
+      checkHistoryParsing(3, 0, 2);
+    } finally {
+      LOG.info("FINISHED testHistoryParsingWithParseErrors()");
+    }
   }
 
   private static String getJobSummary(FileContext fc, Path path) throws IOException {
@@ -124,61 +135,112 @@ public class TestJobHistoryParsing {
     String jobhistoryDir = JobHistoryUtils
         .getHistoryIntermediateDoneDirForUser(conf);
-    JobHistory jobHistory = new JobHistory();
-    jobHistory.init(conf);
-    JobIndexInfo jobIndexInfo = jobHistory.getJobMetaInfo(jobId)
-        .getJobIndexInfo();
-    String jobhistoryFileName = FileNameIndexUtils
-        .getDoneFileName(jobIndexInfo);
-    Path historyFilePath = new Path(jobhistoryDir, jobhistoryFileName);
-    FSDataInputStream in = null;
-    LOG.info("JobHistoryFile is: " + historyFilePath);
+
     FileContext fc = null;
     try {
       fc = FileContext.getFileContext(conf);
-      in = fc.open(fc.makeQualified(historyFilePath));
     } catch (IOException ioe) {
-      LOG.info("Can not open history file: " + historyFilePath, ioe);
-      throw (new Exception("Can not open History File"));
+      LOG.info("Can not get FileContext", ioe);
+      throw (new Exception("Can not get File Context"));
     }
+
+    if (numMaps == numSuccessfulMaps) {
+      String summaryFileName = JobHistoryUtils
+          .getIntermediateSummaryFileName(jobId);
+      Path summaryFile = new Path(jobhistoryDir, summaryFileName);
+      String jobSummaryString = getJobSummary(fc, summaryFile);
+      Assert.assertNotNull(jobSummaryString);
+      Assert.assertTrue(jobSummaryString.contains("resourcesPerMap=100"));
+      Assert.assertTrue(jobSummaryString.contains("resourcesPerReduce=100"));
+
+      Map<String, String> jobSummaryElements = new HashMap<String, String>();
+      StringTokenizer strToken = new StringTokenizer(jobSummaryString, ",");
+      while (strToken.hasMoreTokens()) {
+        String keypair = strToken.nextToken();
+        jobSummaryElements.put(keypair.split("=")[0], keypair.split("=")[1]);
+      }
+
+      Assert.assertEquals("JobId does not match", jobId.toString(),
+          jobSummaryElements.get("jobId"));
+      Assert.assertEquals("JobName does not match", "test",
+          jobSummaryElements.get("jobName"));
+      Assert.assertTrue("submitTime should not be 0",
+          Long.parseLong(jobSummaryElements.get("submitTime")) != 0);
+      Assert.assertTrue("launchTime should not be 0",
+          Long.parseLong(jobSummaryElements.get("launchTime")) != 0);
+      Assert.assertTrue("firstMapTaskLaunchTime should not be 0",
+          Long.parseLong(jobSummaryElements.get("firstMapTaskLaunchTime")) != 0);
+      Assert
+          .assertTrue(
+              "firstReduceTaskLaunchTime should not be 0",
+              Long.parseLong(jobSummaryElements.get("firstReduceTaskLaunchTime")) != 0);
+      Assert.assertTrue("finishTime should not be 0",
+          Long.parseLong(jobSummaryElements.get("finishTime")) != 0);
+      Assert.assertEquals("Mismatch in num map slots", numSuccessfulMaps,
+          Integer.parseInt(jobSummaryElements.get("numMaps")));
+      Assert.assertEquals("Mismatch in num reduce slots", numReduces,
+          Integer.parseInt(jobSummaryElements.get("numReduces")));
+      Assert.assertEquals("User does not match", System.getProperty("user.name"),
+          jobSummaryElements.get("user"));
+      Assert.assertEquals("Queue does not match", "default",
+          jobSummaryElements.get("queue"));
+      Assert.assertEquals("Status does not match", "SUCCEEDED",
+          jobSummaryElements.get("status"));
+    }
+
+    JobHistory jobHistory = new JobHistory();
+    jobHistory.init(conf);
+    HistoryFileInfo fileInfo = jobHistory.getJobFileInfo(jobId);
+    JobInfo jobInfo;
+    long numFinishedMaps;
+
+    synchronized(fileInfo) {
+      Path historyFilePath = fileInfo.getHistoryFile();
+      FSDataInputStream in = null;
+      LOG.info("JobHistoryFile is: " + historyFilePath);
+      try {
+        in = fc.open(fc.makeQualified(historyFilePath));
+      } catch (IOException ioe) {
+        LOG.info("Can not open history file: " + historyFilePath, ioe);
+        throw (new Exception("Can not open History File"));
+      }
 
-    JobHistoryParser parser = new JobHistoryParser(in);
-    final EventReader realReader = new EventReader(in);
-    EventReader reader = Mockito.mock(EventReader.class);
-    if (numMaps == numSuccessfulMaps) {
-      reader = realReader;
-    } else {
-      final AtomicInteger numFinishedEvents = new AtomicInteger(0); // Hack!
-      Mockito.when(reader.getNextEvent()).thenAnswer(
-          new Answer<HistoryEvent>() {
-            public HistoryEvent answer(InvocationOnMock invocation)
-                throws IOException {
-              HistoryEvent event = realReader.getNextEvent();
-              if (event instanceof TaskFinishedEvent) {
-                numFinishedEvents.incrementAndGet();
-              }
-              if (numFinishedEvents.get() <= numSuccessfulMaps) {
-                return event;
-              } else {
-                throw new IOException("test");
-              }
-            }
-          }
-          );
-    }
-    JobInfo jobInfo = parser.parse(reader);
-    long numFinishedMaps =
-        computeFinishedMaps(jobInfo, numMaps, numSuccessfulMaps);
-    if (numFinishedMaps != numMaps) {
-      Exception parseException = parser.getParseException();
-      Assert.assertNotNull("Didn't get expected parse exception",
-          parseException);
+      JobHistoryParser parser = new JobHistoryParser(in);
+      final EventReader realReader = new EventReader(in);
+      EventReader reader = Mockito.mock(EventReader.class);
+      if (numMaps == numSuccessfulMaps) {
+        reader = realReader;
+      } else {
+        final AtomicInteger numFinishedEvents = new AtomicInteger(0); // Hack!
+        Mockito.when(reader.getNextEvent()).thenAnswer(
+            new Answer<HistoryEvent>() {
+              public HistoryEvent answer(InvocationOnMock invocation)
+                  throws IOException {
+                HistoryEvent event = realReader.getNextEvent();
+                if (event instanceof TaskFinishedEvent) {
+                  numFinishedEvents.incrementAndGet();
+                }
+                if (numFinishedEvents.get() <= numSuccessfulMaps) {
+                  return event;
+                } else {
+                  throw new IOException("test");
+                }
+              }
+            }
+            );
+      }
+      jobInfo = parser.parse(reader);
+      numFinishedMaps =
+          computeFinishedMaps(jobInfo, numMaps, numSuccessfulMaps);
+      if (numFinishedMaps != numMaps) {
+        Exception parseException = parser.getParseException();
+        Assert.assertNotNull("Didn't get expected parse exception",
+            parseException);
+      }
     }
 
     Assert.assertEquals("Incorrect username ", System.getProperty("user.name"),
@@ -246,52 +308,6 @@ public class TestJobHistoryParsing {
         }
       }
     }
-
-    if (numMaps == numSuccessfulMaps) {
-      String summaryFileName = JobHistoryUtils
-          .getIntermediateSummaryFileName(jobId);
-      Path summaryFile = new Path(jobhistoryDir, summaryFileName);
-      String jobSummaryString = getJobSummary(fc, summaryFile);
-      Assert.assertTrue(jobSummaryString.contains("resourcesPerMap=100"));
-      Assert.assertTrue(jobSummaryString.contains("resourcesPerReduce=100"));
-      Assert.assertNotNull(jobSummaryString);
-
-      Map<String, String> jobSummaryElements = new HashMap<String, String>();
-      StringTokenizer strToken = new StringTokenizer(jobSummaryString, ",");
-      while (strToken.hasMoreTokens()) {
-        String keypair = strToken.nextToken();
-        jobSummaryElements.put(keypair.split("=")[0], keypair.split("=")[1]);
-      }
-
-      Assert.assertEquals("JobId does not match", jobId.toString(),
-          jobSummaryElements.get("jobId"));
-      Assert.assertEquals("JobName does not match", "test",
-          jobSummaryElements.get("jobName"));
-      Assert.assertTrue("submitTime should not be 0",
-          Long.parseLong(jobSummaryElements.get("submitTime")) != 0);
-      Assert.assertTrue("launchTime should not be 0",
-          Long.parseLong(jobSummaryElements.get("launchTime")) != 0);
-      Assert.assertTrue("firstMapTaskLaunchTime should not be 0",
-          Long.parseLong(jobSummaryElements.get("firstMapTaskLaunchTime")) != 0);
-      Assert
-          .assertTrue(
-              "firstReduceTaskLaunchTime should not be 0",
-              Long.parseLong(jobSummaryElements.get("firstReduceTaskLaunchTime")) != 0);
-      Assert.assertTrue("finishTime should not be 0",
-          Long.parseLong(jobSummaryElements.get("finishTime")) != 0);
-      Assert.assertEquals("Mismatch in num map slots", numSuccessfulMaps,
-          Integer.parseInt(jobSummaryElements.get("numMaps")));
-      Assert.assertEquals("Mismatch in num reduce slots", numReduces,
-          Integer.parseInt(jobSummaryElements.get("numReduces")));
-      Assert.assertEquals("User does not match", System.getProperty("user.name"),
-          jobSummaryElements.get("user"));
-      Assert.assertEquals("Queue does not match", "default",
-          jobSummaryElements.get("queue"));
-      Assert.assertEquals("Status does not match", "SUCCEEDED",
-          jobSummaryElements.get("status"));
-    }
   }
 
   // Computes finished maps similar to RecoveryService...
@@ -314,6 +330,8 @@ public class TestJobHistoryParsing {
 
   @Test
   public void testHistoryParsingForFailedAttempts() throws Exception {
+    LOG.info("STARTING testHistoryParsingForFailedAttempts");
+    try {
     Configuration conf = new Configuration();
     conf
         .setClass(
@@ -335,7 +353,7 @@ public class TestJobHistoryParsing {
     JobHistory jobHistory = new JobHistory();
     jobHistory.init(conf);
 
-    JobIndexInfo jobIndexInfo = jobHistory.getJobMetaInfo(jobId)
+    JobIndexInfo jobIndexInfo = jobHistory.getJobFileInfo(jobId)
         .getJobIndexInfo();
     String jobhistoryFileName = FileNameIndexUtils
         .getDoneFileName(jobIndexInfo);
@@ -372,6 +390,9 @@ public class TestJobHistoryParsing {
       }
     }
     Assert.assertEquals("No of Failed tasks doesn't match.", 2, noOffailedAttempts);
+    } finally {
+      LOG.info("FINISHED testHistoryParsingForFailedAttempts");
+    }
   }
 
   static class MRAppWithHistoryWithFailedAttempt extends MRAppWithHistory {