diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 11a6c6a0fb1..c238462037c 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -295,6 +295,9 @@ Release 2.4.0 - UNRELEASED YARN-1560. Fixed TestYarnClient#testAMMRTokens failure with null AMRM token. (Ted Yu via jianhe) + YARN-1409. NonAggregatingLogHandler can throw RejectedExecutionException + (Tsuyoshi OZAWA via jlowe) + Release 2.3.0 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java index 06b654ae064..a9aacb58c2a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java @@ -23,6 +23,7 @@ import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import java.util.concurrent.RejectedExecutionException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -118,10 +119,17 @@ public void handle(LogHandlerEvent event) { LOG.info("Scheduling Log Deletion for application: " + appFinishedEvent.getApplicationId() + ", with delay of " + this.deleteDelaySeconds + " seconds"); - sched.schedule( + LogDeleterRunnable logDeleter = new LogDeleterRunnable(appOwners.remove(appFinishedEvent - .getApplicationId()), appFinishedEvent.getApplicationId()), - this.deleteDelaySeconds, TimeUnit.SECONDS); + .getApplicationId()), appFinishedEvent.getApplicationId()); + try { + sched.schedule(logDeleter, this.deleteDelaySeconds, + TimeUnit.SECONDS); + } catch (RejectedExecutionException e) { + // Handling this event in local thread before starting threads + // or after calling sched.shutdownNow(). + logDeleter.run(); + } break; default: ; // Ignore diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/TestNonAggregatingLogHandler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/TestNonAggregatingLogHandler.java index e31ee007ca4..298157bcd4a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/TestNonAggregatingLogHandler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/TestNonAggregatingLogHandler.java @@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.event.InlineDispatcher; import org.apache.hadoop.yarn.logaggregation.ContainerLogsRetentionPolicy; import org.apache.hadoop.yarn.server.nodemanager.DeletionService; import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; @@ -202,6 +203,32 @@ public void testStop() throws Exception { verify(logHandler.mockSched).shutdownNow(); } + @Test + public void testHandlingApplicationFinishedEvent() { + Configuration conf = new Configuration(); + LocalDirsHandlerService dirsService = new LocalDirsHandlerService(); + DeletionService delService = new DeletionService(null); + NonAggregatingLogHandler aggregatingLogHandler = + new NonAggregatingLogHandler(new InlineDispatcher(), + delService, + dirsService); + + dirsService.init(conf); + dirsService.start(); + delService.init(conf); + delService.start(); + aggregatingLogHandler.init(conf); + aggregatingLogHandler.start(); + ApplicationId appId = BuilderUtils.newApplicationId(1234, 1); + // It should NOT throw RejectedExecutionException + aggregatingLogHandler.handle(new LogHandlerAppFinishedEvent(appId)); + aggregatingLogHandler.stop(); + + // It should NOT throw RejectedExecutionException after stopping + // handler service. + aggregatingLogHandler.handle(new LogHandlerAppFinishedEvent(appId)); + } + private class NonAggregatingLogHandlerWithMockExecutor extends NonAggregatingLogHandler {