From 4b540bbfcf02d828052999215c6135603d98f5db Mon Sep 17 00:00:00 2001 From: Wangda Tan Date: Tue, 31 Jul 2018 12:07:51 -0700 Subject: [PATCH] YARN-8418. App local logs could leaked if log aggregation fails to initialize for the app. (Bibin A Chundatt via wangda) Change-Id: I29a23ca4b219b48c92e7975cd44cddb8b0e04104 --- .../LogAggregationFileController.java | 7 ++ .../nodemanager/NodeStatusUpdaterImpl.java | 1 + .../containermanager/ContainerManager.java | 1 + .../ContainerManagerImpl.java | 13 ++- .../logaggregation/AppLogAggregator.java | 8 ++ .../logaggregation/AppLogAggregatorImpl.java | 15 ++++ .../logaggregation/LogAggregationService.java | 83 +++++++++++++++---- .../loghandler/LogHandler.java | 7 ++ .../loghandler/NonAggregatingLogHandler.java | 9 ++ .../loghandler/event/LogHandlerEventType.java | 4 +- .../event/LogHandlerTokenUpdatedEvent.java | 26 ++++++ .../nodemanager/DummyContainerManager.java | 7 ++ .../TestLogAggregationService.java | 34 +++++--- 13 files changed, 187 insertions(+), 28 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/event/LogHandlerTokenUpdatedEvent.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java index b047b1c71c2..6b3c9a4a079 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java @@ -43,11 +43,14 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.SecretManager; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils; import org.apache.hadoop.yarn.webapp.View.ViewContext; @@ -365,6 +368,10 @@ public abstract class LogAggregationFileController { } }); } catch (Exception e) { + if (e instanceof RemoteException) { + throw new YarnRuntimeException(((RemoteException) e) + .unwrapRemoteException(SecretManager.InvalidToken.class)); + } throw new YarnRuntimeException(e); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index 8154723f08f..faf7adb0f16 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -1135,6 +1135,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements if (systemCredentials != null && !systemCredentials.isEmpty()) { ((NMContext) context).setSystemCrendentialsForApps( parseCredentials(systemCredentials)); + context.getContainerManager().handleCredentialUpdate(); } List containersToUpdate = response.getContainersToUpdate(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManager.java index 2aeb2458863..356c2e094e7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManager.java @@ -44,4 +44,5 @@ public interface ContainerManager extends ServiceStateChangeListener, ContainerScheduler getContainerScheduler(); + void handleCredentialUpdate(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index ce240bc12a2..8b3525820cf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager; import com.google.common.annotations.VisibleForTesting; import com.google.protobuf.ByteString; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.UpdateContainerTokenEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerTokenUpdatedEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerSchedulerEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -170,7 +171,6 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; -import java.util.EnumSet; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -214,6 +214,7 @@ public class ContainerManagerImpl extends CompositeService implements protected final AsyncDispatcher dispatcher; private final DeletionService deletionService; + private LogHandler logHandler; private boolean serviceStopped = false; private final ReadLock readLock; private final WriteLock writeLock; @@ -292,7 +293,7 @@ public class ContainerManagerImpl extends CompositeService implements @Override public void serviceInit(Configuration conf) throws Exception { - LogHandler logHandler = + logHandler = createLogHandler(conf, this.context, this.deletionService); addIfService(logHandler); dispatcher.register(LogHandlerEventType.class, logHandler); @@ -1904,4 +1905,12 @@ public class ContainerManagerImpl extends CompositeService implements public ContainerScheduler getContainerScheduler() { return this.containerScheduler; } + + @Override + public void handleCredentialUpdate() { + Set invalidApps = logHandler.getInvalidTokenApps(); + if (!invalidApps.isEmpty()) { + dispatcher.getEventHandler().handle(new LogHandlerTokenUpdatedEvent()); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregator.java index 01786995736..93436fa96da 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregator.java @@ -18,6 +18,8 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.server.api.ContainerLogContext; public interface AppLogAggregator extends Runnable { @@ -29,4 +31,10 @@ public interface AppLogAggregator extends Runnable { void finishLogAggregation(); void disableLogAggregation(); + + void enableLogAggregation(); + + boolean isAggregationEnabled(); + + UserGroupInformation updateCredentials(Credentials cred); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java index 6630ba61fc9..04503ef9ecf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java @@ -561,6 +561,16 @@ public class AppLogAggregatorImpl implements AppLogAggregator { this.logAggregationDisabled = true; } + @Override + public void enableLogAggregation() { + this.logAggregationDisabled = false; + } + + @Override + public boolean isAggregationEnabled() { + return !logAggregationDisabled; + } + @Private @VisibleForTesting // This is only used for testing. @@ -643,6 +653,11 @@ public class AppLogAggregatorImpl implements AppLogAggregator { return this.userUgi; } + public UserGroupInformation updateCredentials(Credentials cred) { + this.userUgi.addCredentials(cred); + return userUgi; + } + @Private @VisibleForTesting public int getLogAggregationTimes() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java index dcc165fd1e2..d8db96780ff 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java @@ -20,10 +20,14 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregatio import java.io.IOException; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.security.token.SecretManager; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -58,6 +62,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.eve import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerContainerFinishedEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEvent; + import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.ThreadFactoryBuilder; @@ -83,6 +88,9 @@ public class LogAggregationService extends AbstractService implements private final ConcurrentMap appLogAggregators; + // Holds applications whose aggregation is disable due to invalid Token + private final Set invalidTokenApps; + @VisibleForTesting ExecutorService threadPool; @@ -95,6 +103,7 @@ public class LogAggregationService extends AbstractService implements this.dirsHandler = dirsHandler; this.appLogAggregators = new ConcurrentHashMap(); + this.invalidTokenApps = ConcurrentHashMap.newKeySet(); } protected void serviceInit(Configuration conf) throws Exception { @@ -224,8 +233,8 @@ public class LogAggregationService extends AbstractService implements userUgi.addCredentials(credentials); } - LogAggregationFileController logAggregationFileController - = getLogAggregationFileController(getConfig()); + LogAggregationFileController logAggregationFileController = + getLogAggregationFileController(getConfig()); logAggregationFileController.verifyAndCreateRemoteLogDir(); // New application final AppLogAggregator appLogAggregator = @@ -245,14 +254,16 @@ public class LogAggregationService extends AbstractService implements logAggregationFileController.createAppDir(user, appId, userUgi); } catch (Exception e) { appLogAggregator.disableLogAggregation(); + + // add to disabled aggregators if due to InvalidToken + if (e.getCause() instanceof SecretManager.InvalidToken) { + invalidTokenApps.add(appId); + } if (!(e instanceof YarnRuntimeException)) { appDirException = new YarnRuntimeException(e); } else { appDirException = (YarnRuntimeException)e; } - appLogAggregators.remove(appId); - closeFileSystems(userUgi); - throw appDirException; } // TODO Get the user configuration for the list of containers that need log @@ -270,6 +281,10 @@ public class LogAggregationService extends AbstractService implements } }; this.threadPool.execute(aggregatorWrapper); + + if (appDirException != null) { + throw appDirException; + } } protected void closeFileSystems(final UserGroupInformation userUgi) { @@ -307,17 +322,20 @@ public class LogAggregationService extends AbstractService implements // App is complete. Finish up any containers' pending log aggregation and // close the application specific logFile. - - AppLogAggregator aggregator = this.appLogAggregators.get(appId); - if (aggregator == null) { - LOG.warn("Log aggregation is not initialized for " + appId - + ", did it fail to start?"); - this.dispatcher.getEventHandler().handle( - new ApplicationEvent(appId, - ApplicationEventType.APPLICATION_LOG_HANDLING_FAILED)); - return; + try { + AppLogAggregator aggregator = this.appLogAggregators.get(appId); + if (aggregator == null) { + LOG.warn("Log aggregation is not initialized for " + appId + + ", did it fail to start?"); + this.dispatcher.getEventHandler().handle(new ApplicationEvent(appId, + ApplicationEventType.APPLICATION_LOG_HANDLING_FAILED)); + return; + } + aggregator.finishLogAggregation(); + } finally { + // Remove invalid Token Apps + invalidTokenApps.remove(appId); } - aggregator.finishLogAggregation(); } @Override @@ -344,12 +362,47 @@ public class LogAggregationService extends AbstractService implements (LogHandlerAppFinishedEvent) event; stopApp(appFinishedEvent.getApplicationId()); break; + case LOG_AGG_TOKEN_UPDATE: + checkAndEnableAppAggregators(); + break; default: ; // Ignore } } + private void checkAndEnableAppAggregators() { + for (ApplicationId appId : invalidTokenApps) { + try { + AppLogAggregator aggregator = appLogAggregators.get(appId); + if (aggregator != null) { + Credentials credentials = + context.getSystemCredentialsForApps().get(appId); + if (credentials != null) { + // Create the app dir again with + LogAggregationFileController logAggregationFileController = + getLogAggregationFileController(getConfig()); + UserGroupInformation userUgi = + aggregator.updateCredentials(credentials); + logAggregationFileController + .createAppDir(userUgi.getShortUserName(), appId, userUgi); + aggregator.enableLogAggregation(); + } + invalidTokenApps.remove(appId); + LOG.info("LogAggregation enabled for application {}", appId); + } + } catch (Exception e) { + //Ignore exception + LOG.warn("Enable aggregators failed {}", appId); + } + } + } + + @Override + public Set getInvalidTokenApps() { + return invalidTokenApps; + } + @VisibleForTesting public ConcurrentMap getAppLogAggregators() { return this.appLogAggregators; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/LogHandler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/LogHandler.java index 6eb3fb45abc..459fdf4600c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/LogHandler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/LogHandler.java @@ -18,9 +18,16 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEvent; + + +import java.util.Set; + public interface LogHandler extends EventHandler { public void handle(LogHandlerEvent event); + + public Set getInvalidTokenApps(); } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java index 9c43ddef92a..d66aa127a0b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java @@ -19,8 +19,12 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler; import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadFactory; @@ -204,6 +208,11 @@ public class NonAggregatingLogHandler extends AbstractService implements } } + @Override + public Set getInvalidTokenApps() { + return Collections.emptySet(); + } + ScheduledThreadPoolExecutor createScheduledThreadPoolExecutor( Configuration conf) { ThreadFactory tf = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/event/LogHandlerEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/event/LogHandlerEventType.java index 684d6b2605f..ec477c2b00f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/event/LogHandlerEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/event/LogHandlerEventType.java @@ -19,5 +19,7 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event; public enum LogHandlerEventType { - APPLICATION_STARTED, CONTAINER_FINISHED, APPLICATION_FINISHED + APPLICATION_STARTED, + CONTAINER_FINISHED, + APPLICATION_FINISHED, LOG_AGG_TOKEN_UPDATE } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/event/LogHandlerTokenUpdatedEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/event/LogHandlerTokenUpdatedEvent.java new file mode 100644 index 00000000000..772a46312ca --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/event/LogHandlerTokenUpdatedEvent.java @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event; + +public class LogHandlerTokenUpdatedEvent extends LogHandlerEvent { + + public LogHandlerTokenUpdatedEvent() { + super(LogHandlerEventType.LOG_AGG_TOKEN_UPDATE); + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java index b5cb43b9716..feabeb12fb0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java @@ -24,6 +24,8 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.Collection; +import java.util.Collections; +import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; @@ -187,6 +189,11 @@ public class DummyContainerManager extends ContainerManagerImpl { // Ignore } } + + @Override + public Set getInvalidTokenApps() { + return Collections.emptySet(); + } }; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java index 6268ad986c3..8b2e3cc98cb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java @@ -73,6 +73,7 @@ import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.io.Text; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.SecretManager; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; @@ -128,6 +129,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.Tes import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppFinishedEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerContainerFinishedEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerTokenUpdatedEvent; import org.apache.hadoop.yarn.server.nodemanager.executor.DeletionAsUserContext; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.ConverterUtils; @@ -823,7 +825,8 @@ public class TestLogAggregationService extends BaseContainerManagerTest { .getFileControllerForWrite(); LogAggregationFileController spyLogAggregationFileFormat = spy(logAggregationFileFormat); - Exception e = new RuntimeException("KABOOM!"); + Exception e = + new YarnRuntimeException(new SecretManager.InvalidToken("KABOOM!")); doThrow(e).when(spyLogAggregationFileFormat) .createAppDir(any(String.class), any(ApplicationId.class), any(UserGroupInformation.class)); @@ -862,29 +865,40 @@ public class TestLogAggregationService extends BaseContainerManagerTest { }; checkEvents(appEventHandler, expectedEvents, false, "getType", "getApplicationID", "getDiagnostic"); - + Assert.assertEquals(logAggregationService.getInvalidTokenApps().size(), 1); // verify trying to collect logs for containers/apps we don't know about // doesn't blow up and tear down the NM logAggregationService.handle(new LogHandlerContainerFinishedEvent( BuilderUtils.newContainerId(4, 1, 1, 1), ContainerType.APPLICATION_MASTER, 0)); dispatcher.await(); + + AppLogAggregator appAgg = + logAggregationService.getAppLogAggregators().get(appId); + Assert.assertFalse("Aggregation should be disabled", + appAgg.isAggregationEnabled()); + + // Enabled aggregation + logAggregationService.handle(new LogHandlerTokenUpdatedEvent()); + dispatcher.await(); + + appAgg = + logAggregationService.getAppLogAggregators().get(appId); + Assert.assertFalse("Aggregation should be enabled", + appAgg.isAggregationEnabled()); + + // Check disabled apps are cleared + Assert.assertEquals(0, logAggregationService.getInvalidTokenApps().size()); + logAggregationService.handle(new LogHandlerAppFinishedEvent( BuilderUtils.newApplicationId(1, 5))); dispatcher.await(); logAggregationService.stop(); assertEquals(0, logAggregationService.getNumAggregators()); - // local log dir shouldn't be deleted given log aggregation cannot - // continue due to aggregated log dir creation failure on remoteFS. - FileDeletionTask deletionTask = new FileDeletionTask(spyDelSrvc, user, - null, null); - verify(spyDelSrvc, never()).delete(deletionTask); + verify(spyDelSrvc).delete(any(FileDeletionTask.class)); verify(logAggregationService).closeFileSystems( any(UserGroupInformation.class)); - // make sure local log dir is not deleted in case log aggregation - // service cannot be initiated. - assertTrue(appLogDir.exists()); } private void writeContainerLogs(File appLogDir, ContainerId containerId,