svn merge -c 1508220 FIXES: MAPREDUCE-5411. Refresh size of loaded job cache on history server. Contributed by Ashwin Shankar
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1508224 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
7b38377e2b
commit
3bd5174a8e
|
@ -15,6 +15,9 @@ Release 2.3.0 - UNRELEASED
|
|||
MAPREDUCE-5386. Ability to refresh history server job retention and job
|
||||
cleaner settings (Ashwin Shankar via jlowe)
|
||||
|
||||
MAPREDUCE-5411. Refresh size of loaded job cache on history server (Ashwin
|
||||
Shankar via jlowe)
|
||||
|
||||
IMPROVEMENTS
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
|
|
@ -40,6 +40,8 @@ import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
|
|||
import org.apache.hadoop.service.AbstractService;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
/**
|
||||
* Manages an in memory cache of parsed Job History files.
|
||||
*/
|
||||
|
@ -58,12 +60,16 @@ public class CachedHistoryStorage extends AbstractService implements
|
|||
this.hsManager = hsManager;
|
||||
}
|
||||
|
||||
@SuppressWarnings("serial")
|
||||
@Override
|
||||
public void serviceInit(Configuration conf) throws Exception {
|
||||
super.serviceInit(conf);
|
||||
LOG.info("CachedHistoryStorage Init");
|
||||
|
||||
createLoadedJobCache(conf);
|
||||
}
|
||||
|
||||
@SuppressWarnings("serial")
|
||||
private void createLoadedJobCache(Configuration conf) {
|
||||
loadedJobCacheSize = conf.getInt(
|
||||
JHAdminConfig.MR_HISTORY_LOADED_JOB_CACHE_SIZE,
|
||||
JHAdminConfig.DEFAULT_MR_HISTORY_LOADED_JOB_CACHE_SIZE);
|
||||
|
@ -76,11 +82,25 @@ public class CachedHistoryStorage extends AbstractService implements
|
|||
}
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
public void refreshLoadedJobCache() {
|
||||
if (getServiceState() == STATE.STARTED) {
|
||||
setConfig(createConf());
|
||||
createLoadedJobCache(getConfig());
|
||||
} else {
|
||||
LOG.warn("Failed to execute refreshLoadedJobCache: CachedHistoryStorage is not started");
|
||||
}
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
Configuration createConf() {
|
||||
return new Configuration();
|
||||
}
|
||||
|
||||
public CachedHistoryStorage() {
|
||||
super(CachedHistoryStorage.class.getName());
|
||||
}
|
||||
|
||||
|
||||
private Job loadJob(HistoryFileInfo fileInfo) {
|
||||
try {
|
||||
Job job = fileInfo.loadJob();
|
||||
|
@ -98,6 +118,11 @@ public class CachedHistoryStorage extends AbstractService implements
|
|||
}
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
Map<JobId, Job> getLoadedJobCache() {
|
||||
return loadedJobCache;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Job getFullJob(JobId jobId) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
|
|
|
@ -51,6 +51,7 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
|||
import org.apache.hadoop.yarn.security.client.ClientToAMTokenSecretManager;
|
||||
import org.apache.hadoop.yarn.util.Clock;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||
|
||||
/**
|
||||
|
@ -97,9 +98,8 @@ public class JobHistory extends AbstractService implements HistoryContext {
|
|||
throw new YarnRuntimeException("Failed to intialize existing directories", e);
|
||||
}
|
||||
|
||||
storage = ReflectionUtils.newInstance(conf.getClass(
|
||||
JHAdminConfig.MR_HISTORY_STORAGE, CachedHistoryStorage.class,
|
||||
HistoryStorage.class), conf);
|
||||
storage = createHistoryStorage();
|
||||
|
||||
if (storage instanceof Service) {
|
||||
((Service) storage).init(conf);
|
||||
}
|
||||
|
@ -108,6 +108,12 @@ public class JobHistory extends AbstractService implements HistoryContext {
|
|||
super.serviceInit(conf);
|
||||
}
|
||||
|
||||
protected HistoryStorage createHistoryStorage() {
|
||||
return ReflectionUtils.newInstance(conf.getClass(
|
||||
JHAdminConfig.MR_HISTORY_STORAGE, CachedHistoryStorage.class,
|
||||
HistoryStorage.class), conf);
|
||||
}
|
||||
|
||||
protected HistoryFileManager createHistoryFileManager() {
|
||||
return new HistoryFileManager();
|
||||
}
|
||||
|
@ -229,6 +235,25 @@ public class JobHistory extends AbstractService implements HistoryContext {
|
|||
return storage.getAllPartialJobs();
|
||||
}
|
||||
|
||||
public void refreshLoadedJobCache() {
|
||||
if (getServiceState() == STATE.STARTED) {
|
||||
if (storage instanceof CachedHistoryStorage) {
|
||||
((CachedHistoryStorage) storage).refreshLoadedJobCache();
|
||||
} else {
|
||||
throw new UnsupportedOperationException(storage.getClass().getName()
|
||||
+ " is expected to be an instance of "
|
||||
+ CachedHistoryStorage.class.getName());
|
||||
}
|
||||
} else {
|
||||
LOG.warn("Failed to execute refreshLoadedJobCache: JobHistory service is not started");
|
||||
}
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
HistoryStorage getHistoryStorage() {
|
||||
return storage;
|
||||
}
|
||||
|
||||
/**
|
||||
* Look for a set of partial jobs.
|
||||
*
|
||||
|
|
|
@ -60,6 +60,8 @@ public class HSAdmin extends Configured implements Tool {
|
|||
.println("Usage: mapred hsadmin [-refreshSuperUserGroupsConfiguration]");
|
||||
} else if ("-refreshAdminAcls".equals(cmd)) {
|
||||
System.err.println("Usage: mapred hsadmin [-refreshAdminAcls]");
|
||||
} else if ("-refreshLoadedJobCache".equals(cmd)) {
|
||||
System.err.println("Usage: mapred hsadmin [-refreshLoadedJobCache]");
|
||||
} else if ("-refreshJobRetentionSettings".equals(cmd)) {
|
||||
System.err
|
||||
.println("Usage: mapred hsadmin [-refreshJobRetentionSettings]");
|
||||
|
@ -73,6 +75,7 @@ public class HSAdmin extends Configured implements Tool {
|
|||
System.err.println(" [-refreshUserToGroupsMappings]");
|
||||
System.err.println(" [-refreshSuperUserGroupsConfiguration]");
|
||||
System.err.println(" [-refreshAdminAcls]");
|
||||
System.err.println(" [-refreshLoadedJobCache]");
|
||||
System.err.println(" [-refreshJobRetentionSettings]");
|
||||
System.err.println(" [-refreshLogRetentionSettings]");
|
||||
System.err.println(" [-getGroups [username]]");
|
||||
|
@ -89,6 +92,7 @@ public class HSAdmin extends Configured implements Tool {
|
|||
+ " [-refreshUserToGroupsMappings]"
|
||||
+ " [-refreshSuperUserGroupsConfiguration]"
|
||||
+ " [-refreshAdminAcls]"
|
||||
+ " [-refreshLoadedJobCache]"
|
||||
+ " [-refreshLogRetentionSettings]"
|
||||
+ " [-refreshJobRetentionSettings]"
|
||||
+ " [-getGroups [username]]" + " [-help [cmd]]\n";
|
||||
|
@ -99,13 +103,14 @@ public class HSAdmin extends Configured implements Tool {
|
|||
|
||||
String refreshAdminAcls = "-refreshAdminAcls: Refresh acls for administration of Job history server\n";
|
||||
|
||||
String refreshLoadedJobCache = "-refreshLoadedJobCache: Refresh loaded job cache of Job history server\n";
|
||||
|
||||
String refreshJobRetentionSettings = "-refreshJobRetentionSettings:" +
|
||||
"Refresh job history period,job cleaner settings\n";
|
||||
|
||||
String refreshLogRetentionSettings = "-refreshLogRetentionSettings:" +
|
||||
"Refresh log retention period and log retention check interval\n";
|
||||
|
||||
|
||||
String getGroups = "-getGroups [username]: Get the groups which given user belongs to\n";
|
||||
|
||||
String help = "-help [cmd]: \tDisplays help for the given command or all commands if none\n"
|
||||
|
@ -119,6 +124,8 @@ public class HSAdmin extends Configured implements Tool {
|
|||
System.out.println(refreshSuperUserGroupsConfiguration);
|
||||
} else if ("refreshAdminAcls".equals(cmd)) {
|
||||
System.out.println(refreshAdminAcls);
|
||||
} else if ("refreshLoadedJobCache".equals(cmd)) {
|
||||
System.out.println(refreshLoadedJobCache);
|
||||
} else if ("refreshJobRetentionSettings".equals(cmd)) {
|
||||
System.out.println(refreshJobRetentionSettings);
|
||||
} else if ("refreshLogRetentionSettings".equals(cmd)) {
|
||||
|
@ -130,6 +137,7 @@ public class HSAdmin extends Configured implements Tool {
|
|||
System.out.println(refreshUserToGroupsMappings);
|
||||
System.out.println(refreshSuperUserGroupsConfiguration);
|
||||
System.out.println(refreshAdminAcls);
|
||||
System.out.println(refreshLoadedJobCache);
|
||||
System.out.println(refreshJobRetentionSettings);
|
||||
System.out.println(refreshLogRetentionSettings);
|
||||
System.out.println(getGroups);
|
||||
|
@ -221,6 +229,22 @@ public class HSAdmin extends Configured implements Tool {
|
|||
return 0;
|
||||
}
|
||||
|
||||
private int refreshLoadedJobCache() throws IOException {
|
||||
// Refresh the loaded job cache
|
||||
Configuration conf = getConf();
|
||||
InetSocketAddress address = conf.getSocketAddr(
|
||||
JHAdminConfig.JHS_ADMIN_ADDRESS,
|
||||
JHAdminConfig.DEFAULT_JHS_ADMIN_ADDRESS,
|
||||
JHAdminConfig.DEFAULT_JHS_ADMIN_PORT);
|
||||
|
||||
HSAdminRefreshProtocol refreshProtocol = HSProxies.createProxy(conf,
|
||||
address, HSAdminRefreshProtocol.class,
|
||||
UserGroupInformation.getCurrentUser());
|
||||
|
||||
refreshProtocol.refreshLoadedJobCache();
|
||||
return 0;
|
||||
}
|
||||
|
||||
private int refreshJobRetentionSettings() throws IOException {
|
||||
// Refresh job retention settings
|
||||
Configuration conf = getConf();
|
||||
|
@ -267,6 +291,7 @@ public class HSAdmin extends Configured implements Tool {
|
|||
if ("-refreshUserToGroupsMappings".equals(cmd)
|
||||
|| "-refreshSuperUserGroupsConfiguration".equals(cmd)
|
||||
|| "-refreshAdminAcls".equals(cmd)
|
||||
|| "-refreshLoadedJobCache".equals(cmd)
|
||||
|| "-refreshJobRetentionSettings".equals(cmd)
|
||||
|| "-refreshLogRetentionSettings".equals(cmd)) {
|
||||
if (args.length != 1) {
|
||||
|
@ -282,6 +307,8 @@ public class HSAdmin extends Configured implements Tool {
|
|||
exitCode = refreshSuperUserGroupsConfiguration();
|
||||
} else if ("-refreshAdminAcls".equals(cmd)) {
|
||||
exitCode = refreshAdminAcls();
|
||||
} else if ("-refreshLoadedJobCache".equals(cmd)) {
|
||||
exitCode = refreshLoadedJobCache();
|
||||
} else if ("-refreshJobRetentionSettings".equals(cmd)) {
|
||||
exitCode = refreshJobRetentionSettings();
|
||||
} else if ("-refreshLogRetentionSettings".equals(cmd)) {
|
||||
|
|
|
@ -40,6 +40,12 @@ public interface HSAdminRefreshProtocol {
|
|||
*/
|
||||
public void refreshAdminAcls() throws IOException;
|
||||
|
||||
/**
|
||||
* Refresh loaded job cache
|
||||
* @throws IOException
|
||||
*/
|
||||
public void refreshLoadedJobCache() throws IOException;
|
||||
|
||||
/**
|
||||
* Refresh job retention settings.
|
||||
*
|
||||
|
|
|
@ -27,6 +27,7 @@ import org.apache.hadoop.ipc.ProtocolMetaInterface;
|
|||
import org.apache.hadoop.ipc.RPC;
|
||||
import org.apache.hadoop.ipc.RpcClientUtil;
|
||||
import org.apache.hadoop.mapreduce.v2.hs.proto.HSAdminRefreshProtocolProtos.RefreshAdminAclsRequestProto;
|
||||
import org.apache.hadoop.mapreduce.v2.hs.proto.HSAdminRefreshProtocolProtos.RefreshLoadedJobCacheRequestProto;
|
||||
import org.apache.hadoop.mapreduce.v2.hs.proto.HSAdminRefreshProtocolProtos.RefreshJobRetentionSettingsRequestProto;
|
||||
import org.apache.hadoop.mapreduce.v2.hs.proto.HSAdminRefreshProtocolProtos.RefreshLogRetentionSettingsRequestProto;
|
||||
import org.apache.hadoop.mapreduce.v2.hs.protocol.HSAdminRefreshProtocol;
|
||||
|
@ -46,6 +47,10 @@ public class HSAdminRefreshProtocolClientSideTranslatorPB implements
|
|||
private final static RefreshAdminAclsRequestProto
|
||||
VOID_REFRESH_ADMIN_ACLS_REQUEST = RefreshAdminAclsRequestProto
|
||||
.newBuilder().build();
|
||||
|
||||
private final static RefreshLoadedJobCacheRequestProto
|
||||
VOID_REFRESH_LOADED_JOB_CACHE_REQUEST = RefreshLoadedJobCacheRequestProto
|
||||
.newBuilder().build();
|
||||
|
||||
private final static RefreshJobRetentionSettingsRequestProto
|
||||
VOID_REFRESH_JOB_RETENTION_SETTINGS_REQUEST =
|
||||
|
@ -75,6 +80,17 @@ public class HSAdminRefreshProtocolClientSideTranslatorPB implements
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void refreshLoadedJobCache() throws IOException {
|
||||
try {
|
||||
rpcProxy.refreshLoadedJobCache(NULL_CONTROLLER,
|
||||
VOID_REFRESH_LOADED_JOB_CACHE_REQUEST);
|
||||
} catch (ServiceException se) {
|
||||
throw ProtobufHelper.getRemoteException(se);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void refreshJobRetentionSettings() throws IOException {
|
||||
try {
|
||||
|
@ -101,4 +117,5 @@ public class HSAdminRefreshProtocolClientSideTranslatorPB implements
|
|||
HSAdminRefreshProtocolPB.class, RPC.RpcKind.RPC_PROTOCOL_BUFFER,
|
||||
RPC.getProtocolVersion(HSAdminRefreshProtocolPB.class), methodName);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -23,6 +23,8 @@ import java.io.IOException;
|
|||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.mapreduce.v2.hs.proto.HSAdminRefreshProtocolProtos.RefreshAdminAclsResponseProto;
|
||||
import org.apache.hadoop.mapreduce.v2.hs.proto.HSAdminRefreshProtocolProtos.RefreshAdminAclsRequestProto;
|
||||
import org.apache.hadoop.mapreduce.v2.hs.proto.HSAdminRefreshProtocolProtos.RefreshLoadedJobCacheRequestProto;
|
||||
import org.apache.hadoop.mapreduce.v2.hs.proto.HSAdminRefreshProtocolProtos.RefreshLoadedJobCacheResponseProto;
|
||||
import org.apache.hadoop.mapreduce.v2.hs.proto.HSAdminRefreshProtocolProtos.RefreshJobRetentionSettingsRequestProto;
|
||||
import org.apache.hadoop.mapreduce.v2.hs.proto.HSAdminRefreshProtocolProtos.RefreshJobRetentionSettingsResponseProto;
|
||||
import org.apache.hadoop.mapreduce.v2.hs.proto.HSAdminRefreshProtocolProtos.RefreshLogRetentionSettingsRequestProto;
|
||||
|
@ -41,11 +43,15 @@ public class HSAdminRefreshProtocolServerSideTranslatorPB implements
|
|||
private final static RefreshAdminAclsResponseProto
|
||||
VOID_REFRESH_ADMIN_ACLS_RESPONSE = RefreshAdminAclsResponseProto
|
||||
.newBuilder().build();
|
||||
|
||||
|
||||
private final static RefreshLoadedJobCacheResponseProto
|
||||
VOID_REFRESH_LOADED_JOB_CACHE_RESPONSE = RefreshLoadedJobCacheResponseProto
|
||||
.newBuilder().build();
|
||||
|
||||
private final static RefreshJobRetentionSettingsResponseProto
|
||||
VOID_REFRESH_JOB_RETENTION_SETTINGS_RESPONSE =
|
||||
RefreshJobRetentionSettingsResponseProto.newBuilder().build();
|
||||
|
||||
|
||||
private final static RefreshLogRetentionSettingsResponseProto
|
||||
VOID_REFRESH_LOG_RETENTION_SETTINGS_RESPONSE =
|
||||
RefreshLogRetentionSettingsResponseProto.newBuilder().build();
|
||||
|
@ -67,6 +73,18 @@ public class HSAdminRefreshProtocolServerSideTranslatorPB implements
|
|||
return VOID_REFRESH_ADMIN_ACLS_RESPONSE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public RefreshLoadedJobCacheResponseProto refreshLoadedJobCache(
|
||||
RpcController controller, RefreshLoadedJobCacheRequestProto request)
|
||||
throws ServiceException {
|
||||
try {
|
||||
impl.refreshLoadedJobCache();
|
||||
} catch (IOException e) {
|
||||
throw new ServiceException(e);
|
||||
}
|
||||
return VOID_REFRESH_LOADED_JOB_CACHE_RESPONSE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public RefreshJobRetentionSettingsResponseProto refreshJobRetentionSettings(
|
||||
RpcController controller,
|
||||
|
|
|
@ -202,6 +202,22 @@ public class HSAdminServer extends AbstractService implements HSAdminProtocol {
|
|||
HISTORY_ADMIN_SERVER);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void refreshLoadedJobCache() throws IOException {
|
||||
UserGroupInformation user = checkAcls("refreshLoadedJobCache");
|
||||
|
||||
try {
|
||||
jobHistoryService.refreshLoadedJobCache();
|
||||
} catch (UnsupportedOperationException e) {
|
||||
HSAuditLogger.logFailure(user.getShortUserName(),
|
||||
"refreshLoadedJobCache", adminAcl.toString(), HISTORY_ADMIN_SERVER,
|
||||
e.getMessage());
|
||||
throw e;
|
||||
}
|
||||
HSAuditLogger.logSuccess(user.getShortUserName(), "refreshLoadedJobCache",
|
||||
HISTORY_ADMIN_SERVER);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void refreshLogRetentionSettings() throws IOException {
|
||||
UserGroupInformation user = checkAcls("refreshLogRetentionSettings");
|
||||
|
|
|
@ -33,6 +33,18 @@ message RefreshAdminAclsRequestProto {
|
|||
message RefreshAdminAclsResponseProto {
|
||||
}
|
||||
|
||||
/**
|
||||
* refresh loaded job cache request.
|
||||
*/
|
||||
message RefreshLoadedJobCacheRequestProto {
|
||||
}
|
||||
|
||||
/**
|
||||
* Response for refresh loaded job cache.
|
||||
*/
|
||||
message RefreshLoadedJobCacheResponseProto {
|
||||
}
|
||||
|
||||
/**
|
||||
* refresh job retention settings request.
|
||||
*/
|
||||
|
@ -66,6 +78,12 @@ service HSAdminRefreshProtocolService {
|
|||
*/
|
||||
rpc refreshAdminAcls(RefreshAdminAclsRequestProto)
|
||||
returns(RefreshAdminAclsResponseProto);
|
||||
|
||||
/**
|
||||
* Refresh loaded job cache
|
||||
*/
|
||||
rpc refreshLoadedJobCache(RefreshLoadedJobCacheRequestProto)
|
||||
returns(RefreshLoadedJobCacheResponseProto);
|
||||
|
||||
/**
|
||||
* Refresh job retention.
|
||||
|
|
|
@ -18,6 +18,8 @@
|
|||
|
||||
package org.apache.hadoop.mapreduce.v2.hs;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
|
@ -27,8 +29,10 @@ import org.apache.hadoop.fs.FileContext;
|
|||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
|
||||
import org.apache.hadoop.mapreduce.v2.hs.HistoryFileManager.HistoryFileInfo;
|
||||
import org.apache.hadoop.mapreduce.v2.hs.HistoryFileManager.JobListCache;
|
||||
import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.JobsInfo;
|
||||
import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
|
||||
import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
|
||||
import org.junit.After;
|
||||
|
@ -36,11 +40,76 @@ import org.junit.Test;
|
|||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.mockito.Mockito.*;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.Job;
|
||||
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
public class TestJobHistory {
|
||||
|
||||
JobHistory jobHistory = null;
|
||||
|
||||
@Test
|
||||
public void testRefreshLoadedJobCache() throws Exception {
|
||||
HistoryFileManager historyManager = mock(HistoryFileManager.class);
|
||||
jobHistory = spy(new JobHistory());
|
||||
doReturn(historyManager).when(jobHistory).createHistoryFileManager();
|
||||
|
||||
Configuration conf = new Configuration();
|
||||
// Set the cache size to 2
|
||||
conf.set(JHAdminConfig.MR_HISTORY_LOADED_JOB_CACHE_SIZE, "2");
|
||||
jobHistory.init(conf);
|
||||
jobHistory.start();
|
||||
|
||||
CachedHistoryStorage storage = spy((CachedHistoryStorage) jobHistory
|
||||
.getHistoryStorage());
|
||||
|
||||
Job[] jobs = new Job[3];
|
||||
JobId[] jobIds = new JobId[3];
|
||||
|
||||
for (int i = 0; i < 3; i++) {
|
||||
jobs[i] = mock(Job.class);
|
||||
jobIds[i] = mock(JobId.class);
|
||||
when(jobs[i].getID()).thenReturn(jobIds[i]);
|
||||
}
|
||||
|
||||
HistoryFileInfo fileInfo = mock(HistoryFileInfo.class);
|
||||
when(historyManager.getFileInfo(any(JobId.class))).thenReturn(fileInfo);
|
||||
when(fileInfo.loadJob()).thenReturn(jobs[0]).thenReturn(jobs[1])
|
||||
.thenReturn(jobs[2]);
|
||||
|
||||
// getFullJob will put the job in the cache if it isn't there
|
||||
for (int i = 0; i < 3; i++) {
|
||||
storage.getFullJob(jobs[i].getID());
|
||||
}
|
||||
|
||||
Map<JobId, Job> jobCache = storage.getLoadedJobCache();
|
||||
// job0 should have been purged since cache size is 2
|
||||
assertFalse(jobCache.containsKey(jobs[0].getID()));
|
||||
assertTrue(jobCache.containsKey(jobs[1].getID())
|
||||
&& jobCache.containsKey(jobs[2].getID()));
|
||||
|
||||
// Setting cache size to 3
|
||||
conf.set(JHAdminConfig.MR_HISTORY_LOADED_JOB_CACHE_SIZE, "3");
|
||||
doReturn(conf).when(storage).createConf();
|
||||
|
||||
when(fileInfo.loadJob()).thenReturn(jobs[0]).thenReturn(jobs[1])
|
||||
.thenReturn(jobs[2]);
|
||||
|
||||
jobHistory.refreshLoadedJobCache();
|
||||
|
||||
for (int i = 0; i < 3; i++) {
|
||||
storage.getFullJob(jobs[i].getID());
|
||||
}
|
||||
|
||||
jobCache = storage.getLoadedJobCache();
|
||||
|
||||
// All three jobs should be in cache since its size is now 3
|
||||
for (int i = 0; i < 3; i++) {
|
||||
assertTrue(jobCache.containsKey(jobs[i].getID()));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRefreshJobRetentionSettings() throws IOException,
|
||||
InterruptedException {
|
||||
|
@ -147,6 +216,54 @@ public class TestJobHistory {
|
|||
verify(fileInfo, timeout(20000).times(2)).delete();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRefreshLoadedJobCacheUnSupportedOperation() {
|
||||
jobHistory = spy(new JobHistory());
|
||||
HistoryStorage storage = new HistoryStorage() {
|
||||
|
||||
@Override
|
||||
public void setHistoryFileManager(HistoryFileManager hsManager) {
|
||||
// TODO Auto-generated method stub
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public JobsInfo getPartialJobs(Long offset, Long count, String user,
|
||||
String queue, Long sBegin, Long sEnd, Long fBegin, Long fEnd,
|
||||
JobState jobState) {
|
||||
// TODO Auto-generated method stub
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Job getFullJob(JobId jobId) {
|
||||
// TODO Auto-generated method stub
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<JobId, Job> getAllPartialJobs() {
|
||||
// TODO Auto-generated method stub
|
||||
return null;
|
||||
}
|
||||
};
|
||||
|
||||
doReturn(storage).when(jobHistory).createHistoryStorage();
|
||||
|
||||
jobHistory.init(new Configuration());
|
||||
|
||||
jobHistory.start();
|
||||
|
||||
Throwable th = null;
|
||||
try {
|
||||
jobHistory.refreshLoadedJobCache();
|
||||
} catch (Exception e) {
|
||||
th = e;
|
||||
}
|
||||
|
||||
assertTrue(th instanceof UnsupportedOperationException);
|
||||
}
|
||||
|
||||
@After
|
||||
public void cleanUp() {
|
||||
if (jobHistory != null) {
|
||||
|
|
|
@ -241,6 +241,14 @@ public class TestHSAdminServer {
|
|||
assertTrue(th instanceof RemoteException);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRefreshLoadedJobCache() throws Exception {
|
||||
String[] args = new String[1];
|
||||
args[0] = "-refreshLoadedJobCache";
|
||||
hsAdminClient.run(args);
|
||||
verify(jobHistoryService).refreshLoadedJobCache();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRefreshLogRetentionSettings() throws Exception {
|
||||
String[] args = new String[1];
|
||||
|
|
Loading…
Reference in New Issue