diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index c94e6cdd26a..999db136899 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -9,6 +9,9 @@ Release 2.3.0 - UNRELEASED MAPREDUCE-5265. History server admin service to refresh user and superuser group mappings (Ashwin Shankar via jlowe) + MAPREDUCE-5356. Ability to refresh aggregated log retention period and + check interval (Ashwin Shankar via jlowe) + IMPROVEMENTS OPTIMIZATIONS diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistoryServer.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistoryServer.java index 391edbd7474..01d5b5d2ee0 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistoryServer.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistoryServer.java @@ -83,7 +83,7 @@ public class JobHistoryServer extends CompositeService { clientService = new HistoryClientService(historyContext, this.jhsDTSecretManager); aggLogDelService = new AggregatedLogDeletionService(); - hsAdminServer = new HSAdminServer(); + hsAdminServer = new HSAdminServer(aggLogDelService); addService(jobHistoryService); addService(clientService); addService(aggLogDelService); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/client/HSAdmin.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/client/HSAdmin.java index ae6f011c22a..dc3c89663ef 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/client/HSAdmin.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/client/HSAdmin.java @@ -60,6 +60,8 @@ public class HSAdmin extends Configured implements Tool { .println("Usage: mapred hsadmin [-refreshSuperUserGroupsConfiguration]"); } else if ("-refreshAdminAcls".equals(cmd)) { System.err.println("Usage: mapred hsadmin [-refreshAdminAcls]"); + } else if ("-refreshLogRetentionSettings".equals(cmd)) { + System.err.println("Usage: mapred hsadmin [-refreshLogRetentionSettings]"); } else if ("-getGroups".equals(cmd)) { System.err.println("Usage: mapred hsadmin" + " [-getGroups [username]]"); } else { @@ -67,6 +69,7 @@ public class HSAdmin extends Configured implements Tool { System.err.println(" [-refreshUserToGroupsMappings]"); System.err.println(" [-refreshSuperUserGroupsConfiguration]"); System.err.println(" [-refreshAdminAcls]"); + System.err.println(" [-refreshLogRetentionSettings]"); System.err.println(" [-getGroups [username]]"); System.err.println(" [-help [cmd]]"); System.err.println(); @@ -89,6 +92,8 @@ public class HSAdmin extends Configured implements Tool { String refreshAdminAcls = "-refreshAdminAcls: Refresh acls for administration of Job history server\n"; + String refreshLogRetentionSettings = "-refreshLogRetentionSettings: Refresh 'log retention time' and 'log retention check interval' \n"; + String getGroups = "-getGroups [username]: Get the groups which given user belongs to\n"; String help = "-help [cmd]: \tDisplays help for the given command or all commands if none\n" @@ -102,6 +107,8 @@ public class HSAdmin extends Configured implements Tool { System.out.println(refreshSuperUserGroupsConfiguration); } else if ("refreshAdminAcls".equals(cmd)) { System.out.println(refreshAdminAcls); + } else if ("refreshLogRetentionSettings".equals(cmd)) { + System.out.println(refreshLogRetentionSettings); } else if ("getGroups".equals(cmd)) { System.out.println(getGroups); } else { @@ -109,6 +116,7 @@ public class HSAdmin extends Configured implements Tool { System.out.println(refreshUserToGroupsMappings); System.out.println(refreshSuperUserGroupsConfiguration); System.out.println(refreshAdminAcls); + System.out.println(refreshLogRetentionSettings); System.out.println(getGroups); System.out.println(help); System.out.println(); @@ -198,6 +206,22 @@ public class HSAdmin extends Configured implements Tool { return 0; } + private int refreshLogRetentionSettings() throws IOException { + // Refresh log retention settings + Configuration conf = getConf(); + InetSocketAddress address = conf.getSocketAddr( + JHAdminConfig.JHS_ADMIN_ADDRESS, + JHAdminConfig.DEFAULT_JHS_ADMIN_ADDRESS, + JHAdminConfig.DEFAULT_JHS_ADMIN_PORT); + + HSAdminRefreshProtocol refreshProtocol = HSProxies + .createProxy(conf, address, HSAdminRefreshProtocol.class, + UserGroupInformation.getCurrentUser()); + + refreshProtocol.refreshLogRetentionSettings(); + return 0; + } + @Override public int run(String[] args) throws Exception { if (args.length < 1) { @@ -211,7 +235,8 @@ public class HSAdmin extends Configured implements Tool { if ("-refreshUserToGroupsMappings".equals(cmd) || "-refreshSuperUserGroupsConfiguration".equals(cmd) - || "-refreshAdminAcls".equals(cmd)) { + || "-refreshAdminAcls".equals(cmd) + || "-refreshLogRetentionSettings".equals(cmd)) { if (args.length != 1) { printUsage(cmd); return exitCode; @@ -225,6 +250,8 @@ public class HSAdmin extends Configured implements Tool { exitCode = refreshSuperUserGroupsConfiguration(); } else if ("-refreshAdminAcls".equals(cmd)) { exitCode = refreshAdminAcls(); + } else if ("-refreshLogRetentionSettings".equals(cmd)) { + exitCode = refreshLogRetentionSettings(); } else if ("-getGroups".equals(cmd)) { String[] usernames = Arrays.copyOfRange(args, i, args.length); exitCode = getGroups(usernames); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/protocol/HSAdminRefreshProtocol.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/protocol/HSAdminRefreshProtocol.java index d1e46a3a9d6..ed661d1bf3c 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/protocol/HSAdminRefreshProtocol.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/protocol/HSAdminRefreshProtocol.java @@ -40,4 +40,11 @@ public interface HSAdminRefreshProtocol { */ public void refreshAdminAcls() throws IOException; + /** + * Refresh log retention settings. + * + * @throws IOException + */ + public void refreshLogRetentionSettings() throws IOException; + } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/protocolPB/HSAdminRefreshProtocolClientSideTranslatorPB.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/protocolPB/HSAdminRefreshProtocolClientSideTranslatorPB.java index 7e6ac1595e6..e210b2b7d45 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/protocolPB/HSAdminRefreshProtocolClientSideTranslatorPB.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/protocolPB/HSAdminRefreshProtocolClientSideTranslatorPB.java @@ -27,6 +27,7 @@ import org.apache.hadoop.ipc.ProtocolMetaInterface; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RpcClientUtil; import org.apache.hadoop.mapreduce.v2.hs.proto.HSAdminRefreshProtocolProtos.RefreshAdminAclsRequestProto; +import org.apache.hadoop.mapreduce.v2.hs.proto.HSAdminRefreshProtocolProtos.RefreshLogRetentionSettingsRequestProto; import org.apache.hadoop.mapreduce.v2.hs.protocol.HSAdminRefreshProtocol; import com.google.protobuf.RpcController; @@ -43,7 +44,10 @@ public class HSAdminRefreshProtocolClientSideTranslatorPB implements private final static RefreshAdminAclsRequestProto VOID_REFRESH_ADMIN_ACLS_REQUEST = RefreshAdminAclsRequestProto .newBuilder().build(); - + + private final static RefreshLogRetentionSettingsRequestProto VOID_REFRESH_LOG_RETENTION_SETTINGS_REQUEST = RefreshLogRetentionSettingsRequestProto + .newBuilder().build(); + public HSAdminRefreshProtocolClientSideTranslatorPB( HSAdminRefreshProtocolPB rpcProxy) { this.rpcProxy = rpcProxy; @@ -64,6 +68,16 @@ public class HSAdminRefreshProtocolClientSideTranslatorPB implements } } + @Override + public void refreshLogRetentionSettings() throws IOException { + try { + rpcProxy.refreshLogRetentionSettings(NULL_CONTROLLER, + VOID_REFRESH_LOG_RETENTION_SETTINGS_REQUEST); + } catch (ServiceException se) { + throw ProtobufHelper.getRemoteException(se); + } + } + @Override public boolean isMethodSupported(String methodName) throws IOException { return RpcClientUtil.isMethodSupported(rpcProxy, diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/protocolPB/HSAdminRefreshProtocolServerSideTranslatorPB.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/protocolPB/HSAdminRefreshProtocolServerSideTranslatorPB.java index 3d3661c3dd9..eaf395cb13a 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/protocolPB/HSAdminRefreshProtocolServerSideTranslatorPB.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/protocolPB/HSAdminRefreshProtocolServerSideTranslatorPB.java @@ -23,6 +23,8 @@ import java.io.IOException; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.mapreduce.v2.hs.proto.HSAdminRefreshProtocolProtos.RefreshAdminAclsResponseProto; import org.apache.hadoop.mapreduce.v2.hs.proto.HSAdminRefreshProtocolProtos.RefreshAdminAclsRequestProto; +import org.apache.hadoop.mapreduce.v2.hs.proto.HSAdminRefreshProtocolProtos.RefreshLogRetentionSettingsRequestProto; +import org.apache.hadoop.mapreduce.v2.hs.proto.HSAdminRefreshProtocolProtos.RefreshLogRetentionSettingsResponseProto; import org.apache.hadoop.mapreduce.v2.hs.protocol.HSAdminRefreshProtocol; import com.google.protobuf.RpcController; @@ -36,6 +38,8 @@ public class HSAdminRefreshProtocolServerSideTranslatorPB implements private final static RefreshAdminAclsResponseProto VOID_REFRESH_ADMIN_ACLS_RESPONSE = RefreshAdminAclsResponseProto .newBuilder().build(); + private final static RefreshLogRetentionSettingsResponseProto VOID_REFRESH_LOG_RETENTION_SETTINGS_RESPONSE = RefreshLogRetentionSettingsResponseProto + .newBuilder().build(); public HSAdminRefreshProtocolServerSideTranslatorPB( HSAdminRefreshProtocol impl) { @@ -54,4 +58,15 @@ public class HSAdminRefreshProtocolServerSideTranslatorPB implements return VOID_REFRESH_ADMIN_ACLS_RESPONSE; } + @Override + public RefreshLogRetentionSettingsResponseProto refreshLogRetentionSettings( + RpcController controller, RefreshLogRetentionSettingsRequestProto request) + throws ServiceException { + try { + impl.refreshLogRetentionSettings(); + } catch (IOException e) { + throw new ServiceException(e); + } + return VOID_REFRESH_LOG_RETENTION_SETTINGS_RESPONSE; + } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/server/HSAdminServer.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/server/HSAdminServer.java index b1d6b88b486..444a8617d4f 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/server/HSAdminServer.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/server/HSAdminServer.java @@ -34,6 +34,7 @@ import org.apache.hadoop.security.Groups; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.authorize.AccessControlList; import org.apache.hadoop.security.authorize.ProxyUsers; +import org.apache.hadoop.yarn.logaggregation.AggregatedLogDeletionService; import org.apache.hadoop.security.proto.RefreshUserMappingsProtocolProtos.RefreshUserMappingsProtocolService; import org.apache.hadoop.security.protocolPB.RefreshUserMappingsProtocolPB; import org.apache.hadoop.security.protocolPB.RefreshUserMappingsProtocolServerSideTranslatorPB; @@ -55,14 +56,16 @@ public class HSAdminServer extends AbstractService implements HSAdminProtocol { private static final Log LOG = LogFactory.getLog(HSAdminServer.class); private AccessControlList adminAcl; + private AggregatedLogDeletionService aggLogDelService = null; /** The RPC server that listens to requests from clients */ protected RPC.Server clientRpcServer; protected InetSocketAddress clientRpcAddress; private static final String HISTORY_ADMIN_SERVER = "HSAdminServer"; - - public HSAdminServer() { + + public HSAdminServer(AggregatedLogDeletionService aggLogDelService) { super(HSAdminServer.class.getName()); + this.aggLogDelService = aggLogDelService; } @Override @@ -101,6 +104,7 @@ public class HSAdminServer extends AbstractService implements HSAdminProtocol { adminAcl = new AccessControlList(conf.get(JHAdminConfig.JHS_ADMIN_ACL, JHAdminConfig.DEFAULT_JHS_ADMIN_ACL)); + } @Override @@ -192,5 +196,14 @@ public class HSAdminServer extends AbstractService implements HSAdminProtocol { HSAuditLogger.logSuccess(user.getShortUserName(), "refreshAdminAcls", HISTORY_ADMIN_SERVER); } + + @Override + public void refreshLogRetentionSettings() throws IOException { + UserGroupInformation user = checkAcls("refreshLogRetentionSettings"); + aggLogDelService.refreshLogRetentionSettings(); + + HSAuditLogger.logSuccess(user.getShortUserName(), + "refreshLogRetentionSettings", "HSAdminServer"); + } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/proto/HSAdminRefreshProtocol.proto b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/proto/HSAdminRefreshProtocol.proto index e43fd75c86a..e9d2f306894 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/proto/HSAdminRefreshProtocol.proto +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/proto/HSAdminRefreshProtocol.proto @@ -33,6 +33,18 @@ message RefreshAdminAclsRequestProto { message RefreshAdminAclsResponseProto { } +/** + * refresh log retention request. + */ +message RefreshLogRetentionSettingsRequestProto { +} + +/** + * Response for refresh log retention. + */ +message RefreshLogRetentionSettingsResponseProto { +} + /** * Refresh Protocols implemented by the History server */ @@ -42,4 +54,9 @@ service HSAdminRefreshProtocolService { */ rpc refreshAdminAcls(RefreshAdminAclsRequestProto) returns(RefreshAdminAclsResponseProto); + /** + * Refresh log retention + */ + rpc refreshLogRetentionSettings(RefreshLogRetentionSettingsRequestProto) + returns(RefreshLogRetentionSettingsResponseProto); } \ No newline at end of file diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/server/TestHSAdminServer.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/server/TestHSAdminServer.java index 8afd1b737a5..c530c194976 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/server/TestHSAdminServer.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/server/TestHSAdminServer.java @@ -39,14 +39,17 @@ import org.junit.Before; import org.junit.Test; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import static org.mockito.Mockito.verify; import org.apache.hadoop.security.authorize.AuthorizationException; +import org.apache.hadoop.yarn.logaggregation.AggregatedLogDeletionService; public class TestHSAdminServer { private HSAdminServer hsAdminServer = null; private HSAdmin hsAdminClient = null; Configuration conf = null; private static long groupRefreshTimeoutSec = 1; + AggregatedLogDeletionService alds = null; public static class MockUnixGroupsMapping implements GroupMappingServiceProvider { @@ -82,7 +85,9 @@ public class TestHSAdminServer { GroupMappingServiceProvider.class); conf.setLong("hadoop.security.groups.cache.secs", groupRefreshTimeoutSec); Groups.getUserToGroupsMappingService(conf); - hsAdminServer = new HSAdminServer() { + alds = mock(AggregatedLogDeletionService.class); + + hsAdminServer = new HSAdminServer(alds) { @Override protected Configuration createConf() { return conf; @@ -231,6 +236,14 @@ public class TestHSAdminServer { } assertTrue(th instanceof RemoteException); } + + @Test + public void testRefreshLogRetentionSettings() throws Exception { + String[] args = new String[1]; + args[0] = "-refreshLogRetentionSettings"; + hsAdminClient.run(args); + verify(alds).refreshLogRetentionSettings(); + } @After public void cleanUp() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java index c7b3376e476..590cfe20c49 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java @@ -41,6 +41,7 @@ public class AggregatedLogDeletionService extends AbstractService { private static final Log LOG = LogFactory.getLog(AggregatedLogDeletionService.class); private Timer timer = null; + private long checkIntervalMsecs; static class LogDeletionTask extends TimerTask { private Configuration conf; @@ -133,37 +134,71 @@ public class AggregatedLogDeletionService extends AbstractService { @Override protected void serviceStart() throws Exception { - Configuration conf = getConfig(); - if (!conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, - YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED)) { - //Log aggregation is not enabled so don't bother - return; - } - long retentionSecs = conf.getLong(YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS, - YarnConfiguration.DEFAULT_LOG_AGGREGATION_RETAIN_SECONDS); - if(retentionSecs < 0) { - LOG.info("Log Aggregation deletion is disabled because retention is" + - " too small (" + retentionSecs + ")"); - return; - } - long checkIntervalMsecs = 1000 * conf.getLong( - YarnConfiguration.LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS, - YarnConfiguration.DEFAULT_LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS); - if (checkIntervalMsecs <= 0) { - // when unspecified compute check interval as 1/10th of retention - checkIntervalMsecs = (retentionSecs * 1000) / 10; - } - TimerTask task = new LogDeletionTask(conf, retentionSecs); - timer = new Timer(); - timer.scheduleAtFixedRate(task, 0, checkIntervalMsecs); + scheduleLogDeletionTask(); super.serviceStart(); } @Override protected void serviceStop() throws Exception { - if(timer != null) { - timer.cancel(); - } + stopTimer(); super.serviceStop(); } + + private void setLogAggCheckIntervalMsecs(long retentionSecs) { + Configuration conf = getConfig(); + checkIntervalMsecs = 1000 * conf + .getLong( + YarnConfiguration.LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS, + YarnConfiguration.DEFAULT_LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS); + if (checkIntervalMsecs <= 0) { + // when unspecified compute check interval as 1/10th of retention + checkIntervalMsecs = (retentionSecs * 1000) / 10; + } + } + + public void refreshLogRetentionSettings() { + if (getServiceState() == STATE.STARTED) { + Configuration conf = createConf(); + setConfig(conf); + stopTimer(); + scheduleLogDeletionTask(); + } else { + LOG.warn("Failed to execute refreshLogRetentionSettings : Aggregated Log Deletion Service is not started"); + } + } + + private void scheduleLogDeletionTask() { + Configuration conf = getConfig(); + if (!conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, + YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED)) { + // Log aggregation is not enabled so don't bother + return; + } + long retentionSecs = conf.getLong( + YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS, + YarnConfiguration.DEFAULT_LOG_AGGREGATION_RETAIN_SECONDS); + if (retentionSecs < 0) { + LOG.info("Log Aggregation deletion is disabled because retention is" + + " too small (" + retentionSecs + ")"); + return; + } + setLogAggCheckIntervalMsecs(retentionSecs); + TimerTask task = new LogDeletionTask(conf, retentionSecs); + timer = new Timer(); + timer.scheduleAtFixedRate(task, 0, checkIntervalMsecs); + } + + private void stopTimer() { + if (timer != null) { + timer.cancel(); + } + } + + public long getCheckIntervalMsecs() { + return checkIntervalMsecs; + } + + protected Configuration createConf() { + return new Configuration(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogDeletionService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogDeletionService.java index 035cd9515c4..05c7e7103c3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogDeletionService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogDeletionService.java @@ -30,6 +30,7 @@ import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.junit.Before; import org.junit.Test; +import org.junit.Assert; import static org.mockito.Mockito.*; @@ -128,6 +129,99 @@ public class TestAggregatedLogDeletionService { verify(mockFs).delete(app4Dir, true); } + @Test + public void testRefreshLogRetentionSettings() throws IOException { + long now = System.currentTimeMillis(); + //time before 2000 sec + long before2000Secs = now - (2000 * 1000); + //time before 50 sec + long before50Secs = now - (50 * 1000); + String root = "mockfs://foo/"; + String remoteRootLogDir = root + "tmp/logs"; + String suffix = "logs"; + final Configuration conf = new Configuration(); + conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class); + conf.set(YarnConfiguration.LOG_AGGREGATION_ENABLED, "true"); + conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS, "1800"); + conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS, + "1"); + conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteRootLogDir); + conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, suffix); + + Path rootPath = new Path(root); + FileSystem rootFs = rootPath.getFileSystem(conf); + FileSystem mockFs = ((FilterFileSystem) rootFs).getRawFileSystem(); + + Path remoteRootLogPath = new Path(remoteRootLogDir); + + Path userDir = new Path(remoteRootLogPath, "me"); + FileStatus userDirStatus = new FileStatus(0, true, 0, 0, before50Secs, + userDir); + + when(mockFs.listStatus(remoteRootLogPath)).thenReturn( + new FileStatus[] { userDirStatus }); + + Path userLogDir = new Path(userDir, suffix); + + //Set time last modified of app1Dir directory and its files to before2000Secs + Path app1Dir = new Path(userLogDir, "application_1_1"); + FileStatus app1DirStatus = new FileStatus(0, true, 0, 0, before2000Secs, + app1Dir); + + //Set time last modified of app1Dir directory and its files to before50Secs + Path app2Dir = new Path(userLogDir, "application_1_2"); + FileStatus app2DirStatus = new FileStatus(0, true, 0, 0, before50Secs, + app2Dir); + + when(mockFs.listStatus(userLogDir)).thenReturn( + new FileStatus[] { app1DirStatus, app2DirStatus }); + + Path app1Log1 = new Path(app1Dir, "host1"); + FileStatus app1Log1Status = new FileStatus(10, false, 1, 1, before2000Secs, + app1Log1); + + when(mockFs.listStatus(app1Dir)).thenReturn( + new FileStatus[] { app1Log1Status }); + + Path app2Log1 = new Path(app2Dir, "host1"); + FileStatus app2Log1Status = new FileStatus(10, false, 1, 1, before50Secs, + app2Log1); + + when(mockFs.listStatus(app2Dir)).thenReturn( + new FileStatus[] { app2Log1Status }); + + AggregatedLogDeletionService deletionSvc = new AggregatedLogDeletionService() { + @Override + protected Configuration createConf() { + return conf; + } + }; + + deletionSvc.init(conf); + deletionSvc.start(); + + //app1Dir would be deleted since its done above log retention period + verify(mockFs, timeout(10000)).delete(app1Dir, true); + //app2Dir is not expected to be deleted since its below the threshold + verify(mockFs, timeout(3000).times(0)).delete(app2Dir, true); + + //Now,lets change the confs + conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS, "50"); + conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS, + "2"); + //We have not called refreshLogSettings,hence don't expect to see the changed conf values + Assert.assertTrue(2000l != deletionSvc.getCheckIntervalMsecs()); + + //refresh the log settings + deletionSvc.refreshLogRetentionSettings(); + + //Check interval time should reflect the new value + Assert.assertTrue(2000l == deletionSvc.getCheckIntervalMsecs()); + //app2Dir should be deleted since it falls above the threshold + verify(mockFs, timeout(10000)).delete(app2Dir, true); + deletionSvc.stop(); + } + @Test public void testCheckInterval() throws Exception { long RETENTION_SECS = 10 * 24 * 3600; @@ -176,7 +270,7 @@ public class TestAggregatedLogDeletionService { new AggregatedLogDeletionService(); deletionSvc.init(conf); deletionSvc.start(); - + verify(mockFs, timeout(10000).atLeast(4)).listStatus(any(Path.class)); verify(mockFs, never()).delete(app1Dir, true);