diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index ae855cad33f..ab9f1fb5b92 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -13,6 +13,9 @@ Release 2.7.3 - UNRELEASED
MAPREDUCE-6637. Testcase Failure : TestFileInputFormat.testSplitLocationInfo.
(Brahma Reddy Battula via wang)
+ MAPREDUCE-6622. Add capability to set JHS job cache to a
+ task-based limit (rchiang via rkanter)
+
OPTIMIZATIONS
BUG FIXES
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHAdminConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHAdminConfig.java
index f7cba9f8069..7deb0771574 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHAdminConfig.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHAdminConfig.java
@@ -98,7 +98,7 @@ public class JHAdminConfig {
public static final String MR_HISTORY_JOBLIST_CACHE_SIZE =
MR_HISTORY_PREFIX + "joblist.cache.size";
public static final int DEFAULT_MR_HISTORY_JOBLIST_CACHE_SIZE = 20000;
-
+
/** The location of the Kerberos keytab file.*/
public static final String MR_HISTORY_KEYTAB = MR_HISTORY_PREFIX + "keytab";
@@ -106,7 +106,11 @@ public class JHAdminConfig {
public static final String MR_HISTORY_LOADED_JOB_CACHE_SIZE =
MR_HISTORY_PREFIX + "loadedjobs.cache.size";
public static final int DEFAULT_MR_HISTORY_LOADED_JOB_CACHE_SIZE = 5;
-
+
+ /** Size of the loaded job cache (in tasks).*/
+ public static final String MR_HISTORY_LOADED_TASKS_CACHE_SIZE =
+ MR_HISTORY_PREFIX + "loadedtasks.cache.size";
+
/**
* The maximum age of a job history file before it is deleted from the history
* server.
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
index 9effd764b15..dc6155a0b35 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
@@ -2034,7 +2034,32 @@
mapreduce.jobhistory.loadedjobs.cache.size
5
- Size of the loaded job cache
+ Size of the loaded job cache. This property is ignored if
+ the property mapreduce.jobhistory.loadedtasks.cache.size is set to a
+ positive value.
+
+
+
+
+ mapreduce.jobhistory.loadedtasks.cache.size
+
+ Change the job history cache limit to be set in terms
+ of total task count. If the total number of tasks loaded exceeds
+ this value, then the job cache will be shrunk down until it is
+ under this limit (minimum 1 job in cache). If this value is empty
+ or nonpositive then the cache reverts to using the property
+ mapreduce.jobhistory.loadedjobs.cache.size as a job cache size.
+
+ Two recommendations for the mapreduce.jobhistory.loadedtasks.cache.size
+ property:
+ 1) For every 100k of cache size, set the heap size of the Job History
+ Server to 1.2GB. For example,
+ mapreduce.jobhistory.loadedtasks.cache.size=500000, heap size=6GB.
+ 2) Make sure that the cache size is larger than the number of tasks
+ required for the largest job run on the cluster. It might be a good
+ idea to set the value slightly higher (say, 20%) in order to allow
+ for job size growth.
+
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CachedHistoryStorage.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CachedHistoryStorage.java
index 8c9abc3b0e1..c59d17f7192 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CachedHistoryStorage.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CachedHistoryStorage.java
@@ -20,12 +20,16 @@ package org.apache.hadoop.mapreduce.v2.hs;
import java.io.IOException;
import java.util.Collection;
-import java.util.Collections;
-import java.util.LinkedHashMap;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.cache.Weigher;
+import com.google.common.util.concurrent.UncheckedExecutionException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -49,9 +53,10 @@ public class CachedHistoryStorage extends AbstractService implements
HistoryStorage {
private static final Log LOG = LogFactory.getLog(CachedHistoryStorage.class);
- private Map loadedJobCache = null;
- // The number of loaded jobs.
+ private LoadingCache loadedJobCache = null;
private int loadedJobCacheSize;
+ private int loadedTasksCacheSize;
+ private boolean useLoadedTasksCache;
private HistoryFileManager hsManager;
@@ -70,17 +75,70 @@ public class CachedHistoryStorage extends AbstractService implements
@SuppressWarnings("serial")
private void createLoadedJobCache(Configuration conf) {
+ // Set property for old "loaded jobs" cache
loadedJobCacheSize = conf.getInt(
JHAdminConfig.MR_HISTORY_LOADED_JOB_CACHE_SIZE,
JHAdminConfig.DEFAULT_MR_HISTORY_LOADED_JOB_CACHE_SIZE);
- loadedJobCache = Collections.synchronizedMap(new LinkedHashMap(
- loadedJobCacheSize + 1, 0.75f, true) {
- @Override
- public boolean removeEldestEntry(final Map.Entry eldest) {
- return super.size() > loadedJobCacheSize;
+ // Check property for new "loaded tasks" cache perform sanity checking
+ useLoadedTasksCache = false;
+ try {
+ String taskSizeString = conf
+ .get(JHAdminConfig.MR_HISTORY_LOADED_TASKS_CACHE_SIZE);
+ if (taskSizeString != null) {
+ loadedTasksCacheSize = Math.max(Integer.parseInt(taskSizeString), 1);
+ useLoadedTasksCache = true;
}
- });
+ } catch (NumberFormatException nfe) {
+ LOG.error("The property " +
+ JHAdminConfig.MR_HISTORY_LOADED_TASKS_CACHE_SIZE +
+ " is not an integer value. Please set it to a positive" +
+ " integer value.");
+ }
+
+ CacheLoader loader;
+ loader = new CacheLoader() {
+ @Override
+ public Job load(JobId key) throws Exception {
+ return loadJob(key);
+ }
+ };
+
+ if (!useLoadedTasksCache) {
+ loadedJobCache = CacheBuilder.newBuilder()
+ .maximumSize(loadedJobCacheSize)
+ .initialCapacity(loadedJobCacheSize)
+ .concurrencyLevel(1)
+ .build(loader);
+ } else {
+ Weigher weightByTasks;
+ weightByTasks = new Weigher() {
+ /**
+ * Method for calculating Job weight by total task count. If
+ * the total task count is greater than the size of the tasks
+ * cache, then cap it at the cache size. This allows the cache
+ * to always hold one large job.
+ * @param key JobId object
+ * @param value Job object
+ * @return Weight of the job as calculated by total task count
+ */
+ @Override
+ public int weigh(JobId key, Job value) {
+ int taskCount = Math.min(loadedTasksCacheSize,
+ value.getTotalMaps() + value.getTotalReduces());
+ return taskCount;
+ }
+ };
+ // Keep concurrencyLevel at 1. Otherwise, two problems:
+ // 1) The largest job that can be initially loaded is
+ // cache size / 4.
+ // 2) Unit tests are not deterministic.
+ loadedJobCache = CacheBuilder.newBuilder()
+ .maximumWeight(loadedTasksCacheSize)
+ .weigher(weightByTasks)
+ .concurrencyLevel(1)
+ .build(loader);
+ }
}
public void refreshLoadedJobCache() {
@@ -100,52 +158,48 @@ public class CachedHistoryStorage extends AbstractService implements
public CachedHistoryStorage() {
super(CachedHistoryStorage.class.getName());
}
+
+ private static class HSFileRuntimeException extends RuntimeException {
+ public HSFileRuntimeException(String message) {
+ super(message);
+ }
+ }
- private Job loadJob(HistoryFileInfo fileInfo) {
- try {
- Job job = fileInfo.loadJob();
- if (LOG.isDebugEnabled()) {
- LOG.debug("Adding " + job.getID() + " to loaded job cache");
- }
- // We can clobber results here, but that should be OK, because it only
- // means that we may have two identical copies of the same job floating
- // around for a while.
- loadedJobCache.put(job.getID(), job);
- return job;
- } catch (IOException e) {
- throw new YarnRuntimeException(
- "Could not find/load job: " + fileInfo.getJobId(), e);
+ private Job loadJob(JobId jobId) throws RuntimeException, IOException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Looking for Job " + jobId);
+ }
+ HistoryFileInfo fileInfo;
+
+ fileInfo = hsManager.getFileInfo(jobId);
+ if (fileInfo == null) {
+ throw new HSFileRuntimeException("Unable to find job " + jobId);
+ } else if (fileInfo.isDeleted()) {
+ throw new HSFileRuntimeException("Cannot load deleted job " + jobId);
+ } else {
+ return fileInfo.loadJob();
}
}
@VisibleForTesting
- Map getLoadedJobCache() {
+ Cache getLoadedJobCache() {
return loadedJobCache;
}
@Override
public Job getFullJob(JobId jobId) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Looking for Job " + jobId);
- }
+ Job retVal = null;
try {
- HistoryFileInfo fileInfo = hsManager.getFileInfo(jobId);
- Job result = null;
- if (fileInfo != null) {
- result = loadedJobCache.get(jobId);
- if (result == null) {
- result = loadJob(fileInfo);
- } else if(fileInfo.isDeleted()) {
- loadedJobCache.remove(jobId);
- result = null;
- }
+ retVal = loadedJobCache.getUnchecked(jobId);
+ } catch (UncheckedExecutionException e) {
+ if (e.getCause() instanceof HSFileRuntimeException) {
+ LOG.error(e.getCause().getMessage());
+ return null;
} else {
- loadedJobCache.remove(jobId);
+ throw new YarnRuntimeException(e.getCause());
}
- return result;
- } catch (IOException e) {
- throw new YarnRuntimeException(e);
}
+ return retVal;
}
@Override
@@ -243,4 +297,14 @@ public class CachedHistoryStorage extends AbstractService implements
}
return allJobs;
}
+
+ @VisibleForTesting
+ public boolean getUseLoadedTasksCache() {
+ return useLoadedTasksCache;
+ }
+
+ @VisibleForTesting
+ public int getLoadedTasksCacheSize() {
+ return loadedTasksCacheSize;
+ }
}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistory.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistory.java
index de0de7dfdac..936c77221bb 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistory.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistory.java
@@ -24,6 +24,7 @@ import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
+import com.google.common.cache.Cache;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
@@ -35,16 +36,27 @@ import org.apache.hadoop.mapreduce.v2.hs.HistoryFileManager.JobListCache;
import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.JobsInfo;
import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.junit.After;
import org.junit.Test;
import org.mockito.Mockito;
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Mockito.*;
-import org.apache.hadoop.mapreduce.v2.app.job.Job;
-
+import static junit.framework.TestCase.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.apache.hadoop.mapreduce.v2.app.job.Job;
public class TestJobHistory {
@@ -58,13 +70,15 @@ public class TestJobHistory {
Configuration conf = new Configuration();
// Set the cache size to 2
- conf.set(JHAdminConfig.MR_HISTORY_LOADED_JOB_CACHE_SIZE, "2");
+ conf.setInt(JHAdminConfig.MR_HISTORY_LOADED_JOB_CACHE_SIZE, 2);
jobHistory.init(conf);
jobHistory.start();
CachedHistoryStorage storage = spy((CachedHistoryStorage) jobHistory
.getHistoryStorage());
+ assertFalse(storage.getUseLoadedTasksCache());
+
Job[] jobs = new Job[3];
JobId[] jobIds = new JobId[3];
@@ -84,14 +98,13 @@ public class TestJobHistory {
storage.getFullJob(jobs[i].getID());
}
- Map jobCache = storage.getLoadedJobCache();
- // job0 should have been purged since cache size is 2
- assertFalse(jobCache.containsKey(jobs[0].getID()));
- assertTrue(jobCache.containsKey(jobs[1].getID())
- && jobCache.containsKey(jobs[2].getID()));
+ Cache jobCache = storage.getLoadedJobCache();
+ // Verify some jobs are stored in the cache. Hard to predict eviction
+ // in Guava version.
+ assertTrue(jobCache.size() > 0);
// Setting cache size to 3
- conf.set(JHAdminConfig.MR_HISTORY_LOADED_JOB_CACHE_SIZE, "3");
+ conf.setInt(JHAdminConfig.MR_HISTORY_LOADED_JOB_CACHE_SIZE, 3);
doReturn(conf).when(storage).createConf();
when(fileInfo.loadJob()).thenReturn(jobs[0]).thenReturn(jobs[1])
@@ -105,9 +118,223 @@ public class TestJobHistory {
jobCache = storage.getLoadedJobCache();
- // All three jobs should be in cache since its size is now 3
- for (int i = 0; i < 3; i++) {
- assertTrue(jobCache.containsKey(jobs[i].getID()));
+ // Verify some jobs are stored in the cache. Hard to predict eviction
+ // in Guava version.
+ assertTrue(jobCache.size() > 0);
+ }
+
+ @Test
+ public void testTasksCacheLimit() throws Exception {
+ HistoryFileManager historyManager = mock(HistoryFileManager.class);
+ jobHistory = spy(new JobHistory());
+ doReturn(historyManager).when(jobHistory).createHistoryFileManager();
+
+ Configuration conf = new Configuration();
+ // Set the cache threshold to 50 tasks
+ conf.setInt(JHAdminConfig.MR_HISTORY_LOADED_TASKS_CACHE_SIZE, 50);
+ jobHistory.init(conf);
+ jobHistory.start();
+
+ CachedHistoryStorage storage = spy((CachedHistoryStorage) jobHistory
+ .getHistoryStorage());
+
+ assertTrue(storage.getUseLoadedTasksCache());
+ assertEquals(storage.getLoadedTasksCacheSize(), 50);
+
+ // Create a bunch of smaller jobs (<< 50 tasks)
+ Job[] jobs = new Job[10];
+ JobId[] jobIds = new JobId[10];
+ for (int i = 0; i < jobs.length; i++) {
+ jobs[i] = mock(Job.class);
+ jobIds[i] = mock(JobId.class);
+ when(jobs[i].getID()).thenReturn(jobIds[i]);
+ when(jobs[i].getTotalMaps()).thenReturn(10);
+ when(jobs[i].getTotalReduces()).thenReturn(2);
+ }
+
+ // Create some large jobs that forces task-based cache flushing
+ Job[] lgJobs = new Job[3];
+ JobId[] lgJobIds = new JobId[3];
+ for (int i = 0; i < lgJobs.length; i++) {
+ lgJobs[i] = mock(Job.class);
+ lgJobIds[i] = mock(JobId.class);
+ when(lgJobs[i].getID()).thenReturn(lgJobIds[i]);
+ when(lgJobs[i].getTotalMaps()).thenReturn(2000);
+ when(lgJobs[i].getTotalReduces()).thenReturn(10);
+ }
+
+ HistoryFileInfo fileInfo = mock(HistoryFileInfo.class);
+ when(historyManager.getFileInfo(any(JobId.class))).thenReturn(fileInfo);
+ when(fileInfo.loadJob()).thenReturn(jobs[0]).thenReturn(jobs[1])
+ .thenReturn(jobs[2]).thenReturn(jobs[3]).thenReturn(jobs[4])
+ .thenReturn(jobs[5]).thenReturn(jobs[6]).thenReturn(jobs[7])
+ .thenReturn(jobs[8]).thenReturn(jobs[9]).thenReturn(lgJobs[0])
+ .thenReturn(lgJobs[1]).thenReturn(lgJobs[2]);
+
+ // getFullJob will put the job in the cache if it isn't there
+ Cache jobCache = storage.getLoadedJobCache();
+ for (int i = 0; i < jobs.length; i++) {
+ storage.getFullJob(jobs[i].getID());
+ }
+ long prevSize = jobCache.size();
+
+ // Fill the cache with some larger jobs and verify the cache
+ // gets reduced in size.
+ for (int i = 0; i < lgJobs.length; i++) {
+ storage.getFullJob(lgJobs[i].getID());
+ }
+ assertTrue(jobCache.size() < prevSize);
+ }
+
+ @Test
+ public void testJobCacheLimitLargerThanMax() throws Exception {
+ HistoryFileManager historyManager = mock(HistoryFileManager.class);
+ JobHistory jobHistory = spy(new JobHistory());
+ doReturn(historyManager).when(jobHistory).createHistoryFileManager();
+
+ Configuration conf = new Configuration();
+ // Set the cache threshold to 50 tasks
+ conf.setInt(JHAdminConfig.MR_HISTORY_LOADED_TASKS_CACHE_SIZE, 500);
+ jobHistory.init(conf);
+ jobHistory.start();
+
+ CachedHistoryStorage storage = spy((CachedHistoryStorage) jobHistory
+ .getHistoryStorage());
+
+ assertTrue(storage.getUseLoadedTasksCache());
+ assertEquals(storage.getLoadedTasksCacheSize(), 500);
+
+ // Create a bunch of large jobs (>> 50 tasks)
+ Job[] lgJobs = new Job[10];
+ JobId[] lgJobIds = new JobId[10];
+ for (int i = 0; i < lgJobs.length; i++) {
+ lgJobs[i] = mock(Job.class);
+ lgJobIds[i] = mock(JobId.class);
+ when(lgJobs[i].getID()).thenReturn(lgJobIds[i]);
+ when(lgJobs[i].getTotalMaps()).thenReturn(700);
+ when(lgJobs[i].getTotalReduces()).thenReturn(50);
+ }
+
+ HistoryFileInfo fileInfo = mock(HistoryFileInfo.class);
+ when(historyManager.getFileInfo(any(JobId.class))).thenReturn(fileInfo);
+ when(fileInfo.loadJob()).thenReturn(lgJobs[0]).thenReturn(lgJobs[1])
+ .thenReturn(lgJobs[2]).thenReturn(lgJobs[3]).thenReturn(lgJobs[4])
+ .thenReturn(lgJobs[5]).thenReturn(lgJobs[6]).thenReturn(lgJobs[7])
+ .thenReturn(lgJobs[8]).thenReturn(lgJobs[9]);
+
+ // getFullJob will put the job in the cache if it isn't there
+ Cache jobCache = storage.getLoadedJobCache();
+ long[] cacheSize = new long[10];
+ for (int i = 0; i < lgJobs.length; i++) {
+ storage.getFullJob(lgJobs[i].getID());
+ assertTrue(jobCache.size() > 0);
+ }
+ }
+
+ @Test
+ public void testLoadedTasksEmptyConfiguration() {
+ Configuration conf = new Configuration();
+ conf.set(JHAdminConfig.MR_HISTORY_LOADED_TASKS_CACHE_SIZE, "");
+
+ HistoryFileManager historyManager = mock(HistoryFileManager.class);
+ JobHistory jobHistory = spy(new JobHistory());
+ doReturn(historyManager).when(jobHistory).createHistoryFileManager();
+ jobHistory.init(conf);
+ jobHistory.start();
+
+ CachedHistoryStorage storage = spy((CachedHistoryStorage) jobHistory
+ .getHistoryStorage());
+
+ assertFalse(storage.getUseLoadedTasksCache());
+ }
+
+ @Test
+ public void testLoadedTasksZeroConfiguration() {
+ Configuration conf = new Configuration();
+ conf.setInt(JHAdminConfig.MR_HISTORY_LOADED_TASKS_CACHE_SIZE, 0);
+
+ HistoryFileManager historyManager = mock(HistoryFileManager.class);
+ JobHistory jobHistory = spy(new JobHistory());
+ doReturn(historyManager).when(jobHistory).createHistoryFileManager();
+ jobHistory.init(conf);
+ jobHistory.start();
+
+ CachedHistoryStorage storage = spy((CachedHistoryStorage) jobHistory
+ .getHistoryStorage());
+
+ assertTrue(storage.getUseLoadedTasksCache());
+ assertEquals(storage.getLoadedTasksCacheSize(), 1);
+ }
+
+ @Test
+ public void testLoadedTasksNegativeConfiguration() {
+ Configuration conf = new Configuration();
+ conf.setInt(JHAdminConfig.MR_HISTORY_LOADED_TASKS_CACHE_SIZE, -1);
+
+ HistoryFileManager historyManager = mock(HistoryFileManager.class);
+ JobHistory jobHistory = spy(new JobHistory());
+ doReturn(historyManager).when(jobHistory).createHistoryFileManager();
+ jobHistory.init(conf);
+ jobHistory.start();
+
+ CachedHistoryStorage storage = spy((CachedHistoryStorage) jobHistory
+ .getHistoryStorage());
+
+ assertTrue(storage.getUseLoadedTasksCache());
+ assertEquals(storage.getLoadedTasksCacheSize(), 1);
+ }
+
+ @Test
+ public void testLoadJobErrorCases() throws IOException {
+ HistoryFileManager historyManager = mock(HistoryFileManager.class);
+ jobHistory = spy(new JobHistory());
+ doReturn(historyManager).when(jobHistory).createHistoryFileManager();
+
+ Configuration conf = new Configuration();
+ // Set the cache threshold to 50 tasks
+ conf.setInt(JHAdminConfig.MR_HISTORY_LOADED_TASKS_CACHE_SIZE, 50);
+ jobHistory.init(conf);
+ jobHistory.start();
+
+ CachedHistoryStorage storage = spy((CachedHistoryStorage) jobHistory
+ .getHistoryStorage());
+
+ assertTrue(storage.getUseLoadedTasksCache());
+ assertEquals(storage.getLoadedTasksCacheSize(), 50);
+
+ // Create jobs for bad fileInfo results
+ Job[] jobs = new Job[4];
+ JobId[] jobIds = new JobId[4];
+ for (int i = 0; i < jobs.length; i++) {
+ jobs[i] = mock(Job.class);
+ jobIds[i] = mock(JobId.class);
+ when(jobs[i].getID()).thenReturn(jobIds[i]);
+ when(jobs[i].getTotalMaps()).thenReturn(10);
+ when(jobs[i].getTotalReduces()).thenReturn(2);
+ }
+
+ HistoryFileInfo loadJobException = mock(HistoryFileInfo.class);
+ when(loadJobException.loadJob()).thenThrow(new IOException("History file not found"));
+ when(historyManager.getFileInfo(jobIds[0])).thenThrow(new IOException(""));
+ when(historyManager.getFileInfo(jobIds[1])).thenReturn(null);
+ when(historyManager.getFileInfo(jobIds[2])).thenReturn(loadJobException);
+
+ try {
+ storage.getFullJob(jobIds[0]);
+ fail("Did not get expected YarnRuntimeException for getFileInfo() throwing IOException");
+ } catch (YarnRuntimeException e) {
+ // Expected
+ }
+
+ // fileInfo==null should return null
+ Job job = storage.getFullJob(jobIds[1]);
+ assertNull(job);
+
+ try {
+ storage.getFullJob(jobIds[2]);
+ fail("Did not get expected YarnRuntimeException for fileInfo.loadJob() throwing IOException");
+ } catch (YarnRuntimeException e) {
+ // Expected
}
}