MAPREDUCE-6622. Add capability to set JHS job cache to a task-based limit (rchiang via rkanter)
(cherry picked from commit 0f72da7e28)

parent 7b79567cf9
commit 48cb4b9ba7
@@ -19,6 +19,9 @@ Release 2.9.0 - UNRELEASED
     MAPREDUCE-6640. mapred job -history command should be able to take
     Job ID (rkanter)

+    MAPREDUCE-6622. Add capability to set JHS job cache to a
+    task-based limit (rchiang via rkanter)
+
   OPTIMIZATIONS

   BUG FIXES

@@ -107,6 +107,10 @@ public class JHAdminConfig {
       MR_HISTORY_PREFIX + "loadedjobs.cache.size";
   public static final int DEFAULT_MR_HISTORY_LOADED_JOB_CACHE_SIZE = 5;

+  /** Size of the loaded job cache (in tasks). */
+  public static final String MR_HISTORY_LOADED_TASKS_CACHE_SIZE =
+      MR_HISTORY_PREFIX + "loadedtasks.cache.size";
+
   /**
    * The maximum age of a job history file before it is deleted from the history
    * server.

@@ -1653,7 +1653,32 @@
 <property>
   <name>mapreduce.jobhistory.loadedjobs.cache.size</name>
   <value>5</value>
-  <description>Size of the loaded job cache</description>
+  <description>Size of the loaded job cache. This property is ignored if
+  the property mapreduce.jobhistory.loadedtasks.cache.size is set to a
+  positive value.
+  </description>
+</property>
+
+<property>
+  <name>mapreduce.jobhistory.loadedtasks.cache.size</name>
+  <value></value>
+  <description>Change the job history cache limit to be set in terms
+  of total task count. If the total number of tasks loaded exceeds
+  this value, then the job cache will be shrunk down until it is
+  under this limit (minimum 1 job in cache). If this value is empty
+  or nonpositive then the cache reverts to using the property
+  mapreduce.jobhistory.loadedjobs.cache.size as a job cache size.
+
+  Two recommendations for the mapreduce.jobhistory.loadedtasks.cache.size
+  property:
+  1) For every 100k of cache size, set the heap size of the Job History
+     Server to 1.2GB. For example,
+     mapreduce.jobhistory.loadedtasks.cache.size=500000, heap size=6GB.
+  2) Make sure that the cache size is larger than the number of tasks
+     required for the largest job run on the cluster. It might be a good
+     idea to set the value slightly higher (say, 20%) in order to allow
+     for job size growth.
+  </description>
 </property>

 <property>

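As a quick sanity check on the sizing rule in the description above, here is a minimal sketch of the arithmetic. The class and method names are illustrative only, not part of Hadoop:

public class JhsHeapEstimate {
  /** Suggested JHS heap in GB: roughly 1.2 GB per 100k tasks of cache size. */
  static double suggestedHeapGb(long loadedTasksCacheSize) {
    return loadedTasksCacheSize / 100_000.0 * 1.2;
  }

  public static void main(String[] args) {
    // Matches the example in the description:
    // mapreduce.jobhistory.loadedtasks.cache.size=500000 -> 6 GB heap.
    System.out.println(suggestedHeapGb(500_000)); // prints 6.0
  }
}
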
@@ -20,12 +20,16 @@ package org.apache.hadoop.mapreduce.v2.hs;

 import java.io.IOException;
 import java.util.Collection;
-import java.util.Collections;
-import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.SortedMap;
 import java.util.TreeMap;

+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.cache.Weigher;
+import com.google.common.util.concurrent.UncheckedExecutionException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;

@@ -49,9 +53,10 @@ public class CachedHistoryStorage extends AbstractService implements
     HistoryStorage {
   private static final Log LOG = LogFactory.getLog(CachedHistoryStorage.class);

-  private Map<JobId, Job> loadedJobCache = null;
-  // The number of loaded jobs.
+  private LoadingCache<JobId, Job> loadedJobCache = null;
   private int loadedJobCacheSize;
+  private int loadedTasksCacheSize;
+  private boolean useLoadedTasksCache;

   private HistoryFileManager hsManager;

@@ -70,17 +75,70 @@ public class CachedHistoryStorage extends AbstractService implements

   @SuppressWarnings("serial")
   private void createLoadedJobCache(Configuration conf) {
+    // Set property for old "loaded jobs" cache
     loadedJobCacheSize = conf.getInt(
         JHAdminConfig.MR_HISTORY_LOADED_JOB_CACHE_SIZE,
         JHAdminConfig.DEFAULT_MR_HISTORY_LOADED_JOB_CACHE_SIZE);

-    loadedJobCache = Collections.synchronizedMap(new LinkedHashMap<JobId, Job>(
-        loadedJobCacheSize + 1, 0.75f, true) {
-      @Override
-      public boolean removeEldestEntry(final Map.Entry<JobId, Job> eldest) {
-        return super.size() > loadedJobCacheSize;
-      }
-    });
+    // Check property for new "loaded tasks" cache; perform sanity checking
+    useLoadedTasksCache = false;
+    try {
+      String taskSizeString = conf
+          .get(JHAdminConfig.MR_HISTORY_LOADED_TASKS_CACHE_SIZE);
+      if (taskSizeString != null) {
+        loadedTasksCacheSize = Math.max(Integer.parseInt(taskSizeString), 1);
+        useLoadedTasksCache = true;
+      }
+    } catch (NumberFormatException nfe) {
+      LOG.error("The property " +
+          JHAdminConfig.MR_HISTORY_LOADED_TASKS_CACHE_SIZE +
+          " is not an integer value. Please set it to a positive" +
+          " integer value.");
+    }
+
+    CacheLoader<JobId, Job> loader;
+    loader = new CacheLoader<JobId, Job>() {
+      @Override
+      public Job load(JobId key) throws Exception {
+        return loadJob(key);
+      }
+    };
+
+    if (!useLoadedTasksCache) {
+      loadedJobCache = CacheBuilder.newBuilder()
+          .maximumSize(loadedJobCacheSize)
+          .initialCapacity(loadedJobCacheSize)
+          .concurrencyLevel(1)
+          .build(loader);
+    } else {
+      Weigher<JobId, Job> weightByTasks;
+      weightByTasks = new Weigher<JobId, Job>() {
+        /**
+         * Method for calculating Job weight by total task count. If
+         * the total task count is greater than the size of the tasks
+         * cache, then cap it at the cache size. This allows the cache
+         * to always hold one large job.
+         * @param key JobId object
+         * @param value Job object
+         * @return Weight of the job as calculated by total task count
+         */
+        @Override
+        public int weigh(JobId key, Job value) {
+          int taskCount = Math.min(loadedTasksCacheSize,
+              value.getTotalMaps() + value.getTotalReduces());
+          return taskCount;
+        }
+      };
+      // Keep concurrencyLevel at 1. Otherwise, two problems:
+      // 1) The largest job that can be initially loaded is
+      //    cache size / 4.
+      // 2) Unit tests are not deterministic.
+      loadedJobCache = CacheBuilder.newBuilder()
+          .maximumWeight(loadedTasksCacheSize)
+          .weigher(weightByTasks)
+          .concurrencyLevel(1)
+          .build(loader);
+    }
   }

   public void refreshLoadedJobCache() {

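For readers unfamiliar with Guava's weighted caches, here is a minimal self-contained sketch of the same pattern used in the patch above. The FakeJob type, constants, and weights are illustrative stand-ins, not Hadoop code:

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.cache.Weigher;

public class WeightedCacheSketch {

  // Toy stand-in for a Job; its weight is its task count.
  private static class FakeJob {
    final int taskCount;
    FakeJob(int taskCount) { this.taskCount = taskCount; }
  }

  private static final int MAX_WEIGHT = 50; // analogous to loadedtasks.cache.size

  public static void main(String[] args) {
    LoadingCache<String, FakeJob> cache = CacheBuilder.newBuilder()
        .maximumWeight(MAX_WEIGHT)
        .weigher(new Weigher<String, FakeJob>() {
          @Override
          public int weigh(String key, FakeJob value) {
            // Cap at the cache size so one oversized job can still be held.
            return Math.min(MAX_WEIGHT, value.taskCount);
          }
        })
        .concurrencyLevel(1) // deterministic eviction, as in the patch above
        .build(new CacheLoader<String, FakeJob>() {
          @Override
          public FakeJob load(String key) {
            return new FakeJob(20); // pretend every job has 20 tasks
          }
        });

    for (int i = 0; i < 5; i++) {
      cache.getUnchecked("job" + i);
    }
    // 5 jobs x 20 tasks = 100 > 50, so Guava evicted older entries;
    // typically 2 entries (weight 40 <= 50) remain.
    System.out.println(cache.size());
  }
}

Entries are weighed once at load time, and the cache evicts until the total weight is back under maximumWeight, which is why the patch caps a single job's weight at the cache size: an oversized job would otherwise never fit.
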
@@ -101,51 +159,47 @@ public class CachedHistoryStorage extends AbstractService implements
     super(CachedHistoryStorage.class.getName());
   }

-  private Job loadJob(HistoryFileInfo fileInfo) {
-    try {
-      Job job = fileInfo.loadJob();
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Adding " + job.getID() + " to loaded job cache");
-      }
-      // We can clobber results here, but that should be OK, because it only
-      // means that we may have two identical copies of the same job floating
-      // around for a while.
-      loadedJobCache.put(job.getID(), job);
-      return job;
-    } catch (IOException e) {
-      throw new YarnRuntimeException(
-          "Could not find/load job: " + fileInfo.getJobId(), e);
+  private static class HSFileRuntimeException extends RuntimeException {
+    public HSFileRuntimeException(String message) {
+      super(message);
+    }
+  }
+
+  private Job loadJob(JobId jobId) throws RuntimeException, IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Looking for Job " + jobId);
+    }
+    HistoryFileInfo fileInfo;
+
+    fileInfo = hsManager.getFileInfo(jobId);
+
+    if (fileInfo == null) {
+      throw new HSFileRuntimeException("Unable to find job " + jobId);
+    } else if (fileInfo.isDeleted()) {
+      throw new HSFileRuntimeException("Cannot load deleted job " + jobId);
+    } else {
+      return fileInfo.loadJob();
     }
   }

   @VisibleForTesting
-  Map<JobId, Job> getLoadedJobCache() {
+  Cache<JobId, Job> getLoadedJobCache() {
     return loadedJobCache;
   }

   @Override
   public Job getFullJob(JobId jobId) {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Looking for Job " + jobId);
-    }
-    try {
-      HistoryFileInfo fileInfo = hsManager.getFileInfo(jobId);
-      Job result = null;
-      if (fileInfo != null) {
-        result = loadedJobCache.get(jobId);
-        if (result == null) {
-          result = loadJob(fileInfo);
-        } else if (fileInfo.isDeleted()) {
-          loadedJobCache.remove(jobId);
-          result = null;
-        }
-      } else {
-        loadedJobCache.remove(jobId);
-      }
-      return result;
-    } catch (IOException e) {
-      throw new YarnRuntimeException(e);
+    Job retVal = null;
+    try {
+      retVal = loadedJobCache.getUnchecked(jobId);
+    } catch (UncheckedExecutionException e) {
+      if (e.getCause() instanceof HSFileRuntimeException) {
+        LOG.error(e.getCause().getMessage());
+        return null;
+      } else {
+        throw new YarnRuntimeException(e.getCause());
+      }
     }
+    return retVal;
   }

   @Override

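The rewritten getFullJob() relies on Guava wrapping any exception thrown inside CacheLoader.load() in an UncheckedExecutionException when getUnchecked() is used. A minimal standalone sketch of that unwrap-and-decide pattern (toy loader and message, not Hadoop code):

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.util.concurrent.UncheckedExecutionException;

public class UncheckedWrappingSketch {
  public static void main(String[] args) {
    LoadingCache<String, String> cache = CacheBuilder.newBuilder()
        .build(new CacheLoader<String, String>() {
          @Override
          public String load(String key) {
            // A RuntimeException from load() is wrapped by getUnchecked().
            throw new IllegalStateException("Unable to find job " + key);
          }
        });

    try {
      cache.getUnchecked("job_1");
    } catch (UncheckedExecutionException e) {
      // Inspect getCause() to decide whether to swallow the failure
      // (return null) or rethrow, as the patched getFullJob() does
      // with HSFileRuntimeException versus other causes.
      System.out.println(e.getCause().getMessage());
    }
  }
}
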
@@ -243,4 +297,14 @@ public class CachedHistoryStorage extends AbstractService implements
     }
     return allJobs;
   }
+
+  @VisibleForTesting
+  public boolean getUseLoadedTasksCache() {
+    return useLoadedTasksCache;
+  }
+
+  @VisibleForTesting
+  public int getLoadedTasksCacheSize() {
+    return loadedTasksCacheSize;
+  }
 }

@@ -24,6 +24,7 @@ import java.io.IOException;
 import java.util.LinkedList;
 import java.util.List;

+import com.google.common.cache.Cache;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.FileStatus;

@@ -35,16 +36,27 @@ import org.apache.hadoop.mapreduce.v2.hs.HistoryFileManager.JobListCache;
 import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.JobsInfo;
 import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
 import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.junit.After;
 import org.junit.Test;
 import org.mockito.Mockito;

-import static org.junit.Assert.assertEquals;
-import static org.mockito.Mockito.*;
-import org.apache.hadoop.mapreduce.v2.app.job.Job;
-
+import static junit.framework.TestCase.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.apache.hadoop.mapreduce.v2.app.job.Job;

 public class TestJobHistory {

@@ -58,13 +70,15 @@ public class TestJobHistory {

     Configuration conf = new Configuration();
     // Set the cache size to 2
-    conf.set(JHAdminConfig.MR_HISTORY_LOADED_JOB_CACHE_SIZE, "2");
+    conf.setInt(JHAdminConfig.MR_HISTORY_LOADED_JOB_CACHE_SIZE, 2);
     jobHistory.init(conf);
     jobHistory.start();

     CachedHistoryStorage storage = spy((CachedHistoryStorage) jobHistory
         .getHistoryStorage());

+    assertFalse(storage.getUseLoadedTasksCache());
+
     Job[] jobs = new Job[3];
     JobId[] jobIds = new JobId[3];

@@ -84,14 +98,13 @@ public class TestJobHistory {
       storage.getFullJob(jobs[i].getID());
     }

-    Map<JobId, Job> jobCache = storage.getLoadedJobCache();
-    // job0 should have been purged since cache size is 2
-    assertFalse(jobCache.containsKey(jobs[0].getID()));
-    assertTrue(jobCache.containsKey(jobs[1].getID())
-        && jobCache.containsKey(jobs[2].getID()));
+    Cache<JobId, Job> jobCache = storage.getLoadedJobCache();
+    // Verify some jobs are stored in the cache. Hard to predict eviction
+    // in Guava version.
+    assertTrue(jobCache.size() > 0);

     // Setting cache size to 3
-    conf.set(JHAdminConfig.MR_HISTORY_LOADED_JOB_CACHE_SIZE, "3");
+    conf.setInt(JHAdminConfig.MR_HISTORY_LOADED_JOB_CACHE_SIZE, 3);
     doReturn(conf).when(storage).createConf();

     when(fileInfo.loadJob()).thenReturn(jobs[0]).thenReturn(jobs[1])

@@ -105,9 +118,223 @@ public class TestJobHistory {

     jobCache = storage.getLoadedJobCache();

-    // All three jobs should be in cache since its size is now 3
-    for (int i = 0; i < 3; i++) {
-      assertTrue(jobCache.containsKey(jobs[i].getID()));
-    }
+    // Verify some jobs are stored in the cache. Hard to predict eviction
+    // in Guava version.
+    assertTrue(jobCache.size() > 0);
+  }
+
+  @Test
+  public void testTasksCacheLimit() throws Exception {
+    HistoryFileManager historyManager = mock(HistoryFileManager.class);
+    jobHistory = spy(new JobHistory());
+    doReturn(historyManager).when(jobHistory).createHistoryFileManager();
+
+    Configuration conf = new Configuration();
+    // Set the cache threshold to 50 tasks
+    conf.setInt(JHAdminConfig.MR_HISTORY_LOADED_TASKS_CACHE_SIZE, 50);
+    jobHistory.init(conf);
+    jobHistory.start();
+
+    CachedHistoryStorage storage = spy((CachedHistoryStorage) jobHistory
+        .getHistoryStorage());
+
+    assertTrue(storage.getUseLoadedTasksCache());
+    assertEquals(storage.getLoadedTasksCacheSize(), 50);
+
+    // Create a bunch of smaller jobs (<< 50 tasks)
+    Job[] jobs = new Job[10];
+    JobId[] jobIds = new JobId[10];
+    for (int i = 0; i < jobs.length; i++) {
+      jobs[i] = mock(Job.class);
+      jobIds[i] = mock(JobId.class);
+      when(jobs[i].getID()).thenReturn(jobIds[i]);
+      when(jobs[i].getTotalMaps()).thenReturn(10);
+      when(jobs[i].getTotalReduces()).thenReturn(2);
+    }
+
+    // Create some large jobs that force task-based cache flushing
+    Job[] lgJobs = new Job[3];
+    JobId[] lgJobIds = new JobId[3];
+    for (int i = 0; i < lgJobs.length; i++) {
+      lgJobs[i] = mock(Job.class);
+      lgJobIds[i] = mock(JobId.class);
+      when(lgJobs[i].getID()).thenReturn(lgJobIds[i]);
+      when(lgJobs[i].getTotalMaps()).thenReturn(2000);
+      when(lgJobs[i].getTotalReduces()).thenReturn(10);
+    }
+
+    HistoryFileInfo fileInfo = mock(HistoryFileInfo.class);
+    when(historyManager.getFileInfo(any(JobId.class))).thenReturn(fileInfo);
+    when(fileInfo.loadJob()).thenReturn(jobs[0]).thenReturn(jobs[1])
+        .thenReturn(jobs[2]).thenReturn(jobs[3]).thenReturn(jobs[4])
+        .thenReturn(jobs[5]).thenReturn(jobs[6]).thenReturn(jobs[7])
+        .thenReturn(jobs[8]).thenReturn(jobs[9]).thenReturn(lgJobs[0])
+        .thenReturn(lgJobs[1]).thenReturn(lgJobs[2]);
+
+    // getFullJob will put the job in the cache if it isn't there
+    Cache<JobId, Job> jobCache = storage.getLoadedJobCache();
+    for (int i = 0; i < jobs.length; i++) {
+      storage.getFullJob(jobs[i].getID());
+    }
+    long prevSize = jobCache.size();
+
+    // Fill the cache with some larger jobs and verify the cache
+    // gets reduced in size.
+    for (int i = 0; i < lgJobs.length; i++) {
+      storage.getFullJob(lgJobs[i].getID());
+    }
+    assertTrue(jobCache.size() < prevSize);
+  }
+
+  @Test
+  public void testJobCacheLimitLargerThanMax() throws Exception {
+    HistoryFileManager historyManager = mock(HistoryFileManager.class);
+    JobHistory jobHistory = spy(new JobHistory());
+    doReturn(historyManager).when(jobHistory).createHistoryFileManager();
+
+    Configuration conf = new Configuration();
+    // Set the cache threshold to 500 tasks
+    conf.setInt(JHAdminConfig.MR_HISTORY_LOADED_TASKS_CACHE_SIZE, 500);
+    jobHistory.init(conf);
+    jobHistory.start();
+
+    CachedHistoryStorage storage = spy((CachedHistoryStorage) jobHistory
+        .getHistoryStorage());
+
+    assertTrue(storage.getUseLoadedTasksCache());
+    assertEquals(storage.getLoadedTasksCacheSize(), 500);
+
+    // Create a bunch of large jobs (each larger than the 500-task cache)
+    Job[] lgJobs = new Job[10];
+    JobId[] lgJobIds = new JobId[10];
+    for (int i = 0; i < lgJobs.length; i++) {
+      lgJobs[i] = mock(Job.class);
+      lgJobIds[i] = mock(JobId.class);
+      when(lgJobs[i].getID()).thenReturn(lgJobIds[i]);
+      when(lgJobs[i].getTotalMaps()).thenReturn(700);
+      when(lgJobs[i].getTotalReduces()).thenReturn(50);
+    }
+
+    HistoryFileInfo fileInfo = mock(HistoryFileInfo.class);
+    when(historyManager.getFileInfo(any(JobId.class))).thenReturn(fileInfo);
+    when(fileInfo.loadJob()).thenReturn(lgJobs[0]).thenReturn(lgJobs[1])
+        .thenReturn(lgJobs[2]).thenReturn(lgJobs[3]).thenReturn(lgJobs[4])
+        .thenReturn(lgJobs[5]).thenReturn(lgJobs[6]).thenReturn(lgJobs[7])
+        .thenReturn(lgJobs[8]).thenReturn(lgJobs[9]);
+
+    // getFullJob will put the job in the cache if it isn't there.
+    // Each job exceeds the weight limit, but the cache must always
+    // hold at least one job.
+    Cache<JobId, Job> jobCache = storage.getLoadedJobCache();
+    for (int i = 0; i < lgJobs.length; i++) {
+      storage.getFullJob(lgJobs[i].getID());
+      assertTrue(jobCache.size() > 0);
+    }
+  }
+
+  @Test
+  public void testLoadedTasksEmptyConfiguration() {
+    Configuration conf = new Configuration();
+    conf.set(JHAdminConfig.MR_HISTORY_LOADED_TASKS_CACHE_SIZE, "");
+
+    HistoryFileManager historyManager = mock(HistoryFileManager.class);
+    JobHistory jobHistory = spy(new JobHistory());
+    doReturn(historyManager).when(jobHistory).createHistoryFileManager();
+    jobHistory.init(conf);
+    jobHistory.start();
+
+    CachedHistoryStorage storage = spy((CachedHistoryStorage) jobHistory
+        .getHistoryStorage());
+
+    assertFalse(storage.getUseLoadedTasksCache());
+  }
+
+  @Test
+  public void testLoadedTasksZeroConfiguration() {
+    Configuration conf = new Configuration();
+    conf.setInt(JHAdminConfig.MR_HISTORY_LOADED_TASKS_CACHE_SIZE, 0);
+
+    HistoryFileManager historyManager = mock(HistoryFileManager.class);
+    JobHistory jobHistory = spy(new JobHistory());
+    doReturn(historyManager).when(jobHistory).createHistoryFileManager();
+    jobHistory.init(conf);
+    jobHistory.start();
+
+    CachedHistoryStorage storage = spy((CachedHistoryStorage) jobHistory
+        .getHistoryStorage());
+
+    assertTrue(storage.getUseLoadedTasksCache());
+    assertEquals(storage.getLoadedTasksCacheSize(), 1);
+  }
+
+  @Test
+  public void testLoadedTasksNegativeConfiguration() {
+    Configuration conf = new Configuration();
+    conf.setInt(JHAdminConfig.MR_HISTORY_LOADED_TASKS_CACHE_SIZE, -1);
+
+    HistoryFileManager historyManager = mock(HistoryFileManager.class);
+    JobHistory jobHistory = spy(new JobHistory());
+    doReturn(historyManager).when(jobHistory).createHistoryFileManager();
+    jobHistory.init(conf);
+    jobHistory.start();
+
+    CachedHistoryStorage storage = spy((CachedHistoryStorage) jobHistory
+        .getHistoryStorage());
+
+    assertTrue(storage.getUseLoadedTasksCache());
+    assertEquals(storage.getLoadedTasksCacheSize(), 1);
+  }
+
+  @Test
+  public void testLoadJobErrorCases() throws IOException {
+    HistoryFileManager historyManager = mock(HistoryFileManager.class);
+    jobHistory = spy(new JobHistory());
+    doReturn(historyManager).when(jobHistory).createHistoryFileManager();
+
+    Configuration conf = new Configuration();
+    // Set the cache threshold to 50 tasks
+    conf.setInt(JHAdminConfig.MR_HISTORY_LOADED_TASKS_CACHE_SIZE, 50);
+    jobHistory.init(conf);
+    jobHistory.start();
+
+    CachedHistoryStorage storage = spy((CachedHistoryStorage) jobHistory
+        .getHistoryStorage());
+
+    assertTrue(storage.getUseLoadedTasksCache());
+    assertEquals(storage.getLoadedTasksCacheSize(), 50);
+
+    // Create jobs for bad fileInfo results
+    Job[] jobs = new Job[4];
+    JobId[] jobIds = new JobId[4];
+    for (int i = 0; i < jobs.length; i++) {
+      jobs[i] = mock(Job.class);
+      jobIds[i] = mock(JobId.class);
+      when(jobs[i].getID()).thenReturn(jobIds[i]);
+      when(jobs[i].getTotalMaps()).thenReturn(10);
+      when(jobs[i].getTotalReduces()).thenReturn(2);
+    }
+
+    HistoryFileInfo loadJobException = mock(HistoryFileInfo.class);
+    when(loadJobException.loadJob()).thenThrow(
+        new IOException("History file not found"));
+    when(historyManager.getFileInfo(jobIds[0])).thenThrow(new IOException(""));
+    when(historyManager.getFileInfo(jobIds[1])).thenReturn(null);
+    when(historyManager.getFileInfo(jobIds[2])).thenReturn(loadJobException);
+
+    try {
+      storage.getFullJob(jobIds[0]);
+      fail("Did not get expected YarnRuntimeException for getFileInfo()"
+          + " throwing IOException");
+    } catch (YarnRuntimeException e) {
+      // Expected
+    }
+
+    // fileInfo==null should return null
+    Job job = storage.getFullJob(jobIds[1]);
+    assertNull(job);
+
+    try {
+      storage.getFullJob(jobIds[2]);
+      fail("Did not get expected YarnRuntimeException for fileInfo.loadJob()"
+          + " throwing IOException");
+    } catch (YarnRuntimeException e) {
+      // Expected
+    }
   }
 }