YARN-8418. App local logs could leaked if log aggregation fails to initialize for the app. (Bibin A Chundatt via wangda)
Change-Id: I29a23ca4b219b48c92e7975cd44cddb8b0e04104
(cherry picked from commit 4b540bbfcf
)
This commit is contained in:
parent
b91cf90e1c
commit
7b552c9d72
|
@ -43,11 +43,14 @@ import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.fs.permission.FsPermission;
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
import org.apache.hadoop.io.IOUtils;
|
import org.apache.hadoop.io.IOUtils;
|
||||||
|
import org.apache.hadoop.ipc.RemoteException;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
import org.apache.hadoop.security.token.SecretManager;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
|
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||||
import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
|
import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
|
||||||
import org.apache.hadoop.yarn.webapp.View.ViewContext;
|
import org.apache.hadoop.yarn.webapp.View.ViewContext;
|
||||||
|
@ -363,6 +366,10 @@ public abstract class LogAggregationFileController {
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
|
if (e instanceof RemoteException) {
|
||||||
|
throw new YarnRuntimeException(((RemoteException) e)
|
||||||
|
.unwrapRemoteException(SecretManager.InvalidToken.class));
|
||||||
|
}
|
||||||
throw new YarnRuntimeException(e);
|
throw new YarnRuntimeException(e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1135,6 +1135,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
|
||||||
if (systemCredentials != null && !systemCredentials.isEmpty()) {
|
if (systemCredentials != null && !systemCredentials.isEmpty()) {
|
||||||
((NMContext) context).setSystemCrendentialsForApps(
|
((NMContext) context).setSystemCrendentialsForApps(
|
||||||
parseCredentials(systemCredentials));
|
parseCredentials(systemCredentials));
|
||||||
|
context.getContainerManager().handleCredentialUpdate();
|
||||||
}
|
}
|
||||||
List<org.apache.hadoop.yarn.api.records.Container>
|
List<org.apache.hadoop.yarn.api.records.Container>
|
||||||
containersToUpdate = response.getContainersToUpdate();
|
containersToUpdate = response.getContainersToUpdate();
|
||||||
|
|
|
@ -44,4 +44,5 @@ public interface ContainerManager extends ServiceStateChangeListener,
|
||||||
|
|
||||||
ContainerScheduler getContainerScheduler();
|
ContainerScheduler getContainerScheduler();
|
||||||
|
|
||||||
|
void handleCredentialUpdate();
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager;
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import com.google.protobuf.ByteString;
|
import com.google.protobuf.ByteString;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.UpdateContainerTokenEvent;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.UpdateContainerTokenEvent;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerTokenUpdatedEvent;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerSchedulerEvent;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerSchedulerEvent;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
@ -170,7 +171,6 @@ import java.nio.ByteBuffer;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.EnumSet;
|
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
@ -214,6 +214,7 @@ public class ContainerManagerImpl extends CompositeService implements
|
||||||
protected final AsyncDispatcher dispatcher;
|
protected final AsyncDispatcher dispatcher;
|
||||||
|
|
||||||
private final DeletionService deletionService;
|
private final DeletionService deletionService;
|
||||||
|
private LogHandler logHandler;
|
||||||
private boolean serviceStopped = false;
|
private boolean serviceStopped = false;
|
||||||
private final ReadLock readLock;
|
private final ReadLock readLock;
|
||||||
private final WriteLock writeLock;
|
private final WriteLock writeLock;
|
||||||
|
@ -292,7 +293,7 @@ public class ContainerManagerImpl extends CompositeService implements
|
||||||
@Override
|
@Override
|
||||||
public void serviceInit(Configuration conf) throws Exception {
|
public void serviceInit(Configuration conf) throws Exception {
|
||||||
|
|
||||||
LogHandler logHandler =
|
logHandler =
|
||||||
createLogHandler(conf, this.context, this.deletionService);
|
createLogHandler(conf, this.context, this.deletionService);
|
||||||
addIfService(logHandler);
|
addIfService(logHandler);
|
||||||
dispatcher.register(LogHandlerEventType.class, logHandler);
|
dispatcher.register(LogHandlerEventType.class, logHandler);
|
||||||
|
@ -1904,4 +1905,12 @@ public class ContainerManagerImpl extends CompositeService implements
|
||||||
public ContainerScheduler getContainerScheduler() {
|
public ContainerScheduler getContainerScheduler() {
|
||||||
return this.containerScheduler;
|
return this.containerScheduler;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void handleCredentialUpdate() {
|
||||||
|
Set<ApplicationId> invalidApps = logHandler.getInvalidTokenApps();
|
||||||
|
if (!invalidApps.isEmpty()) {
|
||||||
|
dispatcher.getEventHandler().handle(new LogHandlerTokenUpdatedEvent());
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,6 +18,8 @@
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation;
|
package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation;
|
||||||
|
|
||||||
|
import org.apache.hadoop.security.Credentials;
|
||||||
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.yarn.server.api.ContainerLogContext;
|
import org.apache.hadoop.yarn.server.api.ContainerLogContext;
|
||||||
|
|
||||||
public interface AppLogAggregator extends Runnable {
|
public interface AppLogAggregator extends Runnable {
|
||||||
|
@ -29,4 +31,10 @@ public interface AppLogAggregator extends Runnable {
|
||||||
void finishLogAggregation();
|
void finishLogAggregation();
|
||||||
|
|
||||||
void disableLogAggregation();
|
void disableLogAggregation();
|
||||||
|
|
||||||
|
void enableLogAggregation();
|
||||||
|
|
||||||
|
boolean isAggregationEnabled();
|
||||||
|
|
||||||
|
UserGroupInformation updateCredentials(Credentials cred);
|
||||||
}
|
}
|
||||||
|
|
|
@ -534,6 +534,16 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
|
||||||
this.logAggregationDisabled = true;
|
this.logAggregationDisabled = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void enableLogAggregation() {
|
||||||
|
this.logAggregationDisabled = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isAggregationEnabled() {
|
||||||
|
return !logAggregationDisabled;
|
||||||
|
}
|
||||||
|
|
||||||
@Private
|
@Private
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
// This is only used for testing.
|
// This is only used for testing.
|
||||||
|
@ -616,6 +626,11 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
|
||||||
return this.userUgi;
|
return this.userUgi;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public UserGroupInformation updateCredentials(Credentials cred) {
|
||||||
|
this.userUgi.addCredentials(cred);
|
||||||
|
return userUgi;
|
||||||
|
}
|
||||||
|
|
||||||
@Private
|
@Private
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
public int getLogAggregationTimes() {
|
public int getLogAggregationTimes() {
|
||||||
|
|
|
@ -20,10 +20,14 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregatio
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.ConcurrentMap;
|
import java.util.concurrent.ConcurrentMap;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import org.apache.hadoop.security.token.SecretManager;
|
||||||
|
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
@ -58,6 +62,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.eve
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerContainerFinishedEvent;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerContainerFinishedEvent;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEvent;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEvent;
|
||||||
|
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||||
|
|
||||||
|
@ -83,6 +88,9 @@ public class LogAggregationService extends AbstractService implements
|
||||||
|
|
||||||
private final ConcurrentMap<ApplicationId, AppLogAggregator> appLogAggregators;
|
private final ConcurrentMap<ApplicationId, AppLogAggregator> appLogAggregators;
|
||||||
|
|
||||||
|
// Holds applications whose aggregation is disable due to invalid Token
|
||||||
|
private final Set<ApplicationId> invalidTokenApps;
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
ExecutorService threadPool;
|
ExecutorService threadPool;
|
||||||
|
|
||||||
|
@ -95,6 +103,7 @@ public class LogAggregationService extends AbstractService implements
|
||||||
this.dirsHandler = dirsHandler;
|
this.dirsHandler = dirsHandler;
|
||||||
this.appLogAggregators =
|
this.appLogAggregators =
|
||||||
new ConcurrentHashMap<ApplicationId, AppLogAggregator>();
|
new ConcurrentHashMap<ApplicationId, AppLogAggregator>();
|
||||||
|
this.invalidTokenApps = ConcurrentHashMap.newKeySet();
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void serviceInit(Configuration conf) throws Exception {
|
protected void serviceInit(Configuration conf) throws Exception {
|
||||||
|
@ -224,8 +233,8 @@ public class LogAggregationService extends AbstractService implements
|
||||||
userUgi.addCredentials(credentials);
|
userUgi.addCredentials(credentials);
|
||||||
}
|
}
|
||||||
|
|
||||||
LogAggregationFileController logAggregationFileController
|
LogAggregationFileController logAggregationFileController =
|
||||||
= getLogAggregationFileController(getConfig());
|
getLogAggregationFileController(getConfig());
|
||||||
logAggregationFileController.verifyAndCreateRemoteLogDir();
|
logAggregationFileController.verifyAndCreateRemoteLogDir();
|
||||||
// New application
|
// New application
|
||||||
final AppLogAggregator appLogAggregator =
|
final AppLogAggregator appLogAggregator =
|
||||||
|
@ -245,14 +254,16 @@ public class LogAggregationService extends AbstractService implements
|
||||||
logAggregationFileController.createAppDir(user, appId, userUgi);
|
logAggregationFileController.createAppDir(user, appId, userUgi);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
appLogAggregator.disableLogAggregation();
|
appLogAggregator.disableLogAggregation();
|
||||||
|
|
||||||
|
// add to disabled aggregators if due to InvalidToken
|
||||||
|
if (e.getCause() instanceof SecretManager.InvalidToken) {
|
||||||
|
invalidTokenApps.add(appId);
|
||||||
|
}
|
||||||
if (!(e instanceof YarnRuntimeException)) {
|
if (!(e instanceof YarnRuntimeException)) {
|
||||||
appDirException = new YarnRuntimeException(e);
|
appDirException = new YarnRuntimeException(e);
|
||||||
} else {
|
} else {
|
||||||
appDirException = (YarnRuntimeException)e;
|
appDirException = (YarnRuntimeException)e;
|
||||||
}
|
}
|
||||||
appLogAggregators.remove(appId);
|
|
||||||
closeFileSystems(userUgi);
|
|
||||||
throw appDirException;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO Get the user configuration for the list of containers that need log
|
// TODO Get the user configuration for the list of containers that need log
|
||||||
|
@ -270,6 +281,10 @@ public class LogAggregationService extends AbstractService implements
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
this.threadPool.execute(aggregatorWrapper);
|
this.threadPool.execute(aggregatorWrapper);
|
||||||
|
|
||||||
|
if (appDirException != null) {
|
||||||
|
throw appDirException;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void closeFileSystems(final UserGroupInformation userUgi) {
|
protected void closeFileSystems(final UserGroupInformation userUgi) {
|
||||||
|
@ -307,17 +322,20 @@ public class LogAggregationService extends AbstractService implements
|
||||||
|
|
||||||
// App is complete. Finish up any containers' pending log aggregation and
|
// App is complete. Finish up any containers' pending log aggregation and
|
||||||
// close the application specific logFile.
|
// close the application specific logFile.
|
||||||
|
try {
|
||||||
AppLogAggregator aggregator = this.appLogAggregators.get(appId);
|
AppLogAggregator aggregator = this.appLogAggregators.get(appId);
|
||||||
if (aggregator == null) {
|
if (aggregator == null) {
|
||||||
LOG.warn("Log aggregation is not initialized for " + appId
|
LOG.warn("Log aggregation is not initialized for " + appId
|
||||||
+ ", did it fail to start?");
|
+ ", did it fail to start?");
|
||||||
this.dispatcher.getEventHandler().handle(
|
this.dispatcher.getEventHandler().handle(new ApplicationEvent(appId,
|
||||||
new ApplicationEvent(appId,
|
|
||||||
ApplicationEventType.APPLICATION_LOG_HANDLING_FAILED));
|
ApplicationEventType.APPLICATION_LOG_HANDLING_FAILED));
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
aggregator.finishLogAggregation();
|
aggregator.finishLogAggregation();
|
||||||
|
} finally {
|
||||||
|
// Remove invalid Token Apps
|
||||||
|
invalidTokenApps.remove(appId);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -344,12 +362,47 @@ public class LogAggregationService extends AbstractService implements
|
||||||
(LogHandlerAppFinishedEvent) event;
|
(LogHandlerAppFinishedEvent) event;
|
||||||
stopApp(appFinishedEvent.getApplicationId());
|
stopApp(appFinishedEvent.getApplicationId());
|
||||||
break;
|
break;
|
||||||
|
case LOG_AGG_TOKEN_UPDATE:
|
||||||
|
checkAndEnableAppAggregators();
|
||||||
|
break;
|
||||||
default:
|
default:
|
||||||
; // Ignore
|
; // Ignore
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void checkAndEnableAppAggregators() {
|
||||||
|
for (ApplicationId appId : invalidTokenApps) {
|
||||||
|
try {
|
||||||
|
AppLogAggregator aggregator = appLogAggregators.get(appId);
|
||||||
|
if (aggregator != null) {
|
||||||
|
Credentials credentials =
|
||||||
|
context.getSystemCredentialsForApps().get(appId);
|
||||||
|
if (credentials != null) {
|
||||||
|
// Create the app dir again with
|
||||||
|
LogAggregationFileController logAggregationFileController =
|
||||||
|
getLogAggregationFileController(getConfig());
|
||||||
|
UserGroupInformation userUgi =
|
||||||
|
aggregator.updateCredentials(credentials);
|
||||||
|
logAggregationFileController
|
||||||
|
.createAppDir(userUgi.getShortUserName(), appId, userUgi);
|
||||||
|
aggregator.enableLogAggregation();
|
||||||
|
}
|
||||||
|
invalidTokenApps.remove(appId);
|
||||||
|
LOG.info("LogAggregation enabled for application {}", appId);
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
//Ignore exception
|
||||||
|
LOG.warn("Enable aggregators failed {}", appId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Set<ApplicationId> getInvalidTokenApps() {
|
||||||
|
return invalidTokenApps;
|
||||||
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
public ConcurrentMap<ApplicationId, AppLogAggregator> getAppLogAggregators() {
|
public ConcurrentMap<ApplicationId, AppLogAggregator> getAppLogAggregators() {
|
||||||
return this.appLogAggregators;
|
return this.appLogAggregators;
|
||||||
|
|
|
@ -18,9 +18,16 @@
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler;
|
package org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.event.EventHandler;
|
import org.apache.hadoop.yarn.event.EventHandler;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEvent;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEvent;
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
public interface LogHandler extends EventHandler<LogHandlerEvent> {
|
public interface LogHandler extends EventHandler<LogHandlerEvent> {
|
||||||
public void handle(LogHandlerEvent event);
|
public void handle(LogHandlerEvent event);
|
||||||
|
|
||||||
|
public Set<ApplicationId> getInvalidTokenApps();
|
||||||
}
|
}
|
|
@ -19,8 +19,12 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.ScheduledThreadPoolExecutor;
|
import java.util.concurrent.ScheduledThreadPoolExecutor;
|
||||||
import java.util.concurrent.ThreadFactory;
|
import java.util.concurrent.ThreadFactory;
|
||||||
|
@ -204,6 +208,11 @@ public class NonAggregatingLogHandler extends AbstractService implements
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Set<ApplicationId> getInvalidTokenApps() {
|
||||||
|
return Collections.emptySet();
|
||||||
|
}
|
||||||
|
|
||||||
ScheduledThreadPoolExecutor createScheduledThreadPoolExecutor(
|
ScheduledThreadPoolExecutor createScheduledThreadPoolExecutor(
|
||||||
Configuration conf) {
|
Configuration conf) {
|
||||||
ThreadFactory tf =
|
ThreadFactory tf =
|
||||||
|
|
|
@ -19,5 +19,7 @@
|
||||||
package org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event;
|
package org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event;
|
||||||
|
|
||||||
public enum LogHandlerEventType {
|
public enum LogHandlerEventType {
|
||||||
APPLICATION_STARTED, CONTAINER_FINISHED, APPLICATION_FINISHED
|
APPLICATION_STARTED,
|
||||||
|
CONTAINER_FINISHED,
|
||||||
|
APPLICATION_FINISHED, LOG_AGG_TOKEN_UPDATE
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,26 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event;
|
||||||
|
|
||||||
|
public class LogHandlerTokenUpdatedEvent extends LogHandlerEvent {
|
||||||
|
|
||||||
|
public LogHandlerTokenUpdatedEvent() {
|
||||||
|
super(LogHandlerEventType.LOG_AGG_TOKEN_UPDATE);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -24,6 +24,8 @@ import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
@ -187,6 +189,11 @@ public class DummyContainerManager extends ContainerManagerImpl {
|
||||||
// Ignore
|
// Ignore
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Set<ApplicationId> getInvalidTokenApps() {
|
||||||
|
return Collections.emptySet();
|
||||||
|
}
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -73,6 +73,7 @@ import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
import org.apache.hadoop.io.Text;
|
import org.apache.hadoop.io.Text;
|
||||||
import org.apache.hadoop.security.Credentials;
|
import org.apache.hadoop.security.Credentials;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
import org.apache.hadoop.security.token.SecretManager;
|
||||||
import org.apache.hadoop.security.token.Token;
|
import org.apache.hadoop.security.token.Token;
|
||||||
import org.apache.hadoop.test.GenericTestUtils;
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
|
||||||
|
@ -128,6 +129,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.Tes
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppFinishedEvent;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppFinishedEvent;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerContainerFinishedEvent;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerContainerFinishedEvent;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerTokenUpdatedEvent;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.executor.DeletionAsUserContext;
|
import org.apache.hadoop.yarn.server.nodemanager.executor.DeletionAsUserContext;
|
||||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||||
import org.apache.hadoop.yarn.util.ConverterUtils;
|
import org.apache.hadoop.yarn.util.ConverterUtils;
|
||||||
|
@ -823,7 +825,8 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
|
||||||
.getFileControllerForWrite();
|
.getFileControllerForWrite();
|
||||||
LogAggregationFileController spyLogAggregationFileFormat =
|
LogAggregationFileController spyLogAggregationFileFormat =
|
||||||
spy(logAggregationFileFormat);
|
spy(logAggregationFileFormat);
|
||||||
Exception e = new RuntimeException("KABOOM!");
|
Exception e =
|
||||||
|
new YarnRuntimeException(new SecretManager.InvalidToken("KABOOM!"));
|
||||||
doThrow(e).when(spyLogAggregationFileFormat)
|
doThrow(e).when(spyLogAggregationFileFormat)
|
||||||
.createAppDir(any(String.class), any(ApplicationId.class),
|
.createAppDir(any(String.class), any(ApplicationId.class),
|
||||||
any(UserGroupInformation.class));
|
any(UserGroupInformation.class));
|
||||||
|
@ -862,29 +865,40 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
|
||||||
};
|
};
|
||||||
checkEvents(appEventHandler, expectedEvents, false,
|
checkEvents(appEventHandler, expectedEvents, false,
|
||||||
"getType", "getApplicationID", "getDiagnostic");
|
"getType", "getApplicationID", "getDiagnostic");
|
||||||
|
Assert.assertEquals(logAggregationService.getInvalidTokenApps().size(), 1);
|
||||||
// verify trying to collect logs for containers/apps we don't know about
|
// verify trying to collect logs for containers/apps we don't know about
|
||||||
// doesn't blow up and tear down the NM
|
// doesn't blow up and tear down the NM
|
||||||
logAggregationService.handle(new LogHandlerContainerFinishedEvent(
|
logAggregationService.handle(new LogHandlerContainerFinishedEvent(
|
||||||
BuilderUtils.newContainerId(4, 1, 1, 1),
|
BuilderUtils.newContainerId(4, 1, 1, 1),
|
||||||
ContainerType.APPLICATION_MASTER, 0));
|
ContainerType.APPLICATION_MASTER, 0));
|
||||||
dispatcher.await();
|
dispatcher.await();
|
||||||
|
|
||||||
|
AppLogAggregator appAgg =
|
||||||
|
logAggregationService.getAppLogAggregators().get(appId);
|
||||||
|
Assert.assertFalse("Aggregation should be disabled",
|
||||||
|
appAgg.isAggregationEnabled());
|
||||||
|
|
||||||
|
// Enabled aggregation
|
||||||
|
logAggregationService.handle(new LogHandlerTokenUpdatedEvent());
|
||||||
|
dispatcher.await();
|
||||||
|
|
||||||
|
appAgg =
|
||||||
|
logAggregationService.getAppLogAggregators().get(appId);
|
||||||
|
Assert.assertFalse("Aggregation should be enabled",
|
||||||
|
appAgg.isAggregationEnabled());
|
||||||
|
|
||||||
|
// Check disabled apps are cleared
|
||||||
|
Assert.assertEquals(0, logAggregationService.getInvalidTokenApps().size());
|
||||||
|
|
||||||
logAggregationService.handle(new LogHandlerAppFinishedEvent(
|
logAggregationService.handle(new LogHandlerAppFinishedEvent(
|
||||||
BuilderUtils.newApplicationId(1, 5)));
|
BuilderUtils.newApplicationId(1, 5)));
|
||||||
dispatcher.await();
|
dispatcher.await();
|
||||||
|
|
||||||
logAggregationService.stop();
|
logAggregationService.stop();
|
||||||
assertEquals(0, logAggregationService.getNumAggregators());
|
assertEquals(0, logAggregationService.getNumAggregators());
|
||||||
// local log dir shouldn't be deleted given log aggregation cannot
|
verify(spyDelSrvc).delete(any(FileDeletionTask.class));
|
||||||
// continue due to aggregated log dir creation failure on remoteFS.
|
|
||||||
FileDeletionTask deletionTask = new FileDeletionTask(spyDelSrvc, user,
|
|
||||||
null, null);
|
|
||||||
verify(spyDelSrvc, never()).delete(deletionTask);
|
|
||||||
verify(logAggregationService).closeFileSystems(
|
verify(logAggregationService).closeFileSystems(
|
||||||
any(UserGroupInformation.class));
|
any(UserGroupInformation.class));
|
||||||
// make sure local log dir is not deleted in case log aggregation
|
|
||||||
// service cannot be initiated.
|
|
||||||
assertTrue(appLogDir.exists());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void writeContainerLogs(File appLogDir, ContainerId containerId,
|
private void writeContainerLogs(File appLogDir, ContainerId containerId,
|
||||||
|
|
Loading…
Reference in New Issue