svn merge -c 1381317 FIXES: YARN-68. NodeManager will refuse to shutdown indefinitely due to container log aggregation (daryn via bobby)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1381318 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Joseph Evans 2012-09-05 19:41:21 +00:00
parent 515dfba075
commit 36a45e3995
5 changed files with 56 additions and 34 deletions

View File

@ -111,3 +111,6 @@ Release 0.23.3 - Unreleased
thus causes all containers to be rejected. (vinodkv)
YARN-66. aggregated logs permissions not set properly (tgraves via bobby)
YARN-68. NodeManager will refuse to shutdown indefinitely due to container
log aggregation (daryn via bobby)

View File

@ -26,7 +26,4 @@ public interface AppLogAggregator extends Runnable {
boolean wasContainerSuccessful);
void finishLogAggregation();
void join();
}

View File

@ -137,6 +137,9 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
try {
doAppLogAggregation();
} finally {
if (!this.appAggregationFinished.get()) {
LOG.warn("Aggregation did not complete for application " + appId);
}
this.appAggregationFinished.set(true);
}
}
@ -155,6 +158,7 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
}
} catch (InterruptedException e) {
LOG.warn("PendingContainers queue is interrupted");
this.appFinishing.set(true);
}
}
@ -197,6 +201,7 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
this.dispatcher.getEventHandler().handle(
new ApplicationEvent(this.appId,
ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED));
this.appAggregationFinished.set(true);
}
private Path getRemoteNodeTmpLogFileForApp() {
@ -250,21 +255,4 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
LOG.info("Application just finished : " + this.applicationId);
this.appFinishing.set(true);
}
@Override
public void join() {
// Aggregation service is finishing
this.finishLogAggregation();
while (!this.appAggregationFinished.get()) {
LOG.info("Waiting for aggregation to complete for "
+ this.applicationId);
try {
Thread.sleep(THREAD_SLEEP_TIME);
} catch (InterruptedException e) {
LOG.warn("Join interrupted. Some logs may not have been aggregated!!");
break;
}
}
}
}

View File

@ -25,6 +25,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -35,8 +36,6 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
@ -137,12 +136,34 @@ public class LogAggregationService extends AbstractService implements
@Override
public synchronized void stop() {
LOG.info(this.getName() + " waiting for pending aggregation during exit");
for (AppLogAggregator appLogAggregator : this.appLogAggregators.values()) {
appLogAggregator.join();
}
stopAggregators();
super.stop();
}
private void stopAggregators() {
threadPool.shutdown();
// politely ask to finish
for (AppLogAggregator aggregator : appLogAggregators.values()) {
aggregator.finishLogAggregation();
}
while (!threadPool.isTerminated()) { // wait for all threads to finish
for (ApplicationId appId : appLogAggregators.keySet()) {
LOG.info("Waiting for aggregation to complete for " + appId);
}
try {
if (!threadPool.awaitTermination(30, TimeUnit.SECONDS)) {
threadPool.shutdownNow(); // send interrupt to hurry them along
}
} catch (InterruptedException e) {
LOG.warn("Aggregation stop interrupted!");
break;
}
}
for (ApplicationId appId : appLogAggregators.keySet()) {
LOG.warn("Some logs may not have been aggregated for " + appId);
}
}
private void verifyAndCreateRemoteLogDir(Configuration conf) {
// Checking the existance of the TLD
FileSystem remoteFS = null;
@ -293,10 +314,7 @@ public class LogAggregationService extends AbstractService implements
final UserGroupInformation userUgi =
UserGroupInformation.createRemoteUser(user);
if (credentials != null) {
for (Token<? extends TokenIdentifier> token : credentials
.getAllTokens()) {
userUgi.addToken(token);
}
userUgi.addCredentials(credentials);
}
// New application
@ -312,9 +330,13 @@ public class LogAggregationService extends AbstractService implements
try {
// Create the app dir
createAppDir(user, appId, userUgi);
} catch (YarnException e) {
} catch (Exception e) {
appLogAggregators.remove(appId);
closeFileSystems(userUgi);
throw e;
if (!(e instanceof YarnException)) {
e = new YarnException(e);
}
throw (YarnException)e;
}

View File

@ -157,14 +157,18 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
application1));
logAggregationService.stop();
assertEquals(0, logAggregationService.getNumAggregators());
// ensure filesystems were closed
verify(logAggregationService).closeFileSystems(
any(UserGroupInformation.class));
delSrvc.stop();
String containerIdStr = ConverterUtils.toString(container11);
File containerLogDir = new File(app1LogDir, containerIdStr);
for (String fileType : new String[] { "stdout", "stderr", "syslog" }) {
Assert.assertFalse(new File(containerLogDir, fileType).exists());
File f = new File(containerLogDir, fileType);
Assert.assertFalse("check "+f, f.exists());
}
Assert.assertFalse(app1LogDir.exists());
@ -222,6 +226,7 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
application1));
logAggregationService.stop();
assertEquals(0, logAggregationService.getNumAggregators());
Assert.assertFalse(new File(logAggregationService
.getRemoteNodeLogFileForApp(application1, this.user).toUri().getPath())
@ -356,6 +361,7 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
application1));
logAggregationService.stop();
assertEquals(0, logAggregationService.getNumAggregators());
verifyContainerLogs(logAggregationService, application1,
new ContainerId[] { container11, container12 });
@ -454,7 +460,8 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
ApplicationId appId = BuilderUtils.newApplicationId(
System.currentTimeMillis(), (int)Math.random());
doThrow(new YarnException("KABOOM!"))
Exception e = new RuntimeException("KABOOM!");
doThrow(e)
.when(logAggregationService).createAppDir(any(String.class),
any(ApplicationId.class), any(UserGroupInformation.class));
logAggregationService.handle(new LogHandlerAppStartedEvent(appId,
@ -463,7 +470,8 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
dispatcher.await();
ApplicationEvent expectedEvents[] = new ApplicationEvent[]{
new ApplicationFinishEvent(appId, "Application failed to init aggregation: KABOOM!")
new ApplicationFinishEvent(appId,
"Application failed to init aggregation: "+e)
};
checkEvents(appEventHandler, expectedEvents, false,
"getType", "getApplicationID", "getDiagnostic");
@ -479,6 +487,9 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
logAggregationService.handle(new LogHandlerAppFinishedEvent(
BuilderUtils.newApplicationId(1, 5)));
dispatcher.await();
logAggregationService.stop();
assertEquals(0, logAggregationService.getNumAggregators());
}
private void writeContainerLogs(File appLogDir, ContainerId containerId)
@ -690,6 +701,7 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls));
logAggregationService.stop();
assertEquals(0, logAggregationService.getNumAggregators());
}
@Test