svn merge -c 1297796 from trunk to branch-0.23 FIXES: MAPREDUCE-3977. LogAggregationService leaks log aggregator objects (Jason Lowe via bobby)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1297797 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Joseph Evans 2012-03-06 23:46:02 +00:00
parent de89bbcb88
commit 9b32863cb8
3 changed files with 55 additions and 8 deletions

View File

@ -203,6 +203,9 @@ Release 0.23.2 - UNRELEASED
MAPREDUCE-3961. Map/ReduceSlotMillis computation incorrect (Siddharth Seth MAPREDUCE-3961. Map/ReduceSlotMillis computation incorrect (Siddharth Seth
via bobby) via bobby)
MAPREDUCE-3977. LogAggregationService leaks log aggregator objects
(Jason Lowe via bobby)
Release 0.23.1 - 2012-02-17 Release 0.23.1 - 2012-02-17
NEW FEATURES NEW FEATURES

View File

@ -28,6 +28,7 @@ import java.util.concurrent.Executors;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
@ -139,12 +140,6 @@ public class LogAggregationService extends AbstractService implements
super.stop(); super.stop();
} }
private void verifyAndCreateRemoteLogDir(Configuration conf) { private void verifyAndCreateRemoteLogDir(Configuration conf) {
// Checking the existance of the TLD // Checking the existance of the TLD
FileSystem remoteFS = null; FileSystem remoteFS = null;
@ -289,7 +284,7 @@ public class LogAggregationService extends AbstractService implements
createAppDir(user, appId, userUgi); createAppDir(user, appId, userUgi);
// New application // New application
AppLogAggregator appLogAggregator = final AppLogAggregator appLogAggregator =
new AppLogAggregatorImpl(this.dispatcher, this.deletionService, new AppLogAggregatorImpl(this.dispatcher, this.deletionService,
getConfig(), appId, userUgi, dirsHandler, getConfig(), appId, userUgi, dirsHandler,
getRemoteNodeLogFileForApp(appId, user), logRetentionPolicy, getRemoteNodeLogFileForApp(appId, user), logRetentionPolicy,
@ -303,7 +298,22 @@ public class LogAggregationService extends AbstractService implements
// aggregation. // aggregation.
// Schedule the aggregator. // Schedule the aggregator.
this.threadPool.execute(appLogAggregator); Runnable aggregatorWrapper = new Runnable() {
public void run() {
try {
appLogAggregator.run();
} finally {
appLogAggregators.remove(appId);
}
}
};
this.threadPool.execute(aggregatorWrapper);
}
// for testing only
@Private
int getNumAggregators() {
return this.appLogAggregators.size();
} }
private void stopContainer(ContainerId containerId, int exitCode) { private void stopContainer(ContainerId containerId, int exitCode) {

View File

@ -565,4 +565,38 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
logAggregationService.stop(); logAggregationService.stop();
} }
@Test
@SuppressWarnings("unchecked")
public void testLogAggregatorCleanup() throws Exception {
DeletionService delSrvc = mock(DeletionService.class);
// get the AppLogAggregationImpl thread to crash
LocalDirsHandlerService mockedDirSvc = mock(LocalDirsHandlerService.class);
DrainDispatcher dispatcher = createDispatcher();
EventHandler<ApplicationEvent> appEventHandler = mock(EventHandler.class);
dispatcher.register(ApplicationEventType.class, appEventHandler);
LogAggregationService logAggregationService =
new LogAggregationService(dispatcher, this.context, delSrvc,
mockedDirSvc);
logAggregationService.init(this.conf);
logAggregationService.start();
ApplicationId application1 = BuilderUtils.newApplicationId(1234, 1);
logAggregationService.handle(new LogHandlerAppStartedEvent(
application1, this.user, null,
ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls));
logAggregationService.handle(new LogHandlerAppFinishedEvent(application1));
dispatcher.await();
int timeToWait = 20 * 1000;
while (timeToWait > 0 && logAggregationService.getNumAggregators() > 0) {
Thread.sleep(100);
timeToWait -= 100;
}
Assert.assertEquals("Log aggregator failed to cleanup!", 0,
logAggregationService.getNumAggregators());
}
} }