svn merge -c 1507135 FIXES: MAPREDUCE-5386. Ability to refresh history server job retention and job cleaner settings. Contributed by Ashwin Shankar

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1507378 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jason Darrell Lowe 2013-07-26 17:32:00 +00:00
parent 4839eca238
commit fca19f6a8a
12 changed files with 387 additions and 48 deletions

View File

@ -12,6 +12,9 @@ Release 2.3.0 - UNRELEASED
MAPREDUCE-5356. Ability to refresh aggregated log retention period and
check interval (Ashwin Shankar via jlowe)
MAPREDUCE-5386. Ability to refresh history server job retention and job
cleaner settings (Ashwin Shankar via jlowe)
IMPROVEMENTS
OPTIMIZATIONS

View File

@ -52,6 +52,7 @@ import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.mapred.JobACLsManager;
import org.apache.hadoop.mapreduce.jobhistory.JobSummary;
@ -61,6 +62,7 @@ import org.apache.hadoop.mapreduce.v2.jobhistory.FileNameIndexUtils;
import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
@ -416,7 +418,7 @@ public class HistoryFileManager extends AbstractService {
return historyFile;
}
private synchronized void delete() throws IOException {
protected synchronized void delete() throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("deleting " + historyFile + " and " + confFile);
}
@ -524,10 +526,7 @@ public class HistoryFileManager extends AbstractService {
maxHistoryAge = conf.getLong(JHAdminConfig.MR_HISTORY_MAX_AGE_MS,
JHAdminConfig.DEFAULT_MR_HISTORY_MAX_AGE);
jobListCache = new JobListCache(conf.getInt(
JHAdminConfig.MR_HISTORY_JOBLIST_CACHE_SIZE,
JHAdminConfig.DEFAULT_MR_HISTORY_JOBLIST_CACHE_SIZE),
maxHistoryAge);
jobListCache = createJobListCache();
serialNumberIndex = new SerialNumberIndex(conf.getInt(
JHAdminConfig.MR_HISTORY_DATESTRING_CACHE_SIZE,
@ -544,6 +543,12 @@ public class HistoryFileManager extends AbstractService {
super.serviceInit(conf);
}
protected JobListCache createJobListCache() {
return new JobListCache(conf.getInt(
JHAdminConfig.MR_HISTORY_JOBLIST_CACHE_SIZE,
JHAdminConfig.DEFAULT_MR_HISTORY_JOBLIST_CACHE_SIZE), maxHistoryAge);
}
private void mkdir(FileContext fc, Path path, FsPermission fsp)
throws IOException {
if (!fc.util().exists(path)) {
@ -656,18 +661,18 @@ public class HistoryFileManager extends AbstractService {
return jhStatusList;
}
private static List<FileStatus> scanDirectoryForHistoryFiles(Path path,
protected List<FileStatus> scanDirectoryForHistoryFiles(Path path,
FileContext fc) throws IOException {
return scanDirectory(path, fc, JobHistoryUtils.getHistoryFileFilter());
}
/**
* Finds all history directories with a timestamp component by scanning the
* filesystem. Used when the JobHistory server is started.
*
* @return
* @return list of history directories
*/
private List<FileStatus> findTimestampedDirectories() throws IOException {
protected List<FileStatus> findTimestampedDirectories() throws IOException {
List<FileStatus> fsList = JobHistoryUtils.localGlobber(doneDirFc,
doneDirPrefixPath, DONE_BEFORE_SERIAL_TAIL);
return fsList;
@ -954,7 +959,7 @@ public class HistoryFileManager extends AbstractService {
}
}
if (!halted) {
doneDirFc.delete(doneDirFc.makeQualified(serialDir.getPath()), true);
deleteDir(serialDir);
removeDirectoryFromSerialNumberIndex(serialDir.getPath());
existingDoneSubdirs.remove(serialDir.getPath());
} else {
@ -962,6 +967,13 @@ public class HistoryFileManager extends AbstractService {
}
}
}
protected boolean deleteDir(FileStatus serialDir)
throws AccessControlException, FileNotFoundException,
UnsupportedFileSystemException, IOException {
return doneDirFc.delete(doneDirFc.makeQualified(serialDir.getPath()), true);
}
@VisibleForTesting
protected void setMaxHistoryAge(long newValue){
maxHistoryAge=newValue;

View File

@ -22,6 +22,7 @@ import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
@ -71,7 +72,11 @@ public class JobHistory extends AbstractService implements HistoryContext {
private HistoryStorage storage = null;
private HistoryFileManager hsManager = null;
ScheduledFuture<?> futureHistoryCleaner = null;
//History job cleaner interval
private long cleanerInterval;
@Override
protected void serviceInit(Configuration conf) throws Exception {
LOG.info("JobHistory Init");
@ -84,7 +89,7 @@ public class JobHistory extends AbstractService implements HistoryContext {
JHAdminConfig.MR_HISTORY_MOVE_INTERVAL_MS,
JHAdminConfig.DEFAULT_MR_HISTORY_MOVE_INTERVAL_MS);
hsManager = new HistoryFileManager();
hsManager = createHistoryFileManager();
hsManager.init(conf);
try {
hsManager.initExisting();
@ -103,6 +108,10 @@ public class JobHistory extends AbstractService implements HistoryContext {
super.serviceInit(conf);
}
protected HistoryFileManager createHistoryFileManager() {
return new HistoryFileManager();
}
@Override
protected void serviceStart() throws Exception {
hsManager.start();
@ -118,19 +127,14 @@ public class JobHistory extends AbstractService implements HistoryContext {
moveThreadInterval, moveThreadInterval, TimeUnit.MILLISECONDS);
// Start historyCleaner
boolean startCleanerService = conf.getBoolean(
JHAdminConfig.MR_HISTORY_CLEANER_ENABLE, true);
if (startCleanerService) {
long runInterval = conf.getLong(
JHAdminConfig.MR_HISTORY_CLEANER_INTERVAL_MS,
JHAdminConfig.DEFAULT_MR_HISTORY_CLEANER_INTERVAL_MS);
scheduledExecutor
.scheduleAtFixedRate(new HistoryCleaner(),
30 * 1000l, runInterval, TimeUnit.MILLISECONDS);
}
scheduleHistoryCleaner();
super.serviceStart();
}
protected int getInitDelaySecs() {
return 30;
}
@Override
protected void serviceStop() throws Exception {
LOG.info("Stopping JobHistory");
@ -256,6 +260,43 @@ public class JobHistory extends AbstractService implements HistoryContext {
fBegin, fEnd, jobState);
}
public void refreshJobRetentionSettings() {
if (getServiceState() == STATE.STARTED) {
conf = createConf();
long maxHistoryAge = conf.getLong(JHAdminConfig.MR_HISTORY_MAX_AGE_MS,
JHAdminConfig.DEFAULT_MR_HISTORY_MAX_AGE);
hsManager.setMaxHistoryAge(maxHistoryAge);
if (futureHistoryCleaner != null) {
futureHistoryCleaner.cancel(false);
}
futureHistoryCleaner = null;
scheduleHistoryCleaner();
} else {
LOG.warn("Failed to execute refreshJobRetentionSettings : Job History service is not started");
}
}
private void scheduleHistoryCleaner() {
boolean startCleanerService = conf.getBoolean(
JHAdminConfig.MR_HISTORY_CLEANER_ENABLE, true);
if (startCleanerService) {
cleanerInterval = conf.getLong(
JHAdminConfig.MR_HISTORY_CLEANER_INTERVAL_MS,
JHAdminConfig.DEFAULT_MR_HISTORY_CLEANER_INTERVAL_MS);
futureHistoryCleaner = scheduledExecutor.scheduleAtFixedRate(
new HistoryCleaner(), getInitDelaySecs() * 1000l, cleanerInterval,
TimeUnit.MILLISECONDS);
}
}
protected Configuration createConf() {
return new Configuration();
}
public long getCleanerInterval() {
return cleanerInterval;
}
// TODO AppContext - Not Required
private ApplicationAttemptId appAttemptID;

View File

@ -83,7 +83,7 @@ public class JobHistoryServer extends CompositeService {
clientService = new HistoryClientService(historyContext,
this.jhsDTSecretManager);
aggLogDelService = new AggregatedLogDeletionService();
hsAdminServer = new HSAdminServer(aggLogDelService);
hsAdminServer = new HSAdminServer(aggLogDelService, jobHistoryService);
addService(jobHistoryService);
addService(clientService);
addService(aggLogDelService);

View File

@ -60,8 +60,12 @@ public class HSAdmin extends Configured implements Tool {
.println("Usage: mapred hsadmin [-refreshSuperUserGroupsConfiguration]");
} else if ("-refreshAdminAcls".equals(cmd)) {
System.err.println("Usage: mapred hsadmin [-refreshAdminAcls]");
} else if ("-refreshJobRetentionSettings".equals(cmd)) {
System.err
.println("Usage: mapred hsadmin [-refreshJobRetentionSettings]");
} else if ("-refreshLogRetentionSettings".equals(cmd)) {
System.err.println("Usage: mapred hsadmin [-refreshLogRetentionSettings]");
System.err
.println("Usage: mapred hsadmin [-refreshLogRetentionSettings]");
} else if ("-getGroups".equals(cmd)) {
System.err.println("Usage: mapred hsadmin" + " [-getGroups [username]]");
} else {
@ -69,6 +73,7 @@ public class HSAdmin extends Configured implements Tool {
System.err.println(" [-refreshUserToGroupsMappings]");
System.err.println(" [-refreshSuperUserGroupsConfiguration]");
System.err.println(" [-refreshAdminAcls]");
System.err.println(" [-refreshJobRetentionSettings]");
System.err.println(" [-refreshLogRetentionSettings]");
System.err.println(" [-getGroups [username]]");
System.err.println(" [-help [cmd]]");
@ -84,6 +89,8 @@ public class HSAdmin extends Configured implements Tool {
+ " [-refreshUserToGroupsMappings]"
+ " [-refreshSuperUserGroupsConfiguration]"
+ " [-refreshAdminAcls]"
+ " [-refreshLogRetentionSettings]"
+ " [-refreshJobRetentionSettings]"
+ " [-getGroups [username]]" + " [-help [cmd]]\n";
String refreshUserToGroupsMappings = "-refreshUserToGroupsMappings: Refresh user-to-groups mappings\n";
@ -92,8 +99,13 @@ public class HSAdmin extends Configured implements Tool {
String refreshAdminAcls = "-refreshAdminAcls: Refresh acls for administration of Job history server\n";
String refreshLogRetentionSettings = "-refreshLogRetentionSettings: Refresh 'log retention time' and 'log retention check interval' \n";
String refreshJobRetentionSettings = "-refreshJobRetentionSettings:" +
"Refresh job history period,job cleaner settings\n";
String refreshLogRetentionSettings = "-refreshLogRetentionSettings:" +
"Refresh log retention period and log retention check interval\n";
String getGroups = "-getGroups [username]: Get the groups which given user belongs to\n";
String help = "-help [cmd]: \tDisplays help for the given command or all commands if none\n"
@ -107,6 +119,8 @@ public class HSAdmin extends Configured implements Tool {
System.out.println(refreshSuperUserGroupsConfiguration);
} else if ("refreshAdminAcls".equals(cmd)) {
System.out.println(refreshAdminAcls);
} else if ("refreshJobRetentionSettings".equals(cmd)) {
System.out.println(refreshJobRetentionSettings);
} else if ("refreshLogRetentionSettings".equals(cmd)) {
System.out.println(refreshLogRetentionSettings);
} else if ("getGroups".equals(cmd)) {
@ -116,6 +130,7 @@ public class HSAdmin extends Configured implements Tool {
System.out.println(refreshUserToGroupsMappings);
System.out.println(refreshSuperUserGroupsConfiguration);
System.out.println(refreshAdminAcls);
System.out.println(refreshJobRetentionSettings);
System.out.println(refreshLogRetentionSettings);
System.out.println(getGroups);
System.out.println(help);
@ -201,11 +216,27 @@ public class HSAdmin extends Configured implements Tool {
HSAdminRefreshProtocol refreshProtocol = HSProxies.createProxy(conf,
address, HSAdminRefreshProtocol.class,
UserGroupInformation.getCurrentUser());
// Refresh the user-to-groups mappings
refreshProtocol.refreshAdminAcls();
return 0;
}
private int refreshJobRetentionSettings() throws IOException {
// Refresh job retention settings
Configuration conf = getConf();
InetSocketAddress address = conf.getSocketAddr(
JHAdminConfig.JHS_ADMIN_ADDRESS,
JHAdminConfig.DEFAULT_JHS_ADMIN_ADDRESS,
JHAdminConfig.DEFAULT_JHS_ADMIN_PORT);
HSAdminRefreshProtocol refreshProtocol = HSProxies.createProxy(conf,
address, HSAdminRefreshProtocol.class,
UserGroupInformation.getCurrentUser());
refreshProtocol.refreshJobRetentionSettings();
return 0;
}
private int refreshLogRetentionSettings() throws IOException {
// Refresh log retention settings
Configuration conf = getConf();
@ -214,14 +245,14 @@ public class HSAdmin extends Configured implements Tool {
JHAdminConfig.DEFAULT_JHS_ADMIN_ADDRESS,
JHAdminConfig.DEFAULT_JHS_ADMIN_PORT);
HSAdminRefreshProtocol refreshProtocol = HSProxies
.createProxy(conf, address, HSAdminRefreshProtocol.class,
UserGroupInformation.getCurrentUser());
HSAdminRefreshProtocol refreshProtocol = HSProxies.createProxy(conf,
address, HSAdminRefreshProtocol.class,
UserGroupInformation.getCurrentUser());
refreshProtocol.refreshLogRetentionSettings();
return 0;
}
@Override
public int run(String[] args) throws Exception {
if (args.length < 1) {
@ -236,6 +267,7 @@ public class HSAdmin extends Configured implements Tool {
if ("-refreshUserToGroupsMappings".equals(cmd)
|| "-refreshSuperUserGroupsConfiguration".equals(cmd)
|| "-refreshAdminAcls".equals(cmd)
|| "-refreshJobRetentionSettings".equals(cmd)
|| "-refreshLogRetentionSettings".equals(cmd)) {
if (args.length != 1) {
printUsage(cmd);
@ -250,6 +282,8 @@ public class HSAdmin extends Configured implements Tool {
exitCode = refreshSuperUserGroupsConfiguration();
} else if ("-refreshAdminAcls".equals(cmd)) {
exitCode = refreshAdminAcls();
} else if ("-refreshJobRetentionSettings".equals(cmd)) {
exitCode = refreshJobRetentionSettings();
} else if ("-refreshLogRetentionSettings".equals(cmd)) {
exitCode = refreshLogRetentionSettings();
} else if ("-getGroups".equals(cmd)) {

View File

@ -39,6 +39,13 @@ public interface HSAdminRefreshProtocol {
* @throws IOException
*/
public void refreshAdminAcls() throws IOException;
/**
* Refresh job retention settings.
*
* @throws IOException
*/
public void refreshJobRetentionSettings() throws IOException;
/**
* Refresh log retention settings.

View File

@ -27,6 +27,7 @@ import org.apache.hadoop.ipc.ProtocolMetaInterface;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RpcClientUtil;
import org.apache.hadoop.mapreduce.v2.hs.proto.HSAdminRefreshProtocolProtos.RefreshAdminAclsRequestProto;
import org.apache.hadoop.mapreduce.v2.hs.proto.HSAdminRefreshProtocolProtos.RefreshJobRetentionSettingsRequestProto;
import org.apache.hadoop.mapreduce.v2.hs.proto.HSAdminRefreshProtocolProtos.RefreshLogRetentionSettingsRequestProto;
import org.apache.hadoop.mapreduce.v2.hs.protocol.HSAdminRefreshProtocol;
@ -42,12 +43,18 @@ public class HSAdminRefreshProtocolClientSideTranslatorPB implements
private final HSAdminRefreshProtocolPB rpcProxy;
private final static RefreshAdminAclsRequestProto VOID_REFRESH_ADMIN_ACLS_REQUEST = RefreshAdminAclsRequestProto
private final static RefreshAdminAclsRequestProto
VOID_REFRESH_ADMIN_ACLS_REQUEST = RefreshAdminAclsRequestProto
.newBuilder().build();
private final static RefreshLogRetentionSettingsRequestProto VOID_REFRESH_LOG_RETENTION_SETTINGS_REQUEST = RefreshLogRetentionSettingsRequestProto
.newBuilder().build();
private final static RefreshJobRetentionSettingsRequestProto
VOID_REFRESH_JOB_RETENTION_SETTINGS_REQUEST =
RefreshJobRetentionSettingsRequestProto.newBuilder().build();
private final static RefreshLogRetentionSettingsRequestProto
VOID_REFRESH_LOG_RETENTION_SETTINGS_REQUEST =
RefreshLogRetentionSettingsRequestProto.newBuilder().build();
public HSAdminRefreshProtocolClientSideTranslatorPB(
HSAdminRefreshProtocolPB rpcProxy) {
this.rpcProxy = rpcProxy;
@ -68,6 +75,16 @@ public class HSAdminRefreshProtocolClientSideTranslatorPB implements
}
}
@Override
public void refreshJobRetentionSettings() throws IOException {
try {
rpcProxy.refreshJobRetentionSettings(NULL_CONTROLLER,
VOID_REFRESH_JOB_RETENTION_SETTINGS_REQUEST);
} catch (ServiceException se) {
throw ProtobufHelper.getRemoteException(se);
}
}
@Override
public void refreshLogRetentionSettings() throws IOException {
try {
@ -77,7 +94,7 @@ public class HSAdminRefreshProtocolClientSideTranslatorPB implements
throw ProtobufHelper.getRemoteException(se);
}
}
@Override
public boolean isMethodSupported(String methodName) throws IOException {
return RpcClientUtil.isMethodSupported(rpcProxy,

View File

@ -23,6 +23,8 @@ import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.mapreduce.v2.hs.proto.HSAdminRefreshProtocolProtos.RefreshAdminAclsResponseProto;
import org.apache.hadoop.mapreduce.v2.hs.proto.HSAdminRefreshProtocolProtos.RefreshAdminAclsRequestProto;
import org.apache.hadoop.mapreduce.v2.hs.proto.HSAdminRefreshProtocolProtos.RefreshJobRetentionSettingsRequestProto;
import org.apache.hadoop.mapreduce.v2.hs.proto.HSAdminRefreshProtocolProtos.RefreshJobRetentionSettingsResponseProto;
import org.apache.hadoop.mapreduce.v2.hs.proto.HSAdminRefreshProtocolProtos.RefreshLogRetentionSettingsRequestProto;
import org.apache.hadoop.mapreduce.v2.hs.proto.HSAdminRefreshProtocolProtos.RefreshLogRetentionSettingsResponseProto;
import org.apache.hadoop.mapreduce.v2.hs.protocol.HSAdminRefreshProtocol;
@ -36,10 +38,17 @@ public class HSAdminRefreshProtocolServerSideTranslatorPB implements
private final HSAdminRefreshProtocol impl;
private final static RefreshAdminAclsResponseProto VOID_REFRESH_ADMIN_ACLS_RESPONSE = RefreshAdminAclsResponseProto
.newBuilder().build();
private final static RefreshLogRetentionSettingsResponseProto VOID_REFRESH_LOG_RETENTION_SETTINGS_RESPONSE = RefreshLogRetentionSettingsResponseProto
private final static RefreshAdminAclsResponseProto
VOID_REFRESH_ADMIN_ACLS_RESPONSE = RefreshAdminAclsResponseProto
.newBuilder().build();
private final static RefreshJobRetentionSettingsResponseProto
VOID_REFRESH_JOB_RETENTION_SETTINGS_RESPONSE =
RefreshJobRetentionSettingsResponseProto.newBuilder().build();
private final static RefreshLogRetentionSettingsResponseProto
VOID_REFRESH_LOG_RETENTION_SETTINGS_RESPONSE =
RefreshLogRetentionSettingsResponseProto.newBuilder().build();
public HSAdminRefreshProtocolServerSideTranslatorPB(
HSAdminRefreshProtocol impl) {
@ -58,9 +67,23 @@ public class HSAdminRefreshProtocolServerSideTranslatorPB implements
return VOID_REFRESH_ADMIN_ACLS_RESPONSE;
}
@Override
public RefreshJobRetentionSettingsResponseProto refreshJobRetentionSettings(
RpcController controller,
RefreshJobRetentionSettingsRequestProto request)
throws ServiceException {
try {
impl.refreshJobRetentionSettings();
} catch (IOException e) {
throw new ServiceException(e);
}
return VOID_REFRESH_JOB_RETENTION_SETTINGS_RESPONSE;
}
@Override
public RefreshLogRetentionSettingsResponseProto refreshLogRetentionSettings(
RpcController controller, RefreshLogRetentionSettingsRequestProto request)
RpcController controller,
RefreshLogRetentionSettingsRequestProto request)
throws ServiceException {
try {
impl.refreshLogRetentionSettings();

View File

@ -44,6 +44,7 @@ import org.apache.hadoop.tools.protocolPB.GetUserMappingsProtocolPB;
import org.apache.hadoop.tools.protocolPB.GetUserMappingsProtocolServerSideTranslatorPB;
import org.apache.hadoop.mapreduce.v2.hs.HSAuditLogger;
import org.apache.hadoop.mapreduce.v2.hs.HSAuditLogger.AuditConstants;
import org.apache.hadoop.mapreduce.v2.hs.JobHistory;
import org.apache.hadoop.mapreduce.v2.hs.proto.HSAdminRefreshProtocolProtos.HSAdminRefreshProtocolService;
import org.apache.hadoop.mapreduce.v2.hs.protocol.HSAdminProtocol;
import org.apache.hadoop.mapreduce.v2.hs.protocolPB.HSAdminRefreshProtocolPB;
@ -62,10 +63,13 @@ public class HSAdminServer extends AbstractService implements HSAdminProtocol {
protected RPC.Server clientRpcServer;
protected InetSocketAddress clientRpcAddress;
private static final String HISTORY_ADMIN_SERVER = "HSAdminServer";
public HSAdminServer(AggregatedLogDeletionService aggLogDelService) {
private JobHistory jobHistoryService = null;
public HSAdminServer(AggregatedLogDeletionService aggLogDelService,
JobHistory jobHistoryService) {
super(HSAdminServer.class.getName());
this.aggLogDelService = aggLogDelService;
this.jobHistoryService = jobHistoryService;
}
@Override
@ -100,7 +104,8 @@ public class HSAdminServer extends AbstractService implements HSAdminProtocol {
.setPort(clientRpcAddress.getPort()).setVerbose(false).build();
addProtocol(conf, GetUserMappingsProtocolPB.class, getUserMappingService);
addProtocol(conf, HSAdminRefreshProtocolPB.class, refreshHSAdminProtocolService);
addProtocol(conf, HSAdminRefreshProtocolPB.class,
refreshHSAdminProtocolService);
adminAcl = new AccessControlList(conf.get(JHAdminConfig.JHS_ADMIN_ACL,
JHAdminConfig.DEFAULT_JHS_ADMIN_ACL));
@ -196,7 +201,7 @@ public class HSAdminServer extends AbstractService implements HSAdminProtocol {
HSAuditLogger.logSuccess(user.getShortUserName(), "refreshAdminAcls",
HISTORY_ADMIN_SERVER);
}
@Override
public void refreshLogRetentionSettings() throws IOException {
UserGroupInformation user = checkAcls("refreshLogRetentionSettings");
@ -206,4 +211,14 @@ public class HSAdminServer extends AbstractService implements HSAdminProtocol {
HSAuditLogger.logSuccess(user.getShortUserName(),
"refreshLogRetentionSettings", "HSAdminServer");
}
@Override
public void refreshJobRetentionSettings() throws IOException {
UserGroupInformation user = checkAcls("refreshJobRetentionSettings");
jobHistoryService.refreshJobRetentionSettings();
HSAuditLogger.logSuccess(user.getShortUserName(),
"refreshJobRetentionSettings", HISTORY_ADMIN_SERVER);
}
}

View File

@ -34,6 +34,18 @@ message RefreshAdminAclsResponseProto {
}
/**
* refresh job retention settings request.
*/
message RefreshJobRetentionSettingsRequestProto {
}
/**
* Response for refresh job retention.
*/
message RefreshJobRetentionSettingsResponseProto {
}
/*
* refresh log retention request.
*/
message RefreshLogRetentionSettingsRequestProto {
@ -54,6 +66,13 @@ service HSAdminRefreshProtocolService {
*/
rpc refreshAdminAcls(RefreshAdminAclsRequestProto)
returns(RefreshAdminAclsResponseProto);
/**
* Refresh job retention.
*/
rpc refreshJobRetentionSettings(RefreshJobRetentionSettingsRequestProto)
returns(RefreshJobRetentionSettingsResponseProto);
/**
* Refresh log retention
*/

View File

@ -0,0 +1,156 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2.hs;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.hs.HistoryFileManager.HistoryFileInfo;
import org.apache.hadoop.mapreduce.v2.hs.HistoryFileManager.JobListCache;
import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
import org.junit.After;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.*;
public class TestJobHistory {
JobHistory jobHistory = null;
@Test
public void testRefreshJobRetentionSettings() throws IOException,
InterruptedException {
String root = "mockfs://foo/";
String historyDoneDir = root + "mapred/history/done";
long now = System.currentTimeMillis();
long someTimeYesterday = now - (25l * 3600 * 1000);
long timeBefore200Secs = now - (200l * 1000);
// Get yesterday's date in YY/MM/DD format
String timestampComponent = JobHistoryUtils
.timestampDirectoryComponent(someTimeYesterday);
// Create a folder under yesterday's done dir
Path donePathYesterday = new Path(historyDoneDir, timestampComponent + "/"
+ "000000");
FileStatus dirCreatedYesterdayStatus = new FileStatus(0, true, 0, 0,
someTimeYesterday, donePathYesterday);
// Get today's date in YY/MM/DD format
timestampComponent = JobHistoryUtils
.timestampDirectoryComponent(timeBefore200Secs);
// Create a folder under today's done dir
Path donePathToday = new Path(historyDoneDir, timestampComponent + "/"
+ "000000");
FileStatus dirCreatedTodayStatus = new FileStatus(0, true, 0, 0,
timeBefore200Secs, donePathToday);
// Create a jhist file with yesterday's timestamp under yesterday's done dir
Path fileUnderYesterdayDir = new Path(donePathYesterday.toString(),
"job_1372363578825_0015-" + someTimeYesterday + "-user-Sleep+job-"
+ someTimeYesterday + "-1-1-SUCCEEDED-default.jhist");
FileStatus fileUnderYesterdayDirStatus = new FileStatus(10, false, 0, 0,
someTimeYesterday, fileUnderYesterdayDir);
// Create a jhist file with today's timestamp under today's done dir
Path fileUnderTodayDir = new Path(donePathYesterday.toString(),
"job_1372363578825_0016-" + timeBefore200Secs + "-user-Sleep+job-"
+ timeBefore200Secs + "-1-1-SUCCEEDED-default.jhist");
FileStatus fileUnderTodayDirStatus = new FileStatus(10, false, 0, 0,
timeBefore200Secs, fileUnderTodayDir);
HistoryFileManager historyManager = spy(new HistoryFileManager());
jobHistory = spy(new JobHistory());
List<FileStatus> fileStatusList = new LinkedList<FileStatus>();
fileStatusList.add(dirCreatedYesterdayStatus);
fileStatusList.add(dirCreatedTodayStatus);
// Make the initial delay of history job cleaner as 4 secs
doReturn(4).when(jobHistory).getInitDelaySecs();
doReturn(historyManager).when(jobHistory).createHistoryFileManager();
List<FileStatus> list1 = new LinkedList<FileStatus>();
list1.add(fileUnderYesterdayDirStatus);
doReturn(list1).when(historyManager).scanDirectoryForHistoryFiles(
eq(donePathYesterday), any(FileContext.class));
List<FileStatus> list2 = new LinkedList<FileStatus>();
list2.add(fileUnderTodayDirStatus);
doReturn(list2).when(historyManager).scanDirectoryForHistoryFiles(
eq(donePathToday), any(FileContext.class));
doReturn(fileStatusList).when(historyManager).findTimestampedDirectories();
doReturn(true).when(historyManager).deleteDir(any(FileStatus.class));
JobListCache jobListCache = mock(JobListCache.class);
HistoryFileInfo fileInfo = mock(HistoryFileInfo.class);
doReturn(jobListCache).when(historyManager).createJobListCache();
when(jobListCache.get(any(JobId.class))).thenReturn(fileInfo);
doNothing().when(fileInfo).delete();
// Set job retention time to 24 hrs and cleaner interval to 2 secs
Configuration conf = new Configuration();
conf.setLong(JHAdminConfig.MR_HISTORY_MAX_AGE_MS, 24l * 3600 * 1000);
conf.setLong(JHAdminConfig.MR_HISTORY_CLEANER_INTERVAL_MS, 2 * 1000);
jobHistory.init(conf);
jobHistory.start();
assertEquals(2 * 1000l, jobHistory.getCleanerInterval());
// Only yesterday's jhist file should get deleted
verify(fileInfo, timeout(20000).times(1)).delete();
fileStatusList.remove(dirCreatedYesterdayStatus);
// Now reset job retention time to 10 secs
conf.setLong(JHAdminConfig.MR_HISTORY_MAX_AGE_MS, 10 * 1000);
// Set cleaner interval to 1 sec
conf.setLong(JHAdminConfig.MR_HISTORY_CLEANER_INTERVAL_MS, 1 * 1000);
doReturn(conf).when(jobHistory).createConf();
// Do refresh job retention settings
jobHistory.refreshJobRetentionSettings();
// Cleaner interval should be updated
assertEquals(1 * 1000l, jobHistory.getCleanerInterval());
// Today's jhist file will also be deleted now since it falls below the
// retention threshold
verify(fileInfo, timeout(20000).times(2)).delete();
}
@After
public void cleanUp() {
if (jobHistory != null) {
jobHistory.stop();
}
}
}

View File

@ -28,6 +28,7 @@ import java.util.List;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.mapreduce.v2.hs.JobHistory;
import org.apache.hadoop.mapreduce.v2.hs.client.HSAdmin;
import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
import org.apache.hadoop.security.GroupMappingServiceProvider;
@ -49,6 +50,7 @@ public class TestHSAdminServer {
private HSAdmin hsAdminClient = null;
Configuration conf = null;
private static long groupRefreshTimeoutSec = 1;
JobHistory jobHistoryService = null;
AggregatedLogDeletionService alds = null;
public static class MockUnixGroupsMapping implements
@ -85,9 +87,11 @@ public class TestHSAdminServer {
GroupMappingServiceProvider.class);
conf.setLong("hadoop.security.groups.cache.secs", groupRefreshTimeoutSec);
Groups.getUserToGroupsMappingService(conf);
jobHistoryService = mock(JobHistory.class);
alds = mock(AggregatedLogDeletionService.class);
hsAdminServer = new HSAdminServer(alds) {
hsAdminServer = new HSAdminServer(alds, jobHistoryService) {
@Override
protected Configuration createConf() {
return conf;
@ -236,13 +240,21 @@ public class TestHSAdminServer {
}
assertTrue(th instanceof RemoteException);
}
@Test
public void testRefreshLogRetentionSettings() throws Exception {
String[] args = new String[1];
args[0] = "-refreshLogRetentionSettings";
hsAdminClient.run(args);
verify(alds).refreshLogRetentionSettings();
String[] args = new String[1];
args[0] = "-refreshLogRetentionSettings";
hsAdminClient.run(args);
verify(alds).refreshLogRetentionSettings();
}
@Test
public void testRefreshJobRetentionSettings() throws Exception {
String[] args = new String[1];
args[0] = "-refreshJobRetentionSettings";
hsAdminClient.run(args);
verify(jobHistoryService).refreshJobRetentionSettings();
}
@After