From 3bd5174a8e0f873d3ed77b911d1e3f58fba73341 Mon Sep 17 00:00:00 2001 From: Jason Darrell Lowe Date: Mon, 29 Jul 2013 22:48:00 +0000 Subject: [PATCH] svn merge -c 1508220 FIXES: MAPREDUCE-5411. Refresh size of loaded job cache on history server. Contributed by Ashwin Shankar git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1508224 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 3 + .../mapreduce/v2/hs/CachedHistoryStorage.java | 31 ++++- .../hadoop/mapreduce/v2/hs/JobHistory.java | 31 ++++- .../mapreduce/v2/hs/client/HSAdmin.java | 29 ++++- .../hs/protocol/HSAdminRefreshProtocol.java | 6 + ...RefreshProtocolClientSideTranslatorPB.java | 17 +++ ...RefreshProtocolServerSideTranslatorPB.java | 22 +++- .../mapreduce/v2/hs/server/HSAdminServer.java | 16 +++ .../main/proto/HSAdminRefreshProtocol.proto | 18 +++ .../mapreduce/v2/hs/TestJobHistory.java | 117 ++++++++++++++++++ .../v2/hs/server/TestHSAdminServer.java | 8 ++ 11 files changed, 289 insertions(+), 9 deletions(-) diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index e91e0a01745..ec01f35077c 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -15,6 +15,9 @@ Release 2.3.0 - UNRELEASED MAPREDUCE-5386. Ability to refresh history server job retention and job cleaner settings (Ashwin Shankar via jlowe) + MAPREDUCE-5411. Refresh size of loaded job cache on history server (Ashwin + Shankar via jlowe) + IMPROVEMENTS OPTIMIZATIONS diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CachedHistoryStorage.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CachedHistoryStorage.java index 57c02498a93..8c9abc3b0e1 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CachedHistoryStorage.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CachedHistoryStorage.java @@ -40,6 +40,8 @@ import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import com.google.common.annotations.VisibleForTesting; + /** * Manages an in memory cache of parsed Job History files. */ @@ -58,12 +60,16 @@ public class CachedHistoryStorage extends AbstractService implements this.hsManager = hsManager; } - @SuppressWarnings("serial") @Override public void serviceInit(Configuration conf) throws Exception { super.serviceInit(conf); LOG.info("CachedHistoryStorage Init"); + createLoadedJobCache(conf); + } + + @SuppressWarnings("serial") + private void createLoadedJobCache(Configuration conf) { loadedJobCacheSize = conf.getInt( JHAdminConfig.MR_HISTORY_LOADED_JOB_CACHE_SIZE, JHAdminConfig.DEFAULT_MR_HISTORY_LOADED_JOB_CACHE_SIZE); @@ -76,11 +82,25 @@ public class CachedHistoryStorage extends AbstractService implements } }); } - + + public void refreshLoadedJobCache() { + if (getServiceState() == STATE.STARTED) { + setConfig(createConf()); + createLoadedJobCache(getConfig()); + } else { + LOG.warn("Failed to execute refreshLoadedJobCache: CachedHistoryStorage is not started"); + } + } + + @VisibleForTesting + Configuration createConf() { + return new Configuration(); + } + public CachedHistoryStorage() { super(CachedHistoryStorage.class.getName()); } - + private Job loadJob(HistoryFileInfo fileInfo) { try { Job job = fileInfo.loadJob(); @@ -98,6 +118,11 @@ public class CachedHistoryStorage extends AbstractService implements } } + @VisibleForTesting + Map getLoadedJobCache() { + return loadedJobCache; + } + @Override public Job getFullJob(JobId jobId) { if (LOG.isDebugEnabled()) { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java index a316159ce83..da9bd3295f7 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java @@ -51,6 +51,7 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.security.client.ClientToAMTokenSecretManager; import org.apache.hadoop.yarn.util.Clock; +import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.ThreadFactoryBuilder; /** @@ -97,9 +98,8 @@ public class JobHistory extends AbstractService implements HistoryContext { throw new YarnRuntimeException("Failed to intialize existing directories", e); } - storage = ReflectionUtils.newInstance(conf.getClass( - JHAdminConfig.MR_HISTORY_STORAGE, CachedHistoryStorage.class, - HistoryStorage.class), conf); + storage = createHistoryStorage(); + if (storage instanceof Service) { ((Service) storage).init(conf); } @@ -108,6 +108,12 @@ public class JobHistory extends AbstractService implements HistoryContext { super.serviceInit(conf); } + protected HistoryStorage createHistoryStorage() { + return ReflectionUtils.newInstance(conf.getClass( + JHAdminConfig.MR_HISTORY_STORAGE, CachedHistoryStorage.class, + HistoryStorage.class), conf); + } + protected HistoryFileManager createHistoryFileManager() { return new HistoryFileManager(); } @@ -229,6 +235,25 @@ public class JobHistory extends AbstractService implements HistoryContext { return storage.getAllPartialJobs(); } + public void refreshLoadedJobCache() { + if (getServiceState() == STATE.STARTED) { + if (storage instanceof CachedHistoryStorage) { + ((CachedHistoryStorage) storage).refreshLoadedJobCache(); + } else { + throw new UnsupportedOperationException(storage.getClass().getName() + + " is expected to be an instance of " + + CachedHistoryStorage.class.getName()); + } + } else { + LOG.warn("Failed to execute refreshLoadedJobCache: JobHistory service is not started"); + } + } + + @VisibleForTesting + HistoryStorage getHistoryStorage() { + return storage; + } + /** * Look for a set of partial jobs. * diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/client/HSAdmin.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/client/HSAdmin.java index 87baaf0e55c..be6ca131819 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/client/HSAdmin.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/client/HSAdmin.java @@ -60,6 +60,8 @@ public class HSAdmin extends Configured implements Tool { .println("Usage: mapred hsadmin [-refreshSuperUserGroupsConfiguration]"); } else if ("-refreshAdminAcls".equals(cmd)) { System.err.println("Usage: mapred hsadmin [-refreshAdminAcls]"); + } else if ("-refreshLoadedJobCache".equals(cmd)) { + System.err.println("Usage: mapred hsadmin [-refreshLoadedJobCache]"); } else if ("-refreshJobRetentionSettings".equals(cmd)) { System.err .println("Usage: mapred hsadmin [-refreshJobRetentionSettings]"); @@ -73,6 +75,7 @@ public class HSAdmin extends Configured implements Tool { System.err.println(" [-refreshUserToGroupsMappings]"); System.err.println(" [-refreshSuperUserGroupsConfiguration]"); System.err.println(" [-refreshAdminAcls]"); + System.err.println(" [-refreshLoadedJobCache]"); System.err.println(" [-refreshJobRetentionSettings]"); System.err.println(" [-refreshLogRetentionSettings]"); System.err.println(" [-getGroups [username]]"); @@ -89,6 +92,7 @@ public class HSAdmin extends Configured implements Tool { + " [-refreshUserToGroupsMappings]" + " [-refreshSuperUserGroupsConfiguration]" + " [-refreshAdminAcls]" + + " [-refreshLoadedJobCache]" + " [-refreshLogRetentionSettings]" + " [-refreshJobRetentionSettings]" + " [-getGroups [username]]" + " [-help [cmd]]\n"; @@ -99,13 +103,14 @@ public class HSAdmin extends Configured implements Tool { String refreshAdminAcls = "-refreshAdminAcls: Refresh acls for administration of Job history server\n"; + String refreshLoadedJobCache = "-refreshLoadedJobCache: Refresh loaded job cache of Job history server\n"; + String refreshJobRetentionSettings = "-refreshJobRetentionSettings:" + "Refresh job history period,job cleaner settings\n"; String refreshLogRetentionSettings = "-refreshLogRetentionSettings:" + "Refresh log retention period and log retention check interval\n"; - String getGroups = "-getGroups [username]: Get the groups which given user belongs to\n"; String help = "-help [cmd]: \tDisplays help for the given command or all commands if none\n" @@ -119,6 +124,8 @@ public class HSAdmin extends Configured implements Tool { System.out.println(refreshSuperUserGroupsConfiguration); } else if ("refreshAdminAcls".equals(cmd)) { System.out.println(refreshAdminAcls); + } else if ("refreshLoadedJobCache".equals(cmd)) { + System.out.println(refreshLoadedJobCache); } else if ("refreshJobRetentionSettings".equals(cmd)) { System.out.println(refreshJobRetentionSettings); } else if ("refreshLogRetentionSettings".equals(cmd)) { @@ -130,6 +137,7 @@ public class HSAdmin extends Configured implements Tool { System.out.println(refreshUserToGroupsMappings); System.out.println(refreshSuperUserGroupsConfiguration); System.out.println(refreshAdminAcls); + System.out.println(refreshLoadedJobCache); System.out.println(refreshJobRetentionSettings); System.out.println(refreshLogRetentionSettings); System.out.println(getGroups); @@ -221,6 +229,22 @@ public class HSAdmin extends Configured implements Tool { return 0; } + private int refreshLoadedJobCache() throws IOException { + // Refresh the loaded job cache + Configuration conf = getConf(); + InetSocketAddress address = conf.getSocketAddr( + JHAdminConfig.JHS_ADMIN_ADDRESS, + JHAdminConfig.DEFAULT_JHS_ADMIN_ADDRESS, + JHAdminConfig.DEFAULT_JHS_ADMIN_PORT); + + HSAdminRefreshProtocol refreshProtocol = HSProxies.createProxy(conf, + address, HSAdminRefreshProtocol.class, + UserGroupInformation.getCurrentUser()); + + refreshProtocol.refreshLoadedJobCache(); + return 0; + } + private int refreshJobRetentionSettings() throws IOException { // Refresh job retention settings Configuration conf = getConf(); @@ -267,6 +291,7 @@ public class HSAdmin extends Configured implements Tool { if ("-refreshUserToGroupsMappings".equals(cmd) || "-refreshSuperUserGroupsConfiguration".equals(cmd) || "-refreshAdminAcls".equals(cmd) + || "-refreshLoadedJobCache".equals(cmd) || "-refreshJobRetentionSettings".equals(cmd) || "-refreshLogRetentionSettings".equals(cmd)) { if (args.length != 1) { @@ -282,6 +307,8 @@ public class HSAdmin extends Configured implements Tool { exitCode = refreshSuperUserGroupsConfiguration(); } else if ("-refreshAdminAcls".equals(cmd)) { exitCode = refreshAdminAcls(); + } else if ("-refreshLoadedJobCache".equals(cmd)) { + exitCode = refreshLoadedJobCache(); } else if ("-refreshJobRetentionSettings".equals(cmd)) { exitCode = refreshJobRetentionSettings(); } else if ("-refreshLogRetentionSettings".equals(cmd)) { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/protocol/HSAdminRefreshProtocol.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/protocol/HSAdminRefreshProtocol.java index ea6b15cab04..82d9ae2d51c 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/protocol/HSAdminRefreshProtocol.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/protocol/HSAdminRefreshProtocol.java @@ -40,6 +40,12 @@ public interface HSAdminRefreshProtocol { */ public void refreshAdminAcls() throws IOException; + /** + * Refresh loaded job cache + * @throws IOException + */ + public void refreshLoadedJobCache() throws IOException; + /** * Refresh job retention settings. * diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/protocolPB/HSAdminRefreshProtocolClientSideTranslatorPB.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/protocolPB/HSAdminRefreshProtocolClientSideTranslatorPB.java index 3d39a1b152a..8f1f512676c 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/protocolPB/HSAdminRefreshProtocolClientSideTranslatorPB.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/protocolPB/HSAdminRefreshProtocolClientSideTranslatorPB.java @@ -27,6 +27,7 @@ import org.apache.hadoop.ipc.ProtocolMetaInterface; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RpcClientUtil; import org.apache.hadoop.mapreduce.v2.hs.proto.HSAdminRefreshProtocolProtos.RefreshAdminAclsRequestProto; +import org.apache.hadoop.mapreduce.v2.hs.proto.HSAdminRefreshProtocolProtos.RefreshLoadedJobCacheRequestProto; import org.apache.hadoop.mapreduce.v2.hs.proto.HSAdminRefreshProtocolProtos.RefreshJobRetentionSettingsRequestProto; import org.apache.hadoop.mapreduce.v2.hs.proto.HSAdminRefreshProtocolProtos.RefreshLogRetentionSettingsRequestProto; import org.apache.hadoop.mapreduce.v2.hs.protocol.HSAdminRefreshProtocol; @@ -46,6 +47,10 @@ public class HSAdminRefreshProtocolClientSideTranslatorPB implements private final static RefreshAdminAclsRequestProto VOID_REFRESH_ADMIN_ACLS_REQUEST = RefreshAdminAclsRequestProto .newBuilder().build(); + + private final static RefreshLoadedJobCacheRequestProto + VOID_REFRESH_LOADED_JOB_CACHE_REQUEST = RefreshLoadedJobCacheRequestProto + .newBuilder().build(); private final static RefreshJobRetentionSettingsRequestProto VOID_REFRESH_JOB_RETENTION_SETTINGS_REQUEST = @@ -75,6 +80,17 @@ public class HSAdminRefreshProtocolClientSideTranslatorPB implements } } + + @Override + public void refreshLoadedJobCache() throws IOException { + try { + rpcProxy.refreshLoadedJobCache(NULL_CONTROLLER, + VOID_REFRESH_LOADED_JOB_CACHE_REQUEST); + } catch (ServiceException se) { + throw ProtobufHelper.getRemoteException(se); + } + } + @Override public void refreshJobRetentionSettings() throws IOException { try { @@ -101,4 +117,5 @@ public class HSAdminRefreshProtocolClientSideTranslatorPB implements HSAdminRefreshProtocolPB.class, RPC.RpcKind.RPC_PROTOCOL_BUFFER, RPC.getProtocolVersion(HSAdminRefreshProtocolPB.class), methodName); } + } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/protocolPB/HSAdminRefreshProtocolServerSideTranslatorPB.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/protocolPB/HSAdminRefreshProtocolServerSideTranslatorPB.java index a59b4a8dedb..77b4178e04f 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/protocolPB/HSAdminRefreshProtocolServerSideTranslatorPB.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/protocolPB/HSAdminRefreshProtocolServerSideTranslatorPB.java @@ -23,6 +23,8 @@ import java.io.IOException; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.mapreduce.v2.hs.proto.HSAdminRefreshProtocolProtos.RefreshAdminAclsResponseProto; import org.apache.hadoop.mapreduce.v2.hs.proto.HSAdminRefreshProtocolProtos.RefreshAdminAclsRequestProto; +import org.apache.hadoop.mapreduce.v2.hs.proto.HSAdminRefreshProtocolProtos.RefreshLoadedJobCacheRequestProto; +import org.apache.hadoop.mapreduce.v2.hs.proto.HSAdminRefreshProtocolProtos.RefreshLoadedJobCacheResponseProto; import org.apache.hadoop.mapreduce.v2.hs.proto.HSAdminRefreshProtocolProtos.RefreshJobRetentionSettingsRequestProto; import org.apache.hadoop.mapreduce.v2.hs.proto.HSAdminRefreshProtocolProtos.RefreshJobRetentionSettingsResponseProto; import org.apache.hadoop.mapreduce.v2.hs.proto.HSAdminRefreshProtocolProtos.RefreshLogRetentionSettingsRequestProto; @@ -41,11 +43,15 @@ public class HSAdminRefreshProtocolServerSideTranslatorPB implements private final static RefreshAdminAclsResponseProto VOID_REFRESH_ADMIN_ACLS_RESPONSE = RefreshAdminAclsResponseProto .newBuilder().build(); - + + private final static RefreshLoadedJobCacheResponseProto + VOID_REFRESH_LOADED_JOB_CACHE_RESPONSE = RefreshLoadedJobCacheResponseProto + .newBuilder().build(); + private final static RefreshJobRetentionSettingsResponseProto VOID_REFRESH_JOB_RETENTION_SETTINGS_RESPONSE = RefreshJobRetentionSettingsResponseProto.newBuilder().build(); - + private final static RefreshLogRetentionSettingsResponseProto VOID_REFRESH_LOG_RETENTION_SETTINGS_RESPONSE = RefreshLogRetentionSettingsResponseProto.newBuilder().build(); @@ -67,6 +73,18 @@ public class HSAdminRefreshProtocolServerSideTranslatorPB implements return VOID_REFRESH_ADMIN_ACLS_RESPONSE; } + @Override + public RefreshLoadedJobCacheResponseProto refreshLoadedJobCache( + RpcController controller, RefreshLoadedJobCacheRequestProto request) + throws ServiceException { + try { + impl.refreshLoadedJobCache(); + } catch (IOException e) { + throw new ServiceException(e); + } + return VOID_REFRESH_LOADED_JOB_CACHE_RESPONSE; + } + @Override public RefreshJobRetentionSettingsResponseProto refreshJobRetentionSettings( RpcController controller, diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/server/HSAdminServer.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/server/HSAdminServer.java index ad8e7201a6e..23a34a47b94 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/server/HSAdminServer.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/server/HSAdminServer.java @@ -202,6 +202,22 @@ public class HSAdminServer extends AbstractService implements HSAdminProtocol { HISTORY_ADMIN_SERVER); } + @Override + public void refreshLoadedJobCache() throws IOException { + UserGroupInformation user = checkAcls("refreshLoadedJobCache"); + + try { + jobHistoryService.refreshLoadedJobCache(); + } catch (UnsupportedOperationException e) { + HSAuditLogger.logFailure(user.getShortUserName(), + "refreshLoadedJobCache", adminAcl.toString(), HISTORY_ADMIN_SERVER, + e.getMessage()); + throw e; + } + HSAuditLogger.logSuccess(user.getShortUserName(), "refreshLoadedJobCache", + HISTORY_ADMIN_SERVER); + } + @Override public void refreshLogRetentionSettings() throws IOException { UserGroupInformation user = checkAcls("refreshLogRetentionSettings"); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/proto/HSAdminRefreshProtocol.proto b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/proto/HSAdminRefreshProtocol.proto index eb968e79201..e23f0a15221 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/proto/HSAdminRefreshProtocol.proto +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/proto/HSAdminRefreshProtocol.proto @@ -33,6 +33,18 @@ message RefreshAdminAclsRequestProto { message RefreshAdminAclsResponseProto { } +/** + * refresh loaded job cache request. + */ +message RefreshLoadedJobCacheRequestProto { +} + +/** + * Response for refresh loaded job cache. + */ +message RefreshLoadedJobCacheResponseProto { +} + /** * refresh job retention settings request. */ @@ -66,6 +78,12 @@ service HSAdminRefreshProtocolService { */ rpc refreshAdminAcls(RefreshAdminAclsRequestProto) returns(RefreshAdminAclsResponseProto); + + /** + * Refresh loaded job cache + */ + rpc refreshLoadedJobCache(RefreshLoadedJobCacheRequestProto) + returns(RefreshLoadedJobCacheResponseProto); /** * Refresh job retention. diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistory.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistory.java index 365afd66f6f..2e6d4cebda5 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistory.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistory.java @@ -18,6 +18,8 @@ package org.apache.hadoop.mapreduce.v2.hs; +import java.util.Map; + import java.io.IOException; import java.util.LinkedList; import java.util.List; @@ -27,8 +29,10 @@ import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.v2.api.records.JobId; +import org.apache.hadoop.mapreduce.v2.api.records.JobState; import org.apache.hadoop.mapreduce.v2.hs.HistoryFileManager.HistoryFileInfo; import org.apache.hadoop.mapreduce.v2.hs.HistoryFileManager.JobListCache; +import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.JobsInfo; import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig; import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils; import org.junit.After; @@ -36,11 +40,76 @@ import org.junit.Test; import static org.junit.Assert.assertEquals; import static org.mockito.Mockito.*; +import org.apache.hadoop.mapreduce.v2.app.job.Job; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; public class TestJobHistory { JobHistory jobHistory = null; + @Test + public void testRefreshLoadedJobCache() throws Exception { + HistoryFileManager historyManager = mock(HistoryFileManager.class); + jobHistory = spy(new JobHistory()); + doReturn(historyManager).when(jobHistory).createHistoryFileManager(); + + Configuration conf = new Configuration(); + // Set the cache size to 2 + conf.set(JHAdminConfig.MR_HISTORY_LOADED_JOB_CACHE_SIZE, "2"); + jobHistory.init(conf); + jobHistory.start(); + + CachedHistoryStorage storage = spy((CachedHistoryStorage) jobHistory + .getHistoryStorage()); + + Job[] jobs = new Job[3]; + JobId[] jobIds = new JobId[3]; + + for (int i = 0; i < 3; i++) { + jobs[i] = mock(Job.class); + jobIds[i] = mock(JobId.class); + when(jobs[i].getID()).thenReturn(jobIds[i]); + } + + HistoryFileInfo fileInfo = mock(HistoryFileInfo.class); + when(historyManager.getFileInfo(any(JobId.class))).thenReturn(fileInfo); + when(fileInfo.loadJob()).thenReturn(jobs[0]).thenReturn(jobs[1]) + .thenReturn(jobs[2]); + + // getFullJob will put the job in the cache if it isn't there + for (int i = 0; i < 3; i++) { + storage.getFullJob(jobs[i].getID()); + } + + Map jobCache = storage.getLoadedJobCache(); + // job0 should have been purged since cache size is 2 + assertFalse(jobCache.containsKey(jobs[0].getID())); + assertTrue(jobCache.containsKey(jobs[1].getID()) + && jobCache.containsKey(jobs[2].getID())); + + // Setting cache size to 3 + conf.set(JHAdminConfig.MR_HISTORY_LOADED_JOB_CACHE_SIZE, "3"); + doReturn(conf).when(storage).createConf(); + + when(fileInfo.loadJob()).thenReturn(jobs[0]).thenReturn(jobs[1]) + .thenReturn(jobs[2]); + + jobHistory.refreshLoadedJobCache(); + + for (int i = 0; i < 3; i++) { + storage.getFullJob(jobs[i].getID()); + } + + jobCache = storage.getLoadedJobCache(); + + // All three jobs should be in cache since its size is now 3 + for (int i = 0; i < 3; i++) { + assertTrue(jobCache.containsKey(jobs[i].getID())); + } + } + @Test public void testRefreshJobRetentionSettings() throws IOException, InterruptedException { @@ -147,6 +216,54 @@ public class TestJobHistory { verify(fileInfo, timeout(20000).times(2)).delete(); } + @Test + public void testRefreshLoadedJobCacheUnSupportedOperation() { + jobHistory = spy(new JobHistory()); + HistoryStorage storage = new HistoryStorage() { + + @Override + public void setHistoryFileManager(HistoryFileManager hsManager) { + // TODO Auto-generated method stub + + } + + @Override + public JobsInfo getPartialJobs(Long offset, Long count, String user, + String queue, Long sBegin, Long sEnd, Long fBegin, Long fEnd, + JobState jobState) { + // TODO Auto-generated method stub + return null; + } + + @Override + public Job getFullJob(JobId jobId) { + // TODO Auto-generated method stub + return null; + } + + @Override + public Map getAllPartialJobs() { + // TODO Auto-generated method stub + return null; + } + }; + + doReturn(storage).when(jobHistory).createHistoryStorage(); + + jobHistory.init(new Configuration()); + + jobHistory.start(); + + Throwable th = null; + try { + jobHistory.refreshLoadedJobCache(); + } catch (Exception e) { + th = e; + } + + assertTrue(th instanceof UnsupportedOperationException); + } + @After public void cleanUp() { if (jobHistory != null) { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/server/TestHSAdminServer.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/server/TestHSAdminServer.java index 2474f575d61..f31ebe3b123 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/server/TestHSAdminServer.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/server/TestHSAdminServer.java @@ -241,6 +241,14 @@ public class TestHSAdminServer { assertTrue(th instanceof RemoteException); } + @Test + public void testRefreshLoadedJobCache() throws Exception { + String[] args = new String[1]; + args[0] = "-refreshLoadedJobCache"; + hsAdminClient.run(args); + verify(jobHistoryService).refreshLoadedJobCache(); + } + @Test public void testRefreshLogRetentionSettings() throws Exception { String[] args = new String[1];