YARN-1409. NonAggregatingLogHandler can throw RejectedExecutionException. Contributed by Tsuyoshi OZAWA

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1556282 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jason Darrell Lowe 2014-01-07 17:11:09 +00:00
parent c3cc855d27
commit 785c12056c
3 changed files with 41 additions and 3 deletions

View File

@ -295,6 +295,9 @@ Release 2.4.0 - UNRELEASED
YARN-1560. Fixed TestYarnClient#testAMMRTokens failure with null AMRM token. YARN-1560. Fixed TestYarnClient#testAMMRTokens failure with null AMRM token.
(Ted Yu via jianhe) (Ted Yu via jianhe)
YARN-1409. NonAggregatingLogHandler can throw RejectedExecutionException
(Tsuyoshi OZAWA via jlowe)
Release 2.3.0 - UNRELEASED Release 2.3.0 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -23,6 +23,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.RejectedExecutionException;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -118,10 +119,17 @@ public class NonAggregatingLogHandler extends AbstractService implements
LOG.info("Scheduling Log Deletion for application: " LOG.info("Scheduling Log Deletion for application: "
+ appFinishedEvent.getApplicationId() + ", with delay of " + appFinishedEvent.getApplicationId() + ", with delay of "
+ this.deleteDelaySeconds + " seconds"); + this.deleteDelaySeconds + " seconds");
sched.schedule( LogDeleterRunnable logDeleter =
new LogDeleterRunnable(appOwners.remove(appFinishedEvent new LogDeleterRunnable(appOwners.remove(appFinishedEvent
.getApplicationId()), appFinishedEvent.getApplicationId()), .getApplicationId()), appFinishedEvent.getApplicationId());
this.deleteDelaySeconds, TimeUnit.SECONDS); try {
sched.schedule(logDeleter, this.deleteDelaySeconds,
TimeUnit.SECONDS);
} catch (RejectedExecutionException e) {
// Handling this event in local thread before starting threads
// or after calling sched.shutdownNow().
logDeleter.run();
}
break; break;
default: default:
; // Ignore ; // Ignore

View File

@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.event.InlineDispatcher;
import org.apache.hadoop.yarn.logaggregation.ContainerLogsRetentionPolicy; import org.apache.hadoop.yarn.logaggregation.ContainerLogsRetentionPolicy;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService; import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
@ -202,6 +203,32 @@ public class TestNonAggregatingLogHandler {
verify(logHandler.mockSched).shutdownNow(); verify(logHandler.mockSched).shutdownNow();
} }
@Test
public void testHandlingApplicationFinishedEvent() {
Configuration conf = new Configuration();
LocalDirsHandlerService dirsService = new LocalDirsHandlerService();
DeletionService delService = new DeletionService(null);
NonAggregatingLogHandler aggregatingLogHandler =
new NonAggregatingLogHandler(new InlineDispatcher(),
delService,
dirsService);
dirsService.init(conf);
dirsService.start();
delService.init(conf);
delService.start();
aggregatingLogHandler.init(conf);
aggregatingLogHandler.start();
ApplicationId appId = BuilderUtils.newApplicationId(1234, 1);
// It should NOT throw RejectedExecutionException
aggregatingLogHandler.handle(new LogHandlerAppFinishedEvent(appId));
aggregatingLogHandler.stop();
// It should NOT throw RejectedExecutionException after stopping
// handler service.
aggregatingLogHandler.handle(new LogHandlerAppFinishedEvent(appId));
}
private class NonAggregatingLogHandlerWithMockExecutor extends private class NonAggregatingLogHandlerWithMockExecutor extends
NonAggregatingLogHandler { NonAggregatingLogHandler {