YARN-2468. Enhanced NodeManager to support log handling APIs (YARN-2569) for use by long running services. Contributed by Xuan Gong.

(cherry picked from commit 34cdcaad71)
This commit is contained in:
Vinod Kumar Vavilapalli 2014-10-03 12:15:40 -07:00
parent b6ce0a1f69
commit 7ed61e150c
7 changed files with 777 additions and 149 deletions

View File

@ -109,6 +109,9 @@ Release 2.6.0 - UNRELEASED
YARN-2446. Augmented Timeline service APIs to start taking in domains as a YARN-2446. Augmented Timeline service APIs to start taking in domains as a
parameter while posting entities and events. (Zhijie Shen via vinodkv) parameter while posting entities and events. (Zhijie Shen via vinodkv)
YARN-2468. Enhanced NodeManager to support log handling APIs (YARN-2569) for
use by long running services. (Xuan Gong via vinodkv)
IMPROVEMENTS IMPROVEMENTS
YARN-2242. Improve exception information on AM launch crashes. (Li Lu YARN-2242. Improve exception information on AM launch crashes. (Li Lu

View File

@ -35,9 +35,13 @@ import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.EnumSet; import java.util.EnumSet;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.Set;
import java.util.regex.Pattern;
import org.apache.commons.io.input.BoundedInputStream; import org.apache.commons.io.input.BoundedInputStream;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
@ -60,10 +64,15 @@ import org.apache.hadoop.io.file.tfile.TFile;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.ConverterUtils;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
@Public @Public
@Evolving @Evolving
public class AggregatedLogFormat { public class AggregatedLogFormat {
@ -149,20 +158,33 @@ public class AggregatedLogFormat {
private final List<String> rootLogDirs; private final List<String> rootLogDirs;
private final ContainerId containerId; private final ContainerId containerId;
private final String user; private final String user;
private final LogAggregationContext logAggregationContext;
private Set<File> uploadedFiles = new HashSet<File>();
private final Set<String> alreadyUploadedLogFiles;
private Set<String> allExistingFileMeta = new HashSet<String>();
// TODO Maybe add a version string here. Instead of changing the version of // TODO Maybe add a version string here. Instead of changing the version of
// the entire k-v format // the entire k-v format
public LogValue(List<String> rootLogDirs, ContainerId containerId, public LogValue(List<String> rootLogDirs, ContainerId containerId,
String user) { String user) {
this(rootLogDirs, containerId, user, null, new HashSet<String>());
}
public LogValue(List<String> rootLogDirs, ContainerId containerId,
String user, LogAggregationContext logAggregationContext,
Set<String> alreadyUploadedLogFiles) {
this.rootLogDirs = new ArrayList<String>(rootLogDirs); this.rootLogDirs = new ArrayList<String>(rootLogDirs);
this.containerId = containerId; this.containerId = containerId;
this.user = user; this.user = user;
// Ensure logs are processed in lexical order // Ensure logs are processed in lexical order
Collections.sort(this.rootLogDirs); Collections.sort(this.rootLogDirs);
this.logAggregationContext = logAggregationContext;
this.alreadyUploadedLogFiles = alreadyUploadedLogFiles;
} }
public void write(DataOutputStream out) throws IOException { private Set<File> getPendingLogFilesToUploadForThisContainer() {
Set<File> pendingUploadFiles = new HashSet<File>();
for (String rootLogDir : this.rootLogDirs) { for (String rootLogDir : this.rootLogDirs) {
File appLogDir = File appLogDir =
new File(rootLogDir, new File(rootLogDir,
@ -177,61 +199,139 @@ public class AggregatedLogFormat {
continue; // ContainerDir may have been deleted by the user. continue; // ContainerDir may have been deleted by the user.
} }
// Write out log files in lexical order pendingUploadFiles
File[] logFiles = containerLogDir.listFiles(); .addAll(getPendingLogFilesToUpload(containerLogDir));
Arrays.sort(logFiles); }
for (File logFile : logFiles) { return pendingUploadFiles;
}
final long fileLength = logFile.length(); public void write(DataOutputStream out, Set<File> pendingUploadFiles)
throws IOException {
List<File> fileList = new ArrayList<File>(pendingUploadFiles);
Collections.sort(fileList);
// Write the logFile Type for (File logFile : fileList) {
out.writeUTF(logFile.getName()); final long fileLength = logFile.length();
// Write the logFile Type
out.writeUTF(logFile.getName());
// Write the log length as UTF so that it is printable // Write the log length as UTF so that it is printable
out.writeUTF(String.valueOf(fileLength)); out.writeUTF(String.valueOf(fileLength));
// Write the log itself // Write the log itself
FileInputStream in = null; FileInputStream in = null;
try { try {
in = SecureIOUtils.openForRead(logFile, getUser(), null); in = SecureIOUtils.openForRead(logFile, getUser(), null);
byte[] buf = new byte[65535]; byte[] buf = new byte[65535];
int len = 0; int len = 0;
long bytesLeft = fileLength; long bytesLeft = fileLength;
while ((len = in.read(buf)) != -1) { while ((len = in.read(buf)) != -1) {
//If buffer contents within fileLength, write //If buffer contents within fileLength, write
if (len < bytesLeft) { if (len < bytesLeft) {
out.write(buf, 0, len); out.write(buf, 0, len);
bytesLeft-=len; bytesLeft-=len;
}
//else only write contents within fileLength, then exit early
else {
out.write(buf, 0, (int)bytesLeft);
break;
}
} }
long newLength = logFile.length(); //else only write contents within fileLength, then exit early
if(fileLength < newLength) { else {
LOG.warn("Aggregated logs truncated by approximately "+ out.write(buf, 0, (int)bytesLeft);
(newLength-fileLength) +" bytes."); break;
}
} catch (IOException e) {
String message = "Error aggregating log file. Log file : "
+ logFile.getAbsolutePath() + e.getMessage();
LOG.error(message, e);
out.write(message.getBytes());
} finally {
if (in != null) {
in.close();
} }
} }
long newLength = logFile.length();
if(fileLength < newLength) {
LOG.warn("Aggregated logs truncated by approximately "+
(newLength-fileLength) +" bytes.");
}
this.uploadedFiles.add(logFile);
} catch (IOException e) {
String message = "Error aggregating log file. Log file : "
+ logFile.getAbsolutePath() + e.getMessage();
LOG.error(message, e);
out.write(message.getBytes());
} finally {
if (in != null) {
in.close();
}
} }
} }
} }
// Added for testing purpose. // Added for testing purpose.
public String getUser() { public String getUser() {
return user; return user;
} }
private Set<File> getPendingLogFilesToUpload(File containerLogDir) {
Set<File> candidates =
new HashSet<File>(Arrays.asList(containerLogDir.listFiles()));
for (File logFile : candidates) {
this.allExistingFileMeta.add(getLogFileMetaData(logFile));
}
if (this.logAggregationContext != null && candidates.size() > 0) {
if (this.logAggregationContext.getIncludePattern() != null
&& !this.logAggregationContext.getIncludePattern().isEmpty()) {
filterFiles(this.logAggregationContext.getIncludePattern(),
candidates, false);
}
if (this.logAggregationContext.getExcludePattern() != null
&& !this.logAggregationContext.getExcludePattern().isEmpty()) {
filterFiles(this.logAggregationContext.getExcludePattern(),
candidates, true);
}
Iterable<File> mask =
Iterables.filter(candidates, new Predicate<File>() {
@Override
public boolean apply(File next) {
return !alreadyUploadedLogFiles
.contains(getLogFileMetaData(next));
}
});
candidates = Sets.newHashSet(mask);
}
return candidates;
}
private void filterFiles(String pattern, Set<File> candidates,
boolean exclusion) {
Pattern filterPattern =
Pattern.compile(pattern);
for (Iterator<File> candidatesItr = candidates.iterator(); candidatesItr
.hasNext();) {
File candidate = candidatesItr.next();
boolean match = filterPattern.matcher(candidate.getName()).find();
if ((!match && !exclusion) || (match && exclusion)) {
candidatesItr.remove();
}
}
}
public Set<Path> getCurrentUpLoadedFilesPath() {
Set<Path> path = new HashSet<Path>();
for (File file : this.uploadedFiles) {
path.add(new Path(file.getAbsolutePath()));
}
return path;
}
public Set<String> getCurrentUpLoadedFileMeta() {
Set<String> info = new HashSet<String>();
for (File file : this.uploadedFiles) {
info.add(getLogFileMetaData(file));
}
return info;
}
public Set<String> getAllExistingFilesMeta() {
return this.allExistingFileMeta;
}
private String getLogFileMetaData(File file) {
return containerId.toString() + "_" + file.getName() + "_"
+ file.lastModified();
}
} }
/** /**
@ -242,6 +342,7 @@ public class AggregatedLogFormat {
private final FSDataOutputStream fsDataOStream; private final FSDataOutputStream fsDataOStream;
private final TFile.Writer writer; private final TFile.Writer writer;
private FileContext fc;
public LogWriter(final Configuration conf, final Path remoteAppLogFile, public LogWriter(final Configuration conf, final Path remoteAppLogFile,
UserGroupInformation userUgi) throws IOException { UserGroupInformation userUgi) throws IOException {
@ -250,7 +351,7 @@ public class AggregatedLogFormat {
userUgi.doAs(new PrivilegedExceptionAction<FSDataOutputStream>() { userUgi.doAs(new PrivilegedExceptionAction<FSDataOutputStream>() {
@Override @Override
public FSDataOutputStream run() throws Exception { public FSDataOutputStream run() throws Exception {
FileContext fc = FileContext.getFileContext(conf); fc = FileContext.getFileContext(conf);
fc.setUMask(APP_LOG_FILE_UMASK); fc.setUMask(APP_LOG_FILE_UMASK);
return fc.create( return fc.create(
remoteAppLogFile, remoteAppLogFile,
@ -304,11 +405,16 @@ public class AggregatedLogFormat {
} }
public void append(LogKey logKey, LogValue logValue) throws IOException { public void append(LogKey logKey, LogValue logValue) throws IOException {
Set<File> pendingUploadFiles =
logValue.getPendingLogFilesToUploadForThisContainer();
if (pendingUploadFiles.size() == 0) {
return;
}
DataOutputStream out = this.writer.prepareAppendKey(-1); DataOutputStream out = this.writer.prepareAppendKey(-1);
logKey.write(out); logKey.write(out);
out.close(); out.close();
out = this.writer.prepareAppendValue(-1); out = this.writer.prepareAppendValue(-1);
logValue.write(out); logValue.write(out, pendingUploadFiles);
out.close(); out.close();
} }
@ -318,11 +424,7 @@ public class AggregatedLogFormat {
} catch (IOException e) { } catch (IOException e) {
LOG.warn("Exception closing writer", e); LOG.warn("Exception closing writer", e);
} }
try { IOUtils.closeStream(fsDataOStream);
this.fsDataOStream.close();
} catch (IOException e) {
LOG.warn("Exception closing output-stream", e);
}
} }
} }

View File

@ -25,9 +25,13 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import com.google.common.annotations.VisibleForTesting;
@Private @Private
public class LogAggregationUtils { public class LogAggregationUtils {
public static final String TMP_FILE_SUFFIX = ".tmp";
/** /**
* Constructs the full filename for an application's log file per node. * Constructs the full filename for an application's log file per node.
* @param remoteRootLogDir * @param remoteRootLogDir
@ -102,8 +106,8 @@ public class LogAggregationUtils {
* @param nodeId * @param nodeId
* @return the node string to be used to construct the file name. * @return the node string to be used to construct the file name.
*/ */
private static String getNodeString(NodeId nodeId) { @VisibleForTesting
public static String getNodeString(NodeId nodeId) {
return nodeId.toString().replace(":", "_"); return nodeId.toString().replace(":", "_");
} }
} }

View File

@ -47,6 +47,7 @@ import org.apache.hadoop.yarn.webapp.log.AggregatedLogsBlockForTest;
import org.apache.hadoop.yarn.webapp.view.BlockForTest; import org.apache.hadoop.yarn.webapp.view.BlockForTest;
import org.apache.hadoop.yarn.webapp.view.HtmlBlock; import org.apache.hadoop.yarn.webapp.view.HtmlBlock;
import org.apache.hadoop.yarn.webapp.view.HtmlBlockForTest; import org.apache.hadoop.yarn.webapp.view.HtmlBlockForTest;
import org.junit.Ignore;
import org.junit.Test; import org.junit.Test;
import static org.mockito.Mockito.*; import static org.mockito.Mockito.*;
@ -148,9 +149,10 @@ public class TestAggregatedLogsBlock {
} }
/** /**
* Log files was deleted. * Log files was deleted.
* * TODO: YARN-2582: fix log web ui for Long Running application
* @throws Exception * @throws Exception
*/ */
@Ignore
@Test @Test
public void testNoLogs() throws Exception { public void testNoLogs() throws Exception {

View File

@ -20,14 +20,18 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregatio
import java.io.IOException; import java.io.IOException;
import java.security.PrivilegedExceptionAction; import java.security.PrivilegedExceptionAction;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
@ -36,24 +40,31 @@ import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.logaggregation.ContainerLogsRetentionPolicy; import org.apache.hadoop.yarn.logaggregation.ContainerLogsRetentionPolicy;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey; import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue; import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogWriter; import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogWriter;
import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService; import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.ConverterUtils;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
public class AppLogAggregatorImpl implements AppLogAggregator { public class AppLogAggregatorImpl implements AppLogAggregator {
private static final Log LOG = LogFactory private static final Log LOG = LogFactory
.getLog(AppLogAggregatorImpl.class); .getLog(AppLogAggregatorImpl.class);
private static final int THREAD_SLEEP_TIME = 1000; private static final int THREAD_SLEEP_TIME = 1000;
private static final String TMP_FILE_SUFFIX = ".tmp";
private final LocalDirsHandlerService dirsHandler; private final LocalDirsHandlerService dirsHandler;
private final Dispatcher dispatcher; private final Dispatcher dispatcher;
@ -72,15 +83,20 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
private final AtomicBoolean appAggregationFinished = new AtomicBoolean(); private final AtomicBoolean appAggregationFinished = new AtomicBoolean();
private final AtomicBoolean aborted = new AtomicBoolean(); private final AtomicBoolean aborted = new AtomicBoolean();
private final Map<ApplicationAccessType, String> appAcls; private final Map<ApplicationAccessType, String> appAcls;
private final LogAggregationContext logAggregationContext;
private final Context context;
private LogWriter writer = null; private final Map<ContainerId, ContainerLogAggregator> containerLogAggregators =
new HashMap<ContainerId, ContainerLogAggregator>();
public AppLogAggregatorImpl(Dispatcher dispatcher, public AppLogAggregatorImpl(Dispatcher dispatcher,
DeletionService deletionService, Configuration conf, ApplicationId appId, DeletionService deletionService, Configuration conf,
UserGroupInformation userUgi, LocalDirsHandlerService dirsHandler, ApplicationId appId, UserGroupInformation userUgi,
Path remoteNodeLogFileForApp, LocalDirsHandlerService dirsHandler, Path remoteNodeLogFileForApp,
ContainerLogsRetentionPolicy retentionPolicy, ContainerLogsRetentionPolicy retentionPolicy,
Map<ApplicationAccessType, String> appAcls) { Map<ApplicationAccessType, String> appAcls,
LogAggregationContext logAggregationContext,
Context context) {
this.dispatcher = dispatcher; this.dispatcher = dispatcher;
this.conf = conf; this.conf = conf;
this.delService = deletionService; this.delService = deletionService;
@ -93,45 +109,112 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
this.retentionPolicy = retentionPolicy; this.retentionPolicy = retentionPolicy;
this.pendingContainers = new LinkedBlockingQueue<ContainerId>(); this.pendingContainers = new LinkedBlockingQueue<ContainerId>();
this.appAcls = appAcls; this.appAcls = appAcls;
this.logAggregationContext = logAggregationContext;
this.context = context;
} }
private void uploadLogsForContainer(ContainerId containerId) { private void uploadLogsForContainers() {
if (this.logAggregationDisabled) { if (this.logAggregationDisabled) {
return; return;
} }
// Lazy creation of the writer // Create a set of Containers whose logs will be uploaded in this cycle.
if (this.writer == null) { // It includes:
LOG.info("Starting aggregate log-file for app " + this.applicationId // a) all containers in pendingContainers: those containers are finished
+ " at " + this.remoteNodeTmpLogFileForApp); // and satisfy the retentionPolicy.
try { // b) some set of running containers: For all the Running containers,
this.writer = // we have ContainerLogsRetentionPolicy.AM_AND_FAILED_CONTAINERS_ONLY,
new LogWriter(this.conf, this.remoteNodeTmpLogFileForApp, // so simply set wasContainerSuccessful as true to
this.userUgi); // bypass FAILED_CONTAINERS check and find the running containers
//Write ACLs once when and if the writer is created. // which satisfy the retentionPolicy.
this.writer.writeApplicationACLs(appAcls); Set<ContainerId> pendingContainerInThisCycle = new HashSet<ContainerId>();
this.writer.writeApplicationOwner(this.userUgi.getShortUserName()); this.pendingContainers.drainTo(pendingContainerInThisCycle);
} catch (IOException e) { Set<ContainerId> finishedContainers =
LOG.error("Cannot create writer for app " + this.applicationId new HashSet<ContainerId>(pendingContainerInThisCycle);
+ ". Disabling log-aggregation for this app.", e); if (this.context.getApplications().get(this.appId) != null) {
this.logAggregationDisabled = true; for (ContainerId container : this.context.getApplications()
return; .get(this.appId).getContainers().keySet()) {
if (shouldUploadLogs(container, true)) {
pendingContainerInThisCycle.add(container);
}
} }
} }
LOG.info("Uploading logs for container " + containerId LogWriter writer = null;
+ ". Current good log dirs are "
+ StringUtils.join(",", dirsHandler.getLogDirs()));
LogKey logKey = new LogKey(containerId);
LogValue logValue =
new LogValue(dirsHandler.getLogDirs(), containerId,
userUgi.getShortUserName());
try { try {
this.writer.append(logKey, logValue); try {
} catch (IOException e) { writer =
LOG.error("Couldn't upload logs for " + containerId new LogWriter(this.conf, this.remoteNodeTmpLogFileForApp,
+ ". Skipping this container."); this.userUgi);
// Write ACLs once when the writer is created.
writer.writeApplicationACLs(appAcls);
writer.writeApplicationOwner(this.userUgi.getShortUserName());
} catch (IOException e1) {
LOG.error("Cannot create writer for app " + this.applicationId
+ ". Skip log upload this time. ");
return;
}
boolean uploadedLogsInThisCycle = false;
for (ContainerId container : pendingContainerInThisCycle) {
ContainerLogAggregator aggregator = null;
if (containerLogAggregators.containsKey(container)) {
aggregator = containerLogAggregators.get(container);
} else {
aggregator = new ContainerLogAggregator(container);
containerLogAggregators.put(container, aggregator);
}
Set<Path> uploadedFilePathsInThisCycle =
aggregator.doContainerLogAggregation(writer);
if (uploadedFilePathsInThisCycle.size() > 0) {
uploadedLogsInThisCycle = true;
}
this.delService.delete(this.userUgi.getShortUserName(), null,
uploadedFilePathsInThisCycle
.toArray(new Path[uploadedFilePathsInThisCycle.size()]));
// This container is finished, and all its logs have been uploaded,
// remove it from containerLogAggregators.
if (finishedContainers.contains(container)) {
containerLogAggregators.remove(container);
}
}
if (writer != null) {
writer.close();
}
final Path renamedPath = logAggregationContext == null ||
logAggregationContext.getRollingIntervalSeconds() <= 0
? remoteNodeLogFileForApp : new Path(
remoteNodeLogFileForApp.getParent(),
remoteNodeLogFileForApp.getName() + "_"
+ System.currentTimeMillis());
final boolean rename = uploadedLogsInThisCycle;
try {
userUgi.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws Exception {
FileSystem remoteFS = FileSystem.get(conf);
if (remoteFS.exists(remoteNodeTmpLogFileForApp)
&& rename) {
remoteFS.rename(remoteNodeTmpLogFileForApp, renamedPath);
}
return null;
}
});
} catch (Exception e) {
LOG.error(
"Failed to move temporary log file to final location: ["
+ remoteNodeTmpLogFileForApp + "] to ["
+ renamedPath + "]", e);
}
} finally {
if (writer != null) {
writer.close();
}
} }
} }
@ -149,12 +232,19 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
private void doAppLogAggregation() { private void doAppLogAggregation() {
ContainerId containerId;
while (!this.appFinishing.get() && !this.aborted.get()) { while (!this.appFinishing.get() && !this.aborted.get()) {
synchronized(this) { synchronized(this) {
try { try {
wait(THREAD_SLEEP_TIME); if (this.logAggregationContext != null && this.logAggregationContext
.getRollingIntervalSeconds() > 0) {
wait(this.logAggregationContext.getRollingIntervalSeconds() * 1000);
if (this.appFinishing.get() || this.aborted.get()) {
break;
}
uploadLogsForContainers();
} else {
wait(THREAD_SLEEP_TIME);
}
} catch (InterruptedException e) { } catch (InterruptedException e) {
LOG.warn("PendingContainers queue is interrupted"); LOG.warn("PendingContainers queue is interrupted");
this.appFinishing.set(true); this.appFinishing.set(true);
@ -166,10 +256,8 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
return; return;
} }
// Application is finished. Finish pending-containers // App is finished, upload the container logs.
while ((containerId = this.pendingContainers.poll()) != null) { uploadLogsForContainers();
uploadLogsForContainer(containerId);
}
// Remove the local app-log-dirs // Remove the local app-log-dirs
List<String> rootLogDirs = dirsHandler.getLogDirs(); List<String> rootLogDirs = dirsHandler.getLogDirs();
@ -181,26 +269,6 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
} }
this.delService.delete(this.userUgi.getShortUserName(), null, this.delService.delete(this.userUgi.getShortUserName(), null,
localAppLogDirs); localAppLogDirs);
if (this.writer != null) {
this.writer.close();
LOG.info("Finished aggregate log-file for app " + this.applicationId);
}
try {
userUgi.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws Exception {
FileSystem remoteFS = FileSystem.get(conf);
remoteFS.rename(remoteNodeTmpLogFileForApp, remoteNodeLogFileForApp);
return null;
}
});
} catch (Exception e) {
LOG.error("Failed to move temporary log file to final location: ["
+ remoteNodeTmpLogFileForApp + "] to [" + remoteNodeLogFileForApp
+ "]", e);
}
this.dispatcher.getEventHandler().handle( this.dispatcher.getEventHandler().handle(
new ApplicationEvent(this.appId, new ApplicationEvent(this.appId,
@ -210,9 +278,11 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
private Path getRemoteNodeTmpLogFileForApp() { private Path getRemoteNodeTmpLogFileForApp() {
return new Path(remoteNodeLogFileForApp.getParent(), return new Path(remoteNodeLogFileForApp.getParent(),
(remoteNodeLogFileForApp.getName() + TMP_FILE_SUFFIX)); (remoteNodeLogFileForApp.getName() + LogAggregationUtils.TMP_FILE_SUFFIX));
} }
// TODO: The condition: containerId.getId() == 1 to determine an AM container
// is not always true.
private boolean shouldUploadLogs(ContainerId containerId, private boolean shouldUploadLogs(ContainerId containerId,
boolean wasContainerSuccessful) { boolean wasContainerSuccessful) {
@ -267,4 +337,53 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
this.aborted.set(true); this.aborted.set(true);
this.notifyAll(); this.notifyAll();
} }
@Private
@VisibleForTesting
public synchronized void doLogAggregationOutOfBand() {
LOG.info("Do OutOfBand log aggregation");
this.notifyAll();
}
private class ContainerLogAggregator {
private final ContainerId containerId;
private Set<String> uploadedFileMeta =
new HashSet<String>();
public ContainerLogAggregator(ContainerId containerId) {
this.containerId = containerId;
}
public Set<Path> doContainerLogAggregation(LogWriter writer) {
LOG.info("Uploading logs for container " + containerId
+ ". Current good log dirs are "
+ StringUtils.join(",", dirsHandler.getLogDirs()));
final LogKey logKey = new LogKey(containerId);
final LogValue logValue =
new LogValue(dirsHandler.getLogDirs(), containerId,
userUgi.getShortUserName(), logAggregationContext,
this.uploadedFileMeta);
try {
writer.append(logKey, logValue);
} catch (Exception e) {
LOG.error("Couldn't upload logs for " + containerId
+ ". Skipping this container.");
return new HashSet<Path>();
}
this.uploadedFileMeta.addAll(logValue
.getCurrentUpLoadedFileMeta());
// if any of the previous uploaded logs have been deleted,
// we need to remove them from alreadyUploadedLogs
Iterable<String> mask =
Iterables.filter(uploadedFileMeta, new Predicate<String>() {
@Override
public boolean apply(String next) {
return logValue.getAllExistingFilesMeta().contains(next);
}
});
this.uploadedFileMeta = Sets.newHashSet(mask);
return logValue.getCurrentUpLoadedFilesPath();
}
}
} }

View File

@ -42,6 +42,7 @@ import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.Dispatcher;
@ -58,7 +59,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.eve
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerContainerFinishedEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerContainerFinishedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEvent;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.common.util.concurrent.ThreadFactoryBuilder;
public class LogAggregationService extends AbstractService implements public class LogAggregationService extends AbstractService implements
@ -223,6 +224,11 @@ public class LogAggregationService extends AbstractService implements
this.remoteRootLogDirSuffix); this.remoteRootLogDirSuffix);
} }
Path getRemoteAppLogDir(ApplicationId appId, String user) {
return LogAggregationUtils.getRemoteAppLogDir(this.remoteRootLogDir, appId,
user, this.remoteRootLogDirSuffix);
}
private void createDir(FileSystem fs, Path path, FsPermission fsPerm) private void createDir(FileSystem fs, Path path, FsPermission fsPerm)
throws IOException { throws IOException {
FsPermission dirPerm = new FsPermission(fsPerm); FsPermission dirPerm = new FsPermission(fsPerm);
@ -287,6 +293,7 @@ public class LogAggregationService extends AbstractService implements
createDir(remoteFS, appDir, APP_DIR_PERMISSIONS); createDir(remoteFS, appDir, APP_DIR_PERMISSIONS);
} }
} catch (IOException e) { } catch (IOException e) {
LOG.error("Failed to setup application log directory for " LOG.error("Failed to setup application log directory for "
+ appId, e); + appId, e);
@ -303,11 +310,13 @@ public class LogAggregationService extends AbstractService implements
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
private void initApp(final ApplicationId appId, String user, private void initApp(final ApplicationId appId, String user,
Credentials credentials, ContainerLogsRetentionPolicy logRetentionPolicy, Credentials credentials, ContainerLogsRetentionPolicy logRetentionPolicy,
Map<ApplicationAccessType, String> appAcls) { Map<ApplicationAccessType, String> appAcls,
LogAggregationContext logAggregationContext) {
ApplicationEvent eventResponse; ApplicationEvent eventResponse;
try { try {
verifyAndCreateRemoteLogDir(getConfig()); verifyAndCreateRemoteLogDir(getConfig());
initAppAggregator(appId, user, credentials, logRetentionPolicy, appAcls); initAppAggregator(appId, user, credentials, logRetentionPolicy, appAcls,
logAggregationContext);
eventResponse = new ApplicationEvent(appId, eventResponse = new ApplicationEvent(appId,
ApplicationEventType.APPLICATION_LOG_HANDLING_INITED); ApplicationEventType.APPLICATION_LOG_HANDLING_INITED);
} catch (YarnRuntimeException e) { } catch (YarnRuntimeException e) {
@ -320,7 +329,8 @@ public class LogAggregationService extends AbstractService implements
protected void initAppAggregator(final ApplicationId appId, String user, protected void initAppAggregator(final ApplicationId appId, String user,
Credentials credentials, ContainerLogsRetentionPolicy logRetentionPolicy, Credentials credentials, ContainerLogsRetentionPolicy logRetentionPolicy,
Map<ApplicationAccessType, String> appAcls) { Map<ApplicationAccessType, String> appAcls,
LogAggregationContext logAggregationContext) {
// Get user's FileSystem credentials // Get user's FileSystem credentials
final UserGroupInformation userUgi = final UserGroupInformation userUgi =
@ -334,7 +344,7 @@ public class LogAggregationService extends AbstractService implements
new AppLogAggregatorImpl(this.dispatcher, this.deletionService, new AppLogAggregatorImpl(this.dispatcher, this.deletionService,
getConfig(), appId, userUgi, dirsHandler, getConfig(), appId, userUgi, dirsHandler,
getRemoteNodeLogFileForApp(appId, user), logRetentionPolicy, getRemoteNodeLogFileForApp(appId, user), logRetentionPolicy,
appAcls); appAcls, logAggregationContext, this.context);
if (this.appLogAggregators.putIfAbsent(appId, appLogAggregator) != null) { if (this.appLogAggregators.putIfAbsent(appId, appLogAggregator) != null) {
throw new YarnRuntimeException("Duplicate initApp for " + appId); throw new YarnRuntimeException("Duplicate initApp for " + appId);
} }
@ -421,7 +431,8 @@ public class LogAggregationService extends AbstractService implements
initApp(appStartEvent.getApplicationId(), appStartEvent.getUser(), initApp(appStartEvent.getApplicationId(), appStartEvent.getUser(),
appStartEvent.getCredentials(), appStartEvent.getCredentials(),
appStartEvent.getLogRetentionPolicy(), appStartEvent.getLogRetentionPolicy(),
appStartEvent.getApplicationAcls()); appStartEvent.getApplicationAcls(),
appStartEvent.getLogAggregationContext());
break; break;
case CONTAINER_FINISHED: case CONTAINER_FINISHED:
LogHandlerContainerFinishedEvent containerFinishEvent = LogHandlerContainerFinishedEvent containerFinishEvent =
@ -439,4 +450,14 @@ public class LogAggregationService extends AbstractService implements
} }
} }
@VisibleForTesting
public ConcurrentMap<ApplicationId, AppLogAggregator> getAppLogAggregators() {
return this.appLogAggregators;
}
@VisibleForTesting
public NodeId getNodeId() {
return this.nodeId;
}
} }

View File

@ -37,6 +37,7 @@ import java.io.ByteArrayOutputStream;
import java.io.DataInputStream; import java.io.DataInputStream;
import java.io.EOFException; import java.io.EOFException;
import java.io.File; import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileWriter; import java.io.FileWriter;
import java.io.IOException; import java.io.IOException;
import java.io.PrintStream; import java.io.PrintStream;
@ -50,14 +51,18 @@ import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.junit.Assert; import org.junit.Assert;
import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.UnsupportedFileSystemException; import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.Credentials;
@ -73,6 +78,7 @@ import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType; import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.URL; import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
@ -85,29 +91,32 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat; import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat;
import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey; import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogReader; import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogReader;
import org.apache.hadoop.yarn.logaggregation.ContainerLogsRetentionPolicy; import org.apache.hadoop.yarn.logaggregation.ContainerLogsRetentionPolicy;
import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent; import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService; import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext; import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest; import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.TestContainerManager; import org.apache.hadoop.yarn.server.nodemanager.containermanager.TestContainerManager;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppFinishedEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppFinishedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerContainerFinishedEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerContainerFinishedEvent;
import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;
import org.junit.Test; import org.junit.Test;
import org.mockito.ArgumentCaptor; import org.mockito.ArgumentCaptor;
import org.mockito.Mockito; import org.mockito.Mockito;
import org.mortbay.util.MultiException; import org.mortbay.util.MultiException;
//@Ignore //@Ignore
public class TestLogAggregationService extends BaseContainerManagerTest { public class TestLogAggregationService extends BaseContainerManagerTest {
@ -178,7 +187,8 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
BuilderUtils.newApplicationAttemptId(application1, 1); BuilderUtils.newApplicationAttemptId(application1, 1);
ContainerId container11 = BuilderUtils.newContainerId(appAttemptId, 1); ContainerId container11 = BuilderUtils.newContainerId(appAttemptId, 1);
// Simulate log-file creation // Simulate log-file creation
writeContainerLogs(app1LogDir, container11); writeContainerLogs(app1LogDir, container11, new String[] { "stdout",
"stderr", "syslog" });
logAggregationService.handle( logAggregationService.handle(
new LogHandlerContainerFinishedEvent(container11, 0)); new LogHandlerContainerFinishedEvent(container11, 0));
@ -206,6 +216,7 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
Path logFilePath = Path logFilePath =
logAggregationService.getRemoteNodeLogFileForApp(application1, logAggregationService.getRemoteNodeLogFileForApp(application1,
this.user); this.user);
Assert.assertTrue("Log file [" + logFilePath + "] not found", new File( Assert.assertTrue("Log file [" + logFilePath + "] not found", new File(
logFilePath.toUri().getPath()).exists()); logFilePath.toUri().getPath()).exists());
@ -261,7 +272,7 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
Assert.assertFalse(new File(logAggregationService Assert.assertFalse(new File(logAggregationService
.getRemoteNodeLogFileForApp(application1, this.user).toUri().getPath()) .getRemoteNodeLogFileForApp(application1, this.user).toUri().getPath())
.exists()); .exists());
dispatcher.await(); dispatcher.await();
ApplicationEvent expectedEvents[] = new ApplicationEvent[]{ ApplicationEvent expectedEvents[] = new ApplicationEvent[]{
@ -283,7 +294,7 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath()); this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());
this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
this.remoteRootLogDir.getAbsolutePath()); this.remoteRootLogDir.getAbsolutePath());
String[] fileNames = new String[] { "stdout", "stderr", "syslog" };
DrainDispatcher dispatcher = createDispatcher(); DrainDispatcher dispatcher = createDispatcher();
EventHandler<ApplicationEvent> appEventHandler = mock(EventHandler.class); EventHandler<ApplicationEvent> appEventHandler = mock(EventHandler.class);
dispatcher.register(ApplicationEventType.class, appEventHandler); dispatcher.register(ApplicationEventType.class, appEventHandler);
@ -310,7 +321,7 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
ContainerId container11 = BuilderUtils.newContainerId(appAttemptId1, 1); ContainerId container11 = BuilderUtils.newContainerId(appAttemptId1, 1);
// Simulate log-file creation // Simulate log-file creation
writeContainerLogs(app1LogDir, container11); writeContainerLogs(app1LogDir, container11, fileNames);
logAggregationService.handle( logAggregationService.handle(
new LogHandlerContainerFinishedEvent(container11, 0)); new LogHandlerContainerFinishedEvent(container11, 0));
@ -328,13 +339,13 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
ContainerId container21 = BuilderUtils.newContainerId(appAttemptId2, 1); ContainerId container21 = BuilderUtils.newContainerId(appAttemptId2, 1);
writeContainerLogs(app2LogDir, container21); writeContainerLogs(app2LogDir, container21, fileNames);
logAggregationService.handle( logAggregationService.handle(
new LogHandlerContainerFinishedEvent(container21, 0)); new LogHandlerContainerFinishedEvent(container21, 0));
ContainerId container12 = BuilderUtils.newContainerId(appAttemptId1, 2); ContainerId container12 = BuilderUtils.newContainerId(appAttemptId1, 2);
writeContainerLogs(app1LogDir, container12); writeContainerLogs(app1LogDir, container12, fileNames);
logAggregationService.handle( logAggregationService.handle(
new LogHandlerContainerFinishedEvent(container12, 0)); new LogHandlerContainerFinishedEvent(container12, 0));
@ -365,22 +376,22 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
reset(appEventHandler); reset(appEventHandler);
ContainerId container31 = BuilderUtils.newContainerId(appAttemptId3, 1); ContainerId container31 = BuilderUtils.newContainerId(appAttemptId3, 1);
writeContainerLogs(app3LogDir, container31); writeContainerLogs(app3LogDir, container31, fileNames);
logAggregationService.handle( logAggregationService.handle(
new LogHandlerContainerFinishedEvent(container31, 0)); new LogHandlerContainerFinishedEvent(container31, 0));
ContainerId container32 = BuilderUtils.newContainerId(appAttemptId3, 2); ContainerId container32 = BuilderUtils.newContainerId(appAttemptId3, 2);
writeContainerLogs(app3LogDir, container32); writeContainerLogs(app3LogDir, container32, fileNames);
logAggregationService.handle( logAggregationService.handle(
new LogHandlerContainerFinishedEvent(container32, 1)); // Failed new LogHandlerContainerFinishedEvent(container32, 1)); // Failed
ContainerId container22 = BuilderUtils.newContainerId(appAttemptId2, 2); ContainerId container22 = BuilderUtils.newContainerId(appAttemptId2, 2);
writeContainerLogs(app2LogDir, container22); writeContainerLogs(app2LogDir, container22, fileNames);
logAggregationService.handle( logAggregationService.handle(
new LogHandlerContainerFinishedEvent(container22, 0)); new LogHandlerContainerFinishedEvent(container22, 0));
ContainerId container33 = BuilderUtils.newContainerId(appAttemptId3, 3); ContainerId container33 = BuilderUtils.newContainerId(appAttemptId3, 3);
writeContainerLogs(app3LogDir, container33); writeContainerLogs(app3LogDir, container33, fileNames);
logAggregationService.handle( logAggregationService.handle(
new LogHandlerContainerFinishedEvent(container33, 0)); new LogHandlerContainerFinishedEvent(container33, 0));
@ -395,11 +406,13 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
assertEquals(0, logAggregationService.getNumAggregators()); assertEquals(0, logAggregationService.getNumAggregators());
verifyContainerLogs(logAggregationService, application1, verifyContainerLogs(logAggregationService, application1,
new ContainerId[] { container11, container12 }); new ContainerId[] { container11, container12 }, fileNames, 3, false);
verifyContainerLogs(logAggregationService, application2, verifyContainerLogs(logAggregationService, application2,
new ContainerId[] { container21 }); new ContainerId[] { container21 }, fileNames, 3, false);
verifyContainerLogs(logAggregationService, application3, verifyContainerLogs(logAggregationService, application3,
new ContainerId[] { container31, container32 }); new ContainerId[] { container31, container32 }, fileNames, 3, false);
dispatcher.await(); dispatcher.await();
@ -591,7 +604,8 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
doThrow(new YarnRuntimeException("KABOOM!")) doThrow(new YarnRuntimeException("KABOOM!"))
.when(logAggregationService).initAppAggregator( .when(logAggregationService).initAppAggregator(
eq(appId), eq(user), any(Credentials.class), eq(appId), eq(user), any(Credentials.class),
any(ContainerLogsRetentionPolicy.class), anyMap()); any(ContainerLogsRetentionPolicy.class), anyMap(),
any(LogAggregationContext.class));
logAggregationService.handle(new LogHandlerAppStartedEvent(appId, logAggregationService.handle(new LogHandlerAppStartedEvent(appId,
this.user, null, this.user, null,
@ -672,26 +686,62 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
assertEquals(0, logAggregationService.getNumAggregators()); assertEquals(0, logAggregationService.getNumAggregators());
} }
private void writeContainerLogs(File appLogDir, ContainerId containerId) private void writeContainerLogs(File appLogDir, ContainerId containerId,
throws IOException { String[] fileName) throws IOException {
// ContainerLogDir should be created // ContainerLogDir should be created
String containerStr = ConverterUtils.toString(containerId); String containerStr = ConverterUtils.toString(containerId);
File containerLogDir = new File(appLogDir, containerStr); File containerLogDir = new File(appLogDir, containerStr);
containerLogDir.mkdir(); containerLogDir.mkdir();
for (String fileType : new String[] { "stdout", "stderr", "syslog" }) { for (String fileType : fileName) {
Writer writer11 = new FileWriter(new File(containerLogDir, fileType)); Writer writer11 = new FileWriter(new File(containerLogDir, fileType));
writer11.write(containerStr + " Hello " + fileType + "!"); writer11.write(containerStr + " Hello " + fileType + "!");
writer11.close(); writer11.close();
} }
} }
private void verifyContainerLogs( private void verifyContainerLogs(LogAggregationService logAggregationService,
LogAggregationService logAggregationService, ApplicationId appId, ApplicationId appId, ContainerId[] expectedContainerIds,
ContainerId[] expectedContainerIds) throws IOException { String[] logFiles, int numOfContainerLogs, boolean multiLogs)
throws IOException {
Path appLogDir = logAggregationService.getRemoteAppLogDir(appId, this.user);
RemoteIterator<FileStatus> nodeFiles = null;
try {
Path qualifiedLogDir =
FileContext.getFileContext(this.conf).makeQualified(appLogDir);
nodeFiles =
FileContext.getFileContext(qualifiedLogDir.toUri(), this.conf)
.listStatus(appLogDir);
} catch (FileNotFoundException fnf) {
Assert.fail("Should have log files");
}
Assert.assertTrue(nodeFiles.hasNext());
FileStatus targetNodeFile = null;
if (! multiLogs) {
targetNodeFile = nodeFiles.next();
Assert.assertTrue(targetNodeFile.getPath().getName().equals(
LogAggregationUtils.getNodeString(logAggregationService.getNodeId())));
} else {
long fileCreateTime = 0;
while (nodeFiles.hasNext()) {
FileStatus nodeFile = nodeFiles.next();
if (!nodeFile.getPath().getName()
.contains(LogAggregationUtils.TMP_FILE_SUFFIX)) {
long time =
Long.parseLong(nodeFile.getPath().getName().split("_")[2]);
if (time > fileCreateTime) {
targetNodeFile = nodeFile;
fileCreateTime = time;
}
}
}
String[] fileName = targetNodeFile.getPath().getName().split("_");
Assert.assertTrue(fileName.length == 3);
Assert.assertEquals(fileName[0] + ":" + fileName[1],
logAggregationService.getNodeId().toString());
}
AggregatedLogFormat.LogReader reader = AggregatedLogFormat.LogReader reader =
new AggregatedLogFormat.LogReader(this.conf, new AggregatedLogFormat.LogReader(this.conf, targetNodeFile.getPath());
logAggregationService.getRemoteNodeLogFileForApp(appId, this.user));
Assert.assertEquals(this.user, reader.getApplicationOwner()); Assert.assertEquals(this.user, reader.getApplicationOwner());
verifyAcls(reader.getApplicationAcls()); verifyAcls(reader.getApplicationAcls());
@ -749,8 +799,8 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
for (ContainerId cId : expectedContainerIds) { for (ContainerId cId : expectedContainerIds) {
String containerStr = ConverterUtils.toString(cId); String containerStr = ConverterUtils.toString(cId);
Map<String, String> thisContainerMap = logMap.remove(containerStr); Map<String, String> thisContainerMap = logMap.remove(containerStr);
Assert.assertEquals(3, thisContainerMap.size()); Assert.assertEquals(numOfContainerLogs, thisContainerMap.size());
for (String fileType : new String[] { "stdout", "stderr", "syslog" }) { for (String fileType : logFiles) {
String expectedValue = containerStr + " Hello " + fileType + "!"; String expectedValue = containerStr + " Hello " + fileType + "!";
LOG.info("Expected log-content : " + new String(expectedValue)); LOG.info("Expected log-content : " + new String(expectedValue));
String foundValue = thisContainerMap.remove(fileType); String foundValue = thisContainerMap.remove(fileType);
@ -987,4 +1037,331 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
sb.append("]"); sb.append("]");
return sb.toString(); return sb.toString();
} }
@Test (timeout = 50000)
@SuppressWarnings("unchecked")
public void testLogAggregationServiceWithPatterns() throws Exception {
LogAggregationContext logAggregationContextWithIncludePatterns =
Records.newRecord(LogAggregationContext.class);
String includePattern = "stdout|syslog";
logAggregationContextWithIncludePatterns.setIncludePattern(includePattern);
LogAggregationContext LogAggregationContextWithExcludePatterns =
Records.newRecord(LogAggregationContext.class);
String excludePattern = "stdout|syslog";
LogAggregationContextWithExcludePatterns.setExcludePattern(excludePattern);
this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());
this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
this.remoteRootLogDir.getAbsolutePath());
DrainDispatcher dispatcher = createDispatcher();
EventHandler<ApplicationEvent> appEventHandler = mock(EventHandler.class);
dispatcher.register(ApplicationEventType.class, appEventHandler);
ApplicationId application1 = BuilderUtils.newApplicationId(1234, 1);
ApplicationId application2 = BuilderUtils.newApplicationId(1234, 2);
ApplicationId application3 = BuilderUtils.newApplicationId(1234, 3);
ApplicationId application4 = BuilderUtils.newApplicationId(1234, 4);
Application mockApp = mock(Application.class);
when(mockApp.getContainers()).thenReturn(
new HashMap<ContainerId, Container>());
this.context.getApplications().put(application1, mockApp);
this.context.getApplications().put(application2, mockApp);
this.context.getApplications().put(application3, mockApp);
this.context.getApplications().put(application4, mockApp);
LogAggregationService logAggregationService =
new LogAggregationService(dispatcher, this.context, this.delSrvc,
super.dirsHandler);
logAggregationService.init(this.conf);
logAggregationService.start();
// LogContext for application1 has includePatten which includes
// stdout and syslog.
// After logAggregation is finished, we expect the logs for application1
// has only logs from stdout and syslog
// AppLogDir should be created
File appLogDir1 =
new File(localLogDir, ConverterUtils.toString(application1));
appLogDir1.mkdir();
logAggregationService.handle(new LogHandlerAppStartedEvent(application1,
this.user, null, ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls,
logAggregationContextWithIncludePatterns));
ApplicationAttemptId appAttemptId1 =
BuilderUtils.newApplicationAttemptId(application1, 1);
ContainerId container1 = BuilderUtils.newContainerId(appAttemptId1, 1);
// Simulate log-file creation
writeContainerLogs(appLogDir1, container1, new String[] { "stdout",
"stderr", "syslog" });
logAggregationService.handle(new LogHandlerContainerFinishedEvent(
container1, 0));
// LogContext for application2 has excludePatten which includes
// stdout and syslog.
// After logAggregation is finished, we expect the logs for application2
// has only logs from stderr
ApplicationAttemptId appAttemptId2 =
BuilderUtils.newApplicationAttemptId(application2, 1);
File app2LogDir =
new File(localLogDir, ConverterUtils.toString(application2));
app2LogDir.mkdir();
logAggregationService.handle(new LogHandlerAppStartedEvent(application2,
this.user, null, ContainerLogsRetentionPolicy.APPLICATION_MASTER_ONLY,
this.acls, LogAggregationContextWithExcludePatterns));
ContainerId container2 = BuilderUtils.newContainerId(appAttemptId2, 1);
writeContainerLogs(app2LogDir, container2, new String[] { "stdout",
"stderr", "syslog" });
logAggregationService.handle(
new LogHandlerContainerFinishedEvent(container2, 0));
// LogContext for application3 has includePattern which is *.log and
// excludePatten which includes std.log and sys.log.
// After logAggregation is finished, we expect the logs for application3
// has all logs whose suffix is .log but excluding sys.log and std.log
LogAggregationContext context1 =
Records.newRecord(LogAggregationContext.class);
context1.setIncludePattern(".*.log");
context1.setExcludePattern("sys.log|std.log");
ApplicationAttemptId appAttemptId3 =
BuilderUtils.newApplicationAttemptId(application3, 1);
File app3LogDir =
new File(localLogDir, ConverterUtils.toString(application3));
app3LogDir.mkdir();
logAggregationService.handle(new LogHandlerAppStartedEvent(application3,
this.user, null, ContainerLogsRetentionPolicy.APPLICATION_MASTER_ONLY,
this.acls, context1));
ContainerId container3 = BuilderUtils.newContainerId(appAttemptId3, 1);
writeContainerLogs(app3LogDir, container3, new String[] { "stdout",
"sys.log", "std.log", "out.log", "err.log", "log" });
logAggregationService.handle(
new LogHandlerContainerFinishedEvent(container3, 0));
// LogContext for application4 has includePattern
// which includes std.log and sys.log and
// excludePatten which includes std.log.
// After logAggregation is finished, we expect the logs for application4
// only has sys.log
LogAggregationContext context2 =
Records.newRecord(LogAggregationContext.class);
context2.setIncludePattern("sys.log|std.log");
context2.setExcludePattern("std.log");
ApplicationAttemptId appAttemptId4 =
BuilderUtils.newApplicationAttemptId(application4, 1);
File app4LogDir =
new File(localLogDir, ConverterUtils.toString(application4));
app4LogDir.mkdir();
logAggregationService.handle(new LogHandlerAppStartedEvent(application4,
this.user, null, ContainerLogsRetentionPolicy.APPLICATION_MASTER_ONLY,
this.acls, context2));
ContainerId container4 = BuilderUtils.newContainerId(appAttemptId4, 1);
writeContainerLogs(app4LogDir, container4, new String[] { "stdout",
"sys.log", "std.log", "out.log", "err.log", "log" });
logAggregationService.handle(
new LogHandlerContainerFinishedEvent(container4, 0));
dispatcher.await();
ApplicationEvent expectedInitEvents[] =
new ApplicationEvent[] { new ApplicationEvent(application1,
ApplicationEventType.APPLICATION_LOG_HANDLING_INITED),
new ApplicationEvent(application2,
ApplicationEventType.APPLICATION_LOG_HANDLING_INITED),
new ApplicationEvent(application3,
ApplicationEventType.APPLICATION_LOG_HANDLING_INITED),
new ApplicationEvent(application4,
ApplicationEventType.APPLICATION_LOG_HANDLING_INITED)};
checkEvents(appEventHandler, expectedInitEvents, false, "getType",
"getApplicationID");
reset(appEventHandler);
logAggregationService.handle(new LogHandlerAppFinishedEvent(application1));
logAggregationService.handle(new LogHandlerAppFinishedEvent(application2));
logAggregationService.handle(new LogHandlerAppFinishedEvent(application3));
logAggregationService.handle(new LogHandlerAppFinishedEvent(application4));
logAggregationService.stop();
assertEquals(0, logAggregationService.getNumAggregators());
String[] logFiles = new String[] { "stdout", "syslog" };
verifyContainerLogs(logAggregationService, application1,
new ContainerId[] { container1 }, logFiles, 2, false);
logFiles = new String[] { "stderr" };
verifyContainerLogs(logAggregationService, application2,
new ContainerId[] { container2 }, logFiles, 1, false);
logFiles = new String[] { "out.log", "err.log" };
verifyContainerLogs(logAggregationService, application3,
new ContainerId[] { container3 }, logFiles, 2, false);
logFiles = new String[] { "sys.log" };
verifyContainerLogs(logAggregationService, application4,
new ContainerId[] { container4 }, logFiles, 1, false);
dispatcher.await();
ApplicationEvent[] expectedFinishedEvents =
new ApplicationEvent[] { new ApplicationEvent(application1,
ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED),
new ApplicationEvent(application2,
ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED),
new ApplicationEvent(application3,
ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED),
new ApplicationEvent(application4,
ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED) };
checkEvents(appEventHandler, expectedFinishedEvents, false, "getType",
"getApplicationID");
dispatcher.stop();
}
@SuppressWarnings("unchecked")
@Test (timeout = 50000)
public void testLogAggregationServiceWithInterval() throws Exception {
final int maxAttempts = 50;
LogAggregationContext logAggregationContextWithInterval =
Records.newRecord(LogAggregationContext.class);
logAggregationContextWithInterval.setRollingIntervalSeconds(5000);
this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());
this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
this.remoteRootLogDir.getAbsolutePath());
// by setting this configuration, the log files will not be deleted immediately after
// they are aggregated to remote directory.
// We could use it to test whether the previous aggregated log files will be aggregated
// again in next cycle.
this.conf.setLong(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 3600);
DrainDispatcher dispatcher = createDispatcher();
EventHandler<ApplicationEvent> appEventHandler = mock(EventHandler.class);
dispatcher.register(ApplicationEventType.class, appEventHandler);
ApplicationId application = BuilderUtils.newApplicationId(123456, 1);
ApplicationAttemptId appAttemptId =
BuilderUtils.newApplicationAttemptId(application, 1);
ContainerId container = BuilderUtils.newContainerId(appAttemptId, 1);
Context context = spy(this.context);
ConcurrentMap<ApplicationId, Application> maps =
new ConcurrentHashMap<ApplicationId, Application>();
Application app = mock(Application.class);
Map<ContainerId, Container> containers = new HashMap<ContainerId, Container>();
containers.put(container, mock(Container.class));
maps.put(application, app);
when(app.getContainers()).thenReturn(containers);
when(context.getApplications()).thenReturn(maps);
LogAggregationService logAggregationService =
new LogAggregationService(dispatcher, context, this.delSrvc,
super.dirsHandler);
logAggregationService.init(this.conf);
logAggregationService.start();
// AppLogDir should be created
File appLogDir =
new File(localLogDir, ConverterUtils.toString(application));
appLogDir.mkdir();
logAggregationService.handle(new LogHandlerAppStartedEvent(application,
this.user, null, ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls,
logAggregationContextWithInterval));
// Simulate log-file creation
String[] logFiles1 = new String[] { "stdout", "stderr", "syslog" };
writeContainerLogs(appLogDir, container, logFiles1);
// Do log aggregation
AppLogAggregatorImpl aggregator =
(AppLogAggregatorImpl) logAggregationService.getAppLogAggregators()
.get(application);
aggregator.doLogAggregationOutOfBand();
int count = 0;
while (numOfLogsAvailable(logAggregationService, application) != 1
&& count <= maxAttempts) {
Thread.sleep(100);
count++;
}
// Container logs should be uploaded
verifyContainerLogs(logAggregationService, application,
new ContainerId[] { container }, logFiles1, 3, true);
// There is no log generated at this time. Do the log aggregation again.
aggregator.doLogAggregationOutOfBand();
// Same logs will not be aggregated again.
// Only one aggregated log file in Remote file directory.
Assert.assertEquals(numOfLogsAvailable(logAggregationService, application),
1);
// Do log aggregation
String[] logFiles2 = new String[] { "stdout_1", "stderr_1", "syslog_1" };
writeContainerLogs(appLogDir, container, logFiles2);
aggregator.doLogAggregationOutOfBand();
count = 0;
while (numOfLogsAvailable(logAggregationService, application) != 2
&& count <= maxAttempts) {
Thread.sleep(100);
count ++;
}
// Container logs should be uploaded
verifyContainerLogs(logAggregationService, application,
new ContainerId[] { container }, logFiles2, 3, true);
// create another logs
String[] logFiles3 = new String[] { "stdout_2", "stderr_2", "syslog_2" };
writeContainerLogs(appLogDir, container, logFiles3);
logAggregationService.handle(
new LogHandlerContainerFinishedEvent(container, 0));
dispatcher.await();
logAggregationService.handle(new LogHandlerAppFinishedEvent(application));
count = 0;
while (numOfLogsAvailable(logAggregationService, application) != 3
&& count <= maxAttempts) {
Thread.sleep(100);
count ++;
}
verifyContainerLogs(logAggregationService, application,
new ContainerId[] { container }, logFiles3, 3, true);
logAggregationService.stop();
assertEquals(0, logAggregationService.getNumAggregators());
dispatcher.stop();
}
private int numOfLogsAvailable(LogAggregationService logAggregationService,
ApplicationId appId) throws IOException {
Path appLogDir = logAggregationService.getRemoteAppLogDir(appId, this.user);
RemoteIterator<FileStatus> nodeFiles = null;
try {
Path qualifiedLogDir =
FileContext.getFileContext(this.conf).makeQualified(appLogDir);
nodeFiles =
FileContext.getFileContext(qualifiedLogDir.toUri(), this.conf)
.listStatus(appLogDir);
} catch (FileNotFoundException fnf) {
return -1;
}
int count = 0;
while (nodeFiles.hasNext()) {
FileStatus status = nodeFiles.next();
String filename = status.getPath().getName();
if (filename.contains(LogAggregationUtils.TMP_FILE_SUFFIX)) {
return -1;
}
if (filename.contains(LogAggregationUtils
.getNodeString(logAggregationService.getNodeId()))) {
count++;
}
}
return count;
}
} }