svn merge -c 1506226 FIXES: MAPREDUCE-5356. Ability to refresh aggregated log retention period and check interval. Contributed by Ashwin Shankar
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1506230 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
5ff0774e8a
commit
d6be90bccf
|
@ -9,6 +9,9 @@ Release 2.3.0 - UNRELEASED
|
|||
MAPREDUCE-5265. History server admin service to refresh user and superuser
|
||||
group mappings (Ashwin Shankar via jlowe)
|
||||
|
||||
MAPREDUCE-5356. Ability to refresh aggregated log retention period and
|
||||
check interval (Ashwin Shankar via jlowe)
|
||||
|
||||
IMPROVEMENTS
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
|
|
@ -83,7 +83,7 @@ public class JobHistoryServer extends CompositeService {
|
|||
clientService = new HistoryClientService(historyContext,
|
||||
this.jhsDTSecretManager);
|
||||
aggLogDelService = new AggregatedLogDeletionService();
|
||||
hsAdminServer = new HSAdminServer();
|
||||
hsAdminServer = new HSAdminServer(aggLogDelService);
|
||||
addService(jobHistoryService);
|
||||
addService(clientService);
|
||||
addService(aggLogDelService);
|
||||
|
|
|
@ -60,6 +60,8 @@ public class HSAdmin extends Configured implements Tool {
|
|||
.println("Usage: mapred hsadmin [-refreshSuperUserGroupsConfiguration]");
|
||||
} else if ("-refreshAdminAcls".equals(cmd)) {
|
||||
System.err.println("Usage: mapred hsadmin [-refreshAdminAcls]");
|
||||
} else if ("-refreshLogRetentionSettings".equals(cmd)) {
|
||||
System.err.println("Usage: mapred hsadmin [-refreshLogRetentionSettings]");
|
||||
} else if ("-getGroups".equals(cmd)) {
|
||||
System.err.println("Usage: mapred hsadmin" + " [-getGroups [username]]");
|
||||
} else {
|
||||
|
@ -67,6 +69,7 @@ public class HSAdmin extends Configured implements Tool {
|
|||
System.err.println(" [-refreshUserToGroupsMappings]");
|
||||
System.err.println(" [-refreshSuperUserGroupsConfiguration]");
|
||||
System.err.println(" [-refreshAdminAcls]");
|
||||
System.err.println(" [-refreshLogRetentionSettings]");
|
||||
System.err.println(" [-getGroups [username]]");
|
||||
System.err.println(" [-help [cmd]]");
|
||||
System.err.println();
|
||||
|
@ -89,6 +92,8 @@ public class HSAdmin extends Configured implements Tool {
|
|||
|
||||
String refreshAdminAcls = "-refreshAdminAcls: Refresh acls for administration of Job history server\n";
|
||||
|
||||
String refreshLogRetentionSettings = "-refreshLogRetentionSettings: Refresh 'log retention time' and 'log retention check interval' \n";
|
||||
|
||||
String getGroups = "-getGroups [username]: Get the groups which given user belongs to\n";
|
||||
|
||||
String help = "-help [cmd]: \tDisplays help for the given command or all commands if none\n"
|
||||
|
@ -102,6 +107,8 @@ public class HSAdmin extends Configured implements Tool {
|
|||
System.out.println(refreshSuperUserGroupsConfiguration);
|
||||
} else if ("refreshAdminAcls".equals(cmd)) {
|
||||
System.out.println(refreshAdminAcls);
|
||||
} else if ("refreshLogRetentionSettings".equals(cmd)) {
|
||||
System.out.println(refreshLogRetentionSettings);
|
||||
} else if ("getGroups".equals(cmd)) {
|
||||
System.out.println(getGroups);
|
||||
} else {
|
||||
|
@ -109,6 +116,7 @@ public class HSAdmin extends Configured implements Tool {
|
|||
System.out.println(refreshUserToGroupsMappings);
|
||||
System.out.println(refreshSuperUserGroupsConfiguration);
|
||||
System.out.println(refreshAdminAcls);
|
||||
System.out.println(refreshLogRetentionSettings);
|
||||
System.out.println(getGroups);
|
||||
System.out.println(help);
|
||||
System.out.println();
|
||||
|
@ -198,6 +206,22 @@ public class HSAdmin extends Configured implements Tool {
|
|||
return 0;
|
||||
}
|
||||
|
||||
private int refreshLogRetentionSettings() throws IOException {
|
||||
// Refresh log retention settings
|
||||
Configuration conf = getConf();
|
||||
InetSocketAddress address = conf.getSocketAddr(
|
||||
JHAdminConfig.JHS_ADMIN_ADDRESS,
|
||||
JHAdminConfig.DEFAULT_JHS_ADMIN_ADDRESS,
|
||||
JHAdminConfig.DEFAULT_JHS_ADMIN_PORT);
|
||||
|
||||
HSAdminRefreshProtocol refreshProtocol = HSProxies
|
||||
.createProxy(conf, address, HSAdminRefreshProtocol.class,
|
||||
UserGroupInformation.getCurrentUser());
|
||||
|
||||
refreshProtocol.refreshLogRetentionSettings();
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int run(String[] args) throws Exception {
|
||||
if (args.length < 1) {
|
||||
|
@ -211,7 +235,8 @@ public class HSAdmin extends Configured implements Tool {
|
|||
|
||||
if ("-refreshUserToGroupsMappings".equals(cmd)
|
||||
|| "-refreshSuperUserGroupsConfiguration".equals(cmd)
|
||||
|| "-refreshAdminAcls".equals(cmd)) {
|
||||
|| "-refreshAdminAcls".equals(cmd)
|
||||
|| "-refreshLogRetentionSettings".equals(cmd)) {
|
||||
if (args.length != 1) {
|
||||
printUsage(cmd);
|
||||
return exitCode;
|
||||
|
@ -225,6 +250,8 @@ public class HSAdmin extends Configured implements Tool {
|
|||
exitCode = refreshSuperUserGroupsConfiguration();
|
||||
} else if ("-refreshAdminAcls".equals(cmd)) {
|
||||
exitCode = refreshAdminAcls();
|
||||
} else if ("-refreshLogRetentionSettings".equals(cmd)) {
|
||||
exitCode = refreshLogRetentionSettings();
|
||||
} else if ("-getGroups".equals(cmd)) {
|
||||
String[] usernames = Arrays.copyOfRange(args, i, args.length);
|
||||
exitCode = getGroups(usernames);
|
||||
|
|
|
@ -40,4 +40,11 @@ public interface HSAdminRefreshProtocol {
|
|||
*/
|
||||
public void refreshAdminAcls() throws IOException;
|
||||
|
||||
/**
|
||||
* Refresh log retention settings.
|
||||
*
|
||||
* @throws IOException
|
||||
*/
|
||||
public void refreshLogRetentionSettings() throws IOException;
|
||||
|
||||
}
|
||||
|
|
|
@ -27,6 +27,7 @@ import org.apache.hadoop.ipc.ProtocolMetaInterface;
|
|||
import org.apache.hadoop.ipc.RPC;
|
||||
import org.apache.hadoop.ipc.RpcClientUtil;
|
||||
import org.apache.hadoop.mapreduce.v2.hs.proto.HSAdminRefreshProtocolProtos.RefreshAdminAclsRequestProto;
|
||||
import org.apache.hadoop.mapreduce.v2.hs.proto.HSAdminRefreshProtocolProtos.RefreshLogRetentionSettingsRequestProto;
|
||||
import org.apache.hadoop.mapreduce.v2.hs.protocol.HSAdminRefreshProtocol;
|
||||
|
||||
import com.google.protobuf.RpcController;
|
||||
|
@ -43,7 +44,10 @@ public class HSAdminRefreshProtocolClientSideTranslatorPB implements
|
|||
|
||||
private final static RefreshAdminAclsRequestProto VOID_REFRESH_ADMIN_ACLS_REQUEST = RefreshAdminAclsRequestProto
|
||||
.newBuilder().build();
|
||||
|
||||
|
||||
private final static RefreshLogRetentionSettingsRequestProto VOID_REFRESH_LOG_RETENTION_SETTINGS_REQUEST = RefreshLogRetentionSettingsRequestProto
|
||||
.newBuilder().build();
|
||||
|
||||
public HSAdminRefreshProtocolClientSideTranslatorPB(
|
||||
HSAdminRefreshProtocolPB rpcProxy) {
|
||||
this.rpcProxy = rpcProxy;
|
||||
|
@ -64,6 +68,16 @@ public class HSAdminRefreshProtocolClientSideTranslatorPB implements
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void refreshLogRetentionSettings() throws IOException {
|
||||
try {
|
||||
rpcProxy.refreshLogRetentionSettings(NULL_CONTROLLER,
|
||||
VOID_REFRESH_LOG_RETENTION_SETTINGS_REQUEST);
|
||||
} catch (ServiceException se) {
|
||||
throw ProtobufHelper.getRemoteException(se);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isMethodSupported(String methodName) throws IOException {
|
||||
return RpcClientUtil.isMethodSupported(rpcProxy,
|
||||
|
|
|
@ -23,6 +23,8 @@ import java.io.IOException;
|
|||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.mapreduce.v2.hs.proto.HSAdminRefreshProtocolProtos.RefreshAdminAclsResponseProto;
|
||||
import org.apache.hadoop.mapreduce.v2.hs.proto.HSAdminRefreshProtocolProtos.RefreshAdminAclsRequestProto;
|
||||
import org.apache.hadoop.mapreduce.v2.hs.proto.HSAdminRefreshProtocolProtos.RefreshLogRetentionSettingsRequestProto;
|
||||
import org.apache.hadoop.mapreduce.v2.hs.proto.HSAdminRefreshProtocolProtos.RefreshLogRetentionSettingsResponseProto;
|
||||
import org.apache.hadoop.mapreduce.v2.hs.protocol.HSAdminRefreshProtocol;
|
||||
|
||||
import com.google.protobuf.RpcController;
|
||||
|
@ -36,6 +38,8 @@ public class HSAdminRefreshProtocolServerSideTranslatorPB implements
|
|||
|
||||
private final static RefreshAdminAclsResponseProto VOID_REFRESH_ADMIN_ACLS_RESPONSE = RefreshAdminAclsResponseProto
|
||||
.newBuilder().build();
|
||||
private final static RefreshLogRetentionSettingsResponseProto VOID_REFRESH_LOG_RETENTION_SETTINGS_RESPONSE = RefreshLogRetentionSettingsResponseProto
|
||||
.newBuilder().build();
|
||||
|
||||
public HSAdminRefreshProtocolServerSideTranslatorPB(
|
||||
HSAdminRefreshProtocol impl) {
|
||||
|
@ -54,4 +58,15 @@ public class HSAdminRefreshProtocolServerSideTranslatorPB implements
|
|||
return VOID_REFRESH_ADMIN_ACLS_RESPONSE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public RefreshLogRetentionSettingsResponseProto refreshLogRetentionSettings(
|
||||
RpcController controller, RefreshLogRetentionSettingsRequestProto request)
|
||||
throws ServiceException {
|
||||
try {
|
||||
impl.refreshLogRetentionSettings();
|
||||
} catch (IOException e) {
|
||||
throw new ServiceException(e);
|
||||
}
|
||||
return VOID_REFRESH_LOG_RETENTION_SETTINGS_RESPONSE;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -34,6 +34,7 @@ import org.apache.hadoop.security.Groups;
|
|||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.authorize.AccessControlList;
|
||||
import org.apache.hadoop.security.authorize.ProxyUsers;
|
||||
import org.apache.hadoop.yarn.logaggregation.AggregatedLogDeletionService;
|
||||
import org.apache.hadoop.security.proto.RefreshUserMappingsProtocolProtos.RefreshUserMappingsProtocolService;
|
||||
import org.apache.hadoop.security.protocolPB.RefreshUserMappingsProtocolPB;
|
||||
import org.apache.hadoop.security.protocolPB.RefreshUserMappingsProtocolServerSideTranslatorPB;
|
||||
|
@ -55,14 +56,16 @@ public class HSAdminServer extends AbstractService implements HSAdminProtocol {
|
|||
|
||||
private static final Log LOG = LogFactory.getLog(HSAdminServer.class);
|
||||
private AccessControlList adminAcl;
|
||||
private AggregatedLogDeletionService aggLogDelService = null;
|
||||
|
||||
/** The RPC server that listens to requests from clients */
|
||||
protected RPC.Server clientRpcServer;
|
||||
protected InetSocketAddress clientRpcAddress;
|
||||
private static final String HISTORY_ADMIN_SERVER = "HSAdminServer";
|
||||
|
||||
public HSAdminServer() {
|
||||
|
||||
public HSAdminServer(AggregatedLogDeletionService aggLogDelService) {
|
||||
super(HSAdminServer.class.getName());
|
||||
this.aggLogDelService = aggLogDelService;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -101,6 +104,7 @@ public class HSAdminServer extends AbstractService implements HSAdminProtocol {
|
|||
|
||||
adminAcl = new AccessControlList(conf.get(JHAdminConfig.JHS_ADMIN_ACL,
|
||||
JHAdminConfig.DEFAULT_JHS_ADMIN_ACL));
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -192,5 +196,14 @@ public class HSAdminServer extends AbstractService implements HSAdminProtocol {
|
|||
HSAuditLogger.logSuccess(user.getShortUserName(), "refreshAdminAcls",
|
||||
HISTORY_ADMIN_SERVER);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void refreshLogRetentionSettings() throws IOException {
|
||||
UserGroupInformation user = checkAcls("refreshLogRetentionSettings");
|
||||
|
||||
aggLogDelService.refreshLogRetentionSettings();
|
||||
|
||||
HSAuditLogger.logSuccess(user.getShortUserName(),
|
||||
"refreshLogRetentionSettings", "HSAdminServer");
|
||||
}
|
||||
}
|
||||
|
|
|
@ -33,6 +33,18 @@ message RefreshAdminAclsRequestProto {
|
|||
message RefreshAdminAclsResponseProto {
|
||||
}
|
||||
|
||||
/**
|
||||
* refresh log retention request.
|
||||
*/
|
||||
message RefreshLogRetentionSettingsRequestProto {
|
||||
}
|
||||
|
||||
/**
|
||||
* Response for refresh log retention.
|
||||
*/
|
||||
message RefreshLogRetentionSettingsResponseProto {
|
||||
}
|
||||
|
||||
/**
|
||||
* Refresh Protocols implemented by the History server
|
||||
*/
|
||||
|
@ -42,4 +54,9 @@ service HSAdminRefreshProtocolService {
|
|||
*/
|
||||
rpc refreshAdminAcls(RefreshAdminAclsRequestProto)
|
||||
returns(RefreshAdminAclsResponseProto);
|
||||
/**
|
||||
* Refresh log retention
|
||||
*/
|
||||
rpc refreshLogRetentionSettings(RefreshLogRetentionSettingsRequestProto)
|
||||
returns(RefreshLogRetentionSettingsResponseProto);
|
||||
}
|
|
@ -39,14 +39,17 @@ import org.junit.Before;
|
|||
import org.junit.Test;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
import static org.mockito.Mockito.verify;
|
||||
|
||||
import org.apache.hadoop.security.authorize.AuthorizationException;
|
||||
import org.apache.hadoop.yarn.logaggregation.AggregatedLogDeletionService;
|
||||
|
||||
public class TestHSAdminServer {
|
||||
private HSAdminServer hsAdminServer = null;
|
||||
private HSAdmin hsAdminClient = null;
|
||||
Configuration conf = null;
|
||||
private static long groupRefreshTimeoutSec = 1;
|
||||
AggregatedLogDeletionService alds = null;
|
||||
|
||||
public static class MockUnixGroupsMapping implements
|
||||
GroupMappingServiceProvider {
|
||||
|
@ -82,7 +85,9 @@ public class TestHSAdminServer {
|
|||
GroupMappingServiceProvider.class);
|
||||
conf.setLong("hadoop.security.groups.cache.secs", groupRefreshTimeoutSec);
|
||||
Groups.getUserToGroupsMappingService(conf);
|
||||
hsAdminServer = new HSAdminServer() {
|
||||
alds = mock(AggregatedLogDeletionService.class);
|
||||
|
||||
hsAdminServer = new HSAdminServer(alds) {
|
||||
@Override
|
||||
protected Configuration createConf() {
|
||||
return conf;
|
||||
|
@ -231,6 +236,14 @@ public class TestHSAdminServer {
|
|||
}
|
||||
assertTrue(th instanceof RemoteException);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRefreshLogRetentionSettings() throws Exception {
|
||||
String[] args = new String[1];
|
||||
args[0] = "-refreshLogRetentionSettings";
|
||||
hsAdminClient.run(args);
|
||||
verify(alds).refreshLogRetentionSettings();
|
||||
}
|
||||
|
||||
@After
|
||||
public void cleanUp() {
|
||||
|
|
|
@ -41,6 +41,7 @@ public class AggregatedLogDeletionService extends AbstractService {
|
|||
private static final Log LOG = LogFactory.getLog(AggregatedLogDeletionService.class);
|
||||
|
||||
private Timer timer = null;
|
||||
private long checkIntervalMsecs;
|
||||
|
||||
static class LogDeletionTask extends TimerTask {
|
||||
private Configuration conf;
|
||||
|
@ -133,37 +134,71 @@ public class AggregatedLogDeletionService extends AbstractService {
|
|||
|
||||
@Override
|
||||
protected void serviceStart() throws Exception {
|
||||
Configuration conf = getConfig();
|
||||
if (!conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,
|
||||
YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED)) {
|
||||
//Log aggregation is not enabled so don't bother
|
||||
return;
|
||||
}
|
||||
long retentionSecs = conf.getLong(YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS,
|
||||
YarnConfiguration.DEFAULT_LOG_AGGREGATION_RETAIN_SECONDS);
|
||||
if(retentionSecs < 0) {
|
||||
LOG.info("Log Aggregation deletion is disabled because retention is" +
|
||||
" too small (" + retentionSecs + ")");
|
||||
return;
|
||||
}
|
||||
long checkIntervalMsecs = 1000 * conf.getLong(
|
||||
YarnConfiguration.LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS,
|
||||
YarnConfiguration.DEFAULT_LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS);
|
||||
if (checkIntervalMsecs <= 0) {
|
||||
// when unspecified compute check interval as 1/10th of retention
|
||||
checkIntervalMsecs = (retentionSecs * 1000) / 10;
|
||||
}
|
||||
TimerTask task = new LogDeletionTask(conf, retentionSecs);
|
||||
timer = new Timer();
|
||||
timer.scheduleAtFixedRate(task, 0, checkIntervalMsecs);
|
||||
scheduleLogDeletionTask();
|
||||
super.serviceStart();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void serviceStop() throws Exception {
|
||||
if(timer != null) {
|
||||
timer.cancel();
|
||||
}
|
||||
stopTimer();
|
||||
super.serviceStop();
|
||||
}
|
||||
|
||||
private void setLogAggCheckIntervalMsecs(long retentionSecs) {
|
||||
Configuration conf = getConfig();
|
||||
checkIntervalMsecs = 1000 * conf
|
||||
.getLong(
|
||||
YarnConfiguration.LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS,
|
||||
YarnConfiguration.DEFAULT_LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS);
|
||||
if (checkIntervalMsecs <= 0) {
|
||||
// when unspecified compute check interval as 1/10th of retention
|
||||
checkIntervalMsecs = (retentionSecs * 1000) / 10;
|
||||
}
|
||||
}
|
||||
|
||||
public void refreshLogRetentionSettings() {
|
||||
if (getServiceState() == STATE.STARTED) {
|
||||
Configuration conf = createConf();
|
||||
setConfig(conf);
|
||||
stopTimer();
|
||||
scheduleLogDeletionTask();
|
||||
} else {
|
||||
LOG.warn("Failed to execute refreshLogRetentionSettings : Aggregated Log Deletion Service is not started");
|
||||
}
|
||||
}
|
||||
|
||||
private void scheduleLogDeletionTask() {
|
||||
Configuration conf = getConfig();
|
||||
if (!conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,
|
||||
YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED)) {
|
||||
// Log aggregation is not enabled so don't bother
|
||||
return;
|
||||
}
|
||||
long retentionSecs = conf.getLong(
|
||||
YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS,
|
||||
YarnConfiguration.DEFAULT_LOG_AGGREGATION_RETAIN_SECONDS);
|
||||
if (retentionSecs < 0) {
|
||||
LOG.info("Log Aggregation deletion is disabled because retention is"
|
||||
+ " too small (" + retentionSecs + ")");
|
||||
return;
|
||||
}
|
||||
setLogAggCheckIntervalMsecs(retentionSecs);
|
||||
TimerTask task = new LogDeletionTask(conf, retentionSecs);
|
||||
timer = new Timer();
|
||||
timer.scheduleAtFixedRate(task, 0, checkIntervalMsecs);
|
||||
}
|
||||
|
||||
private void stopTimer() {
|
||||
if (timer != null) {
|
||||
timer.cancel();
|
||||
}
|
||||
}
|
||||
|
||||
public long getCheckIntervalMsecs() {
|
||||
return checkIntervalMsecs;
|
||||
}
|
||||
|
||||
protected Configuration createConf() {
|
||||
return new Configuration();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -30,6 +30,7 @@ import org.apache.hadoop.security.AccessControlException;
|
|||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.Assert;
|
||||
|
||||
import static org.mockito.Mockito.*;
|
||||
|
||||
|
@ -128,6 +129,99 @@ public class TestAggregatedLogDeletionService {
|
|||
verify(mockFs).delete(app4Dir, true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRefreshLogRetentionSettings() throws IOException {
|
||||
long now = System.currentTimeMillis();
|
||||
//time before 2000 sec
|
||||
long before2000Secs = now - (2000 * 1000);
|
||||
//time before 50 sec
|
||||
long before50Secs = now - (50 * 1000);
|
||||
String root = "mockfs://foo/";
|
||||
String remoteRootLogDir = root + "tmp/logs";
|
||||
String suffix = "logs";
|
||||
final Configuration conf = new Configuration();
|
||||
conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class);
|
||||
conf.set(YarnConfiguration.LOG_AGGREGATION_ENABLED, "true");
|
||||
conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS, "1800");
|
||||
conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS,
|
||||
"1");
|
||||
conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteRootLogDir);
|
||||
conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, suffix);
|
||||
|
||||
Path rootPath = new Path(root);
|
||||
FileSystem rootFs = rootPath.getFileSystem(conf);
|
||||
FileSystem mockFs = ((FilterFileSystem) rootFs).getRawFileSystem();
|
||||
|
||||
Path remoteRootLogPath = new Path(remoteRootLogDir);
|
||||
|
||||
Path userDir = new Path(remoteRootLogPath, "me");
|
||||
FileStatus userDirStatus = new FileStatus(0, true, 0, 0, before50Secs,
|
||||
userDir);
|
||||
|
||||
when(mockFs.listStatus(remoteRootLogPath)).thenReturn(
|
||||
new FileStatus[] { userDirStatus });
|
||||
|
||||
Path userLogDir = new Path(userDir, suffix);
|
||||
|
||||
//Set time last modified of app1Dir directory and its files to before2000Secs
|
||||
Path app1Dir = new Path(userLogDir, "application_1_1");
|
||||
FileStatus app1DirStatus = new FileStatus(0, true, 0, 0, before2000Secs,
|
||||
app1Dir);
|
||||
|
||||
//Set time last modified of app1Dir directory and its files to before50Secs
|
||||
Path app2Dir = new Path(userLogDir, "application_1_2");
|
||||
FileStatus app2DirStatus = new FileStatus(0, true, 0, 0, before50Secs,
|
||||
app2Dir);
|
||||
|
||||
when(mockFs.listStatus(userLogDir)).thenReturn(
|
||||
new FileStatus[] { app1DirStatus, app2DirStatus });
|
||||
|
||||
Path app1Log1 = new Path(app1Dir, "host1");
|
||||
FileStatus app1Log1Status = new FileStatus(10, false, 1, 1, before2000Secs,
|
||||
app1Log1);
|
||||
|
||||
when(mockFs.listStatus(app1Dir)).thenReturn(
|
||||
new FileStatus[] { app1Log1Status });
|
||||
|
||||
Path app2Log1 = new Path(app2Dir, "host1");
|
||||
FileStatus app2Log1Status = new FileStatus(10, false, 1, 1, before50Secs,
|
||||
app2Log1);
|
||||
|
||||
when(mockFs.listStatus(app2Dir)).thenReturn(
|
||||
new FileStatus[] { app2Log1Status });
|
||||
|
||||
AggregatedLogDeletionService deletionSvc = new AggregatedLogDeletionService() {
|
||||
@Override
|
||||
protected Configuration createConf() {
|
||||
return conf;
|
||||
}
|
||||
};
|
||||
|
||||
deletionSvc.init(conf);
|
||||
deletionSvc.start();
|
||||
|
||||
//app1Dir would be deleted since its done above log retention period
|
||||
verify(mockFs, timeout(10000)).delete(app1Dir, true);
|
||||
//app2Dir is not expected to be deleted since its below the threshold
|
||||
verify(mockFs, timeout(3000).times(0)).delete(app2Dir, true);
|
||||
|
||||
//Now,lets change the confs
|
||||
conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS, "50");
|
||||
conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS,
|
||||
"2");
|
||||
//We have not called refreshLogSettings,hence don't expect to see the changed conf values
|
||||
Assert.assertTrue(2000l != deletionSvc.getCheckIntervalMsecs());
|
||||
|
||||
//refresh the log settings
|
||||
deletionSvc.refreshLogRetentionSettings();
|
||||
|
||||
//Check interval time should reflect the new value
|
||||
Assert.assertTrue(2000l == deletionSvc.getCheckIntervalMsecs());
|
||||
//app2Dir should be deleted since it falls above the threshold
|
||||
verify(mockFs, timeout(10000)).delete(app2Dir, true);
|
||||
deletionSvc.stop();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCheckInterval() throws Exception {
|
||||
long RETENTION_SECS = 10 * 24 * 3600;
|
||||
|
@ -176,7 +270,7 @@ public class TestAggregatedLogDeletionService {
|
|||
new AggregatedLogDeletionService();
|
||||
deletionSvc.init(conf);
|
||||
deletionSvc.start();
|
||||
|
||||
|
||||
verify(mockFs, timeout(10000).atLeast(4)).listStatus(any(Path.class));
|
||||
verify(mockFs, never()).delete(app1Dir, true);
|
||||
|
||||
|
|
Loading…
Reference in New Issue