diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 86f45b81fe3..16bd73a121e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -1064,7 +1064,17 @@ public class YarnConfiguration extends Configuration { public static final String LOG_AGGREGATION_ENABLED = YARN_PREFIX + "log-aggregation-enable"; public static final boolean DEFAULT_LOG_AGGREGATION_ENABLED = false; - + + public static final String LOG_AGGREGATION_FILE_FORMATS = YARN_PREFIX + + "log-aggregation.file-formats"; + public static final String LOG_AGGREGATION_FILE_CONTROLLER_FMT = + YARN_PREFIX + "log-aggregation.file-controller.%s.class"; + + public static final String LOG_AGGREGATION_REMOTE_APP_LOG_DIR_FMT + = YARN_PREFIX + "log-aggregation.%s.remote-app-log-dir"; + public static final String LOG_AGGREGATION_REMOTE_APP_LOG_DIR_SUFFIX_FMT + = YARN_PREFIX + "log-aggregation.%s.remote-app-log-dir-suffix"; + /** * How long to wait before deleting aggregated logs, -1 disables. * Be careful set this too small and you will spam the name node. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java index c40c2c52ea0..153a35aa896 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java @@ -184,6 +184,8 @@ public class TestYarnConfigurationFields extends TestConfigurationFieldsBase { // Currently defined in RegistryConstants/core-site.xml xmlPrefixToSkipCompare.add("hadoop.registry"); + xmlPrefixToSkipCompare.add( + "yarn.log-aggregation.file-controller.TFile.class"); // Add the filters used for checking for collision of default values. initDefaultValueCollisionCheck(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java index c054209630b..26e03196e39 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java @@ -36,7 +36,6 @@ import com.sun.jersey.api.client.ClientResponse; import com.sun.jersey.api.client.ClientResponse.Status; import java.io.BufferedReader; import java.io.ByteArrayOutputStream; -import java.io.DataOutputStream; import java.io.File; import java.io.FileInputStream; import java.io.FileWriter; @@ -78,6 +77,9 @@ import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat; import org.apache.hadoop.yarn.logaggregation.ContainerLogsRequest; import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils; import org.apache.hadoop.yarn.logaggregation.LogCLIHelpers; +import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController; +import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerContext; +import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerFactory; import org.codehaus.jettison.json.JSONObject; import org.junit.Assert; import org.junit.Before; @@ -1345,42 +1347,55 @@ public class TestLogsCLI { Path path = new Path(appDir, LogAggregationUtils.getNodeString(nodeId) + System.currentTimeMillis()); - try (AggregatedLogFormat.LogWriter writer = - new AggregatedLogFormat.LogWriter()) { - writer.initialize(configuration, path, ugi); - writer.writeApplicationOwner(ugi.getUserName()); - + LogAggregationFileControllerFactory factory + = new LogAggregationFileControllerFactory(configuration); + LogAggregationFileController fileFormat = factory + .getFileControllerForWrite(); + try { Map appAcls = new HashMap<>(); appAcls.put(ApplicationAccessType.VIEW_APP, ugi.getUserName()); - writer.writeApplicationACLs(appAcls); - writer.append(new AggregatedLogFormat.LogKey(containerId), + LogAggregationFileControllerContext context + = new LogAggregationFileControllerContext( + path, path, true, 1000, + containerId.getApplicationAttemptId().getApplicationId(), + appAcls, nodeId, ugi); + fileFormat.initializeWriter(context); + fileFormat.write(new AggregatedLogFormat.LogKey(containerId), new AggregatedLogFormat.LogValue(rootLogDirs, containerId, UserGroupInformation.getCurrentUser().getShortUserName())); + } finally { + fileFormat.closeWriter(); } } + @SuppressWarnings("static-access") private static void uploadEmptyContainerLogIntoRemoteDir(UserGroupInformation ugi, Configuration configuration, List rootLogDirs, NodeId nodeId, ContainerId containerId, Path appDir, FileSystem fs) throws Exception { - Path path = - new Path(appDir, LogAggregationUtils.getNodeString(nodeId) - + System.currentTimeMillis()); - try (AggregatedLogFormat.LogWriter writer = - new AggregatedLogFormat.LogWriter()) { - writer.initialize(configuration, path, ugi); - writer.writeApplicationOwner(ugi.getUserName()); - + LogAggregationFileControllerFactory factory + = new LogAggregationFileControllerFactory(configuration); + LogAggregationFileController fileFormat = factory + .getFileControllerForWrite(); + try { Map appAcls = new HashMap<>(); appAcls.put(ApplicationAccessType.VIEW_APP, ugi.getUserName()); - writer.writeApplicationACLs(appAcls); - DataOutputStream out = writer.getWriter().prepareAppendKey(-1); - new AggregatedLogFormat.LogKey(containerId).write(out); - out.close(); - out = writer.getWriter().prepareAppendValue(-1); - new AggregatedLogFormat.LogValue(rootLogDirs, containerId, - UserGroupInformation.getCurrentUser().getShortUserName()).write(out, - new HashSet<>()); - out.close(); + ApplicationId appId = containerId.getApplicationAttemptId() + .getApplicationId(); + Path path = fileFormat.getRemoteNodeLogFileForApp( + appId, ugi.getCurrentUser().getShortUserName(), nodeId); + LogAggregationFileControllerContext context + = new LogAggregationFileControllerContext( + path, path, true, 1000, + appId, appAcls, nodeId, ugi); + fileFormat.initializeWriter(context); + AggregatedLogFormat.LogKey key = new AggregatedLogFormat.LogKey( + containerId); + AggregatedLogFormat.LogValue value = new AggregatedLogFormat.LogValue( + rootLogDirs, containerId, UserGroupInformation.getCurrentUser() + .getShortUserName()); + fileFormat.write(key, value); + } finally { + fileFormat.closeWriter(); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java index d806b121e0f..3c1dcdc1234 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java @@ -44,7 +44,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; import java.util.regex.Pattern; - +import org.apache.commons.io.IOUtils; import org.apache.commons.io.input.BoundedInputStream; import org.apache.commons.io.output.WriterOutputStream; import org.apache.commons.logging.Log; @@ -61,7 +61,6 @@ import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.Options; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.SecureIOUtils; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.file.tfile.TFile; @@ -71,7 +70,6 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.LogAggregationContext; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; -import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.Times; import com.google.common.annotations.VisibleForTesting; @@ -249,7 +247,7 @@ public class AggregatedLogFormat { in = secureOpenFile(logFile); } catch (IOException e) { logErrorMessage(logFile, e); - IOUtils.cleanup(LOG, in); + IOUtils.closeQuietly(in); continue; } @@ -287,7 +285,7 @@ public class AggregatedLogFormat { String message = logErrorMessage(logFile, e); out.write(message.getBytes(Charset.forName("UTF-8"))); } finally { - IOUtils.cleanup(LOG, in); + IOUtils.closeQuietly(in); } } } @@ -557,7 +555,7 @@ public class AggregatedLogFormat { } catch (Exception e) { LOG.warn("Exception closing writer", e); } finally { - IOUtils.closeStream(this.fsDataOStream); + IOUtils.closeQuietly(this.fsDataOStream); } } } @@ -605,7 +603,7 @@ public class AggregatedLogFormat { } return null; } finally { - IOUtils.cleanup(LOG, ownerScanner); + IOUtils.closeQuietly(ownerScanner); } } @@ -651,7 +649,7 @@ public class AggregatedLogFormat { } return acls; } finally { - IOUtils.cleanup(LOG, aclScanner); + IOUtils.closeQuietly(aclScanner); } } @@ -775,8 +773,8 @@ public class AggregatedLogFormat { } } } finally { - IOUtils.cleanup(LOG, ps); - IOUtils.cleanup(LOG, os); + IOUtils.closeQuietly(ps); + IOUtils.closeQuietly(os); } } @@ -1001,7 +999,9 @@ public class AggregatedLogFormat { } public void close() { - IOUtils.cleanup(LOG, scanner, reader, fsDataIStream); + IOUtils.closeQuietly(scanner); + IOUtils.closeQuietly(reader); + IOUtils.closeQuietly(fsDataIStream); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationUtils.java index 24baaab0d6e..e8a28dec223 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationUtils.java @@ -133,6 +133,23 @@ public class LogAggregationUtils { new org.apache.hadoop.fs.Path(conf.get( YarnConfiguration.NM_REMOTE_APP_LOG_DIR, YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR)); + return getRemoteAppLogDir(conf, appId, appOwner, remoteRootLogDir, suffix); + } + + /** + * Return the remote application log directory. + * @param conf the configuration + * @param appId the application + * @param appOwner the application owner + * @param remoteRootLogDir the remote root log directory + * @param suffix the log directory suffix + * @return the remote application log directory path + * @throws IOException if we can not find remote application log directory + */ + public static org.apache.hadoop.fs.Path getRemoteAppLogDir( + Configuration conf, ApplicationId appId, String appOwner, + org.apache.hadoop.fs.Path remoteRootLogDir, String suffix) + throws IOException { org.apache.hadoop.fs.Path remoteAppDir = null; if (appOwner == null) { org.apache.hadoop.fs.Path qualifiedRemoteRootLogDir = @@ -154,6 +171,30 @@ public class LogAggregationUtils { return remoteAppDir; } + /** + * Get all available log files under remote app log directory. + * @param conf the configuration + * @param appId the applicationId + * @param appOwner the application owner + * @param remoteRootLogDir the remote root log directory + * @param suffix the log directory suffix + * @return the iterator of available log files + * @throws IOException if there is no log file available + */ + public static RemoteIterator getRemoteNodeFileDir( + Configuration conf, ApplicationId appId, String appOwner, + org.apache.hadoop.fs.Path remoteRootLogDir, String suffix) + throws IOException { + Path remoteAppLogDir = getRemoteAppLogDir(conf, appId, appOwner, + remoteRootLogDir, suffix); + RemoteIterator nodeFiles = null; + Path qualifiedLogDir = + FileContext.getFileContext(conf).makeQualified(remoteAppLogDir); + nodeFiles = FileContext.getFileContext(qualifiedLogDir.toUri(), + conf).listStatus(remoteAppLogDir); + return nodeFiles; + } + /** * Get all available log files under remote app log directory. * @param conf the configuration diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java new file mode 100644 index 00000000000..5503f8fee8d --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java @@ -0,0 +1,404 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.logaggregation.filecontroller; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Predicate; +import com.google.common.collect.Iterables; +import com.google.common.collect.Sets; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.security.PrivilegedExceptionAction; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils; +import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey; +import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue; + +/** + * Base class to implement Log Aggregation File Controller. + */ +@Private +@Unstable +public abstract class LogAggregationFileController { + + private static final Log LOG = LogFactory.getLog( + LogAggregationFileController.class); + + /* + * Expected deployment TLD will be 1777, owner=, group= App dirs will be created as 770, + * owner=, group=: so that the owner and can + * access / modify the files. + * should obviously be a limited access group. + */ + /** + * Permissions for the top level directory under which app directories will be + * created. + */ + protected static final FsPermission TLDIR_PERMISSIONS = FsPermission + .createImmutable((short) 01777); + + /** + * Permissions for the Application directory. + */ + protected static final FsPermission APP_DIR_PERMISSIONS = FsPermission + .createImmutable((short) 0770); + + // This is temporary solution. The configuration will be deleted once + // we find a more scalable method to only write a single log file per LRS. + private static final String NM_LOG_AGGREGATION_NUM_LOG_FILES_SIZE_PER_APP + = YarnConfiguration.NM_PREFIX + "log-aggregation.num-log-files-per-app"; + private static final int + DEFAULT_NM_LOG_AGGREGATION_NUM_LOG_FILES_SIZE_PER_APP = 30; + + protected Configuration conf; + protected Path remoteRootLogDir; + protected String remoteRootLogDirSuffix; + protected int retentionSize; + protected String fileControllerName; + + public LogAggregationFileController() {} + + /** + * Initialize the log file controller. + * @param conf the Configuration + * @param controllerName the log controller class name + */ + public void initialize(Configuration conf, String controllerName) { + this.conf = conf; + int configuredRentionSize = + conf.getInt(NM_LOG_AGGREGATION_NUM_LOG_FILES_SIZE_PER_APP, + DEFAULT_NM_LOG_AGGREGATION_NUM_LOG_FILES_SIZE_PER_APP); + if (configuredRentionSize <= 0) { + this.retentionSize = + DEFAULT_NM_LOG_AGGREGATION_NUM_LOG_FILES_SIZE_PER_APP; + } else { + this.retentionSize = configuredRentionSize; + } + this.fileControllerName = controllerName; + initInternal(conf); + } + + /** + * Derived classes initialize themselves using this method. + * @param conf the Configuration + */ + protected abstract void initInternal(Configuration conf); + + /** + * Get the remote root log directory. + * @return the remote root log directory path + */ + public Path getRemoteRootLogDir() { + return this.remoteRootLogDir; + } + + /** + * Get the log aggregation directory suffix. + * @return the log aggregation directory suffix + */ + public String getRemoteRootLogDirSuffix() { + return this.remoteRootLogDirSuffix; + } + + /** + * Initialize the writer. + * @param context the {@link LogAggregationFileControllerContext} + * @throws IOException if fails to initialize the writer + */ + public abstract void initializeWriter( + LogAggregationFileControllerContext context) throws IOException; + + /** + * Close the writer. + */ + public abstract void closeWriter(); + + /** + * Write the log content. + * @param logKey the log key + * @param logValue the log content + * @throws IOException if fails to write the logs + */ + public abstract void write(LogKey logKey, LogValue logValue) + throws IOException; + + /** + * Operations needed after write the log content. + * @param record the {@link LogAggregationFileControllerContext} + * @throws Exception if anything fails + */ + public abstract void postWrite(LogAggregationFileControllerContext record) + throws Exception; + + /** + * Verify and create the remote log directory. + */ + public void verifyAndCreateRemoteLogDir() { + boolean logPermError = true; + // Checking the existence of the TLD + FileSystem remoteFS = null; + try { + remoteFS = getFileSystem(conf); + } catch (IOException e) { + throw new YarnRuntimeException( + "Unable to get Remote FileSystem instance", e); + } + boolean remoteExists = true; + Path remoteRootLogDir = getRemoteRootLogDir(); + try { + FsPermission perms = + remoteFS.getFileStatus(remoteRootLogDir).getPermission(); + if (!perms.equals(TLDIR_PERMISSIONS) && logPermError) { + LOG.warn("Remote Root Log Dir [" + remoteRootLogDir + + "] already exist, but with incorrect permissions. " + + "Expected: [" + TLDIR_PERMISSIONS + "], Found: [" + perms + + "]." + " The cluster may have problems with multiple users."); + logPermError = false; + } else { + logPermError = true; + } + } catch (FileNotFoundException e) { + remoteExists = false; + } catch (IOException e) { + throw new YarnRuntimeException( + "Failed to check permissions for dir [" + + remoteRootLogDir + "]", e); + } + if (!remoteExists) { + LOG.warn("Remote Root Log Dir [" + remoteRootLogDir + + "] does not exist. Attempting to create it."); + try { + Path qualified = + remoteRootLogDir.makeQualified(remoteFS.getUri(), + remoteFS.getWorkingDirectory()); + remoteFS.mkdirs(qualified, new FsPermission(TLDIR_PERMISSIONS)); + remoteFS.setPermission(qualified, new FsPermission(TLDIR_PERMISSIONS)); + + UserGroupInformation loginUser = UserGroupInformation.getLoginUser(); + String primaryGroupName = null; + try { + primaryGroupName = loginUser.getPrimaryGroupName(); + } catch (IOException e) { + LOG.warn("No primary group found. The remote root log directory" + + " will be created with the HDFS superuser being its group " + + "owner. JobHistoryServer may be unable to read the directory."); + } + // set owner on the remote directory only if the primary group exists + if (primaryGroupName != null) { + remoteFS.setOwner(qualified, + loginUser.getShortUserName(), primaryGroupName); + } + } catch (IOException e) { + throw new YarnRuntimeException("Failed to create remoteLogDir [" + + remoteRootLogDir + "]", e); + } + } + } + + /** + * Create remote Application directory for log aggregation. + * @param user the user + * @param appId the application ID + * @param userUgi the UGI + */ + public void createAppDir(final String user, final ApplicationId appId, + UserGroupInformation userUgi) { + final Path remoteRootLogDir = getRemoteRootLogDir(); + final String remoteRootLogDirSuffix = getRemoteRootLogDirSuffix(); + try { + userUgi.doAs(new PrivilegedExceptionAction() { + @Override + public Object run() throws Exception { + try { + // TODO: Reuse FS for user? + FileSystem remoteFS = getFileSystem(conf); + + // Only creating directories if they are missing to avoid + // unnecessary load on the filesystem from all of the nodes + Path appDir = LogAggregationUtils.getRemoteAppLogDir( + remoteRootLogDir, appId, user, remoteRootLogDirSuffix); + + appDir = appDir.makeQualified(remoteFS.getUri(), + remoteFS.getWorkingDirectory()); + + if (!checkExists(remoteFS, appDir, APP_DIR_PERMISSIONS)) { + Path suffixDir = LogAggregationUtils.getRemoteLogSuffixedDir( + remoteRootLogDir, user, remoteRootLogDirSuffix); + suffixDir = suffixDir.makeQualified(remoteFS.getUri(), + remoteFS.getWorkingDirectory()); + + if (!checkExists(remoteFS, suffixDir, APP_DIR_PERMISSIONS)) { + Path userDir = LogAggregationUtils.getRemoteLogUserDir( + remoteRootLogDir, user); + userDir = userDir.makeQualified(remoteFS.getUri(), + remoteFS.getWorkingDirectory()); + + if (!checkExists(remoteFS, userDir, APP_DIR_PERMISSIONS)) { + createDir(remoteFS, userDir, APP_DIR_PERMISSIONS); + } + + createDir(remoteFS, suffixDir, APP_DIR_PERMISSIONS); + } + + createDir(remoteFS, appDir, APP_DIR_PERMISSIONS); + } + + } catch (IOException e) { + LOG.error("Failed to setup application log directory for " + + appId, e); + throw e; + } + return null; + } + }); + } catch (Exception e) { + throw new YarnRuntimeException(e); + } + } + + @VisibleForTesting + protected FileSystem getFileSystem(Configuration conf) throws IOException { + return getRemoteRootLogDir().getFileSystem(conf); + } + + protected void createDir(FileSystem fs, Path path, FsPermission fsPerm) + throws IOException { + FsPermission dirPerm = new FsPermission(fsPerm); + fs.mkdirs(path, dirPerm); + FsPermission umask = FsPermission.getUMask(fs.getConf()); + if (!dirPerm.equals(dirPerm.applyUMask(umask))) { + fs.setPermission(path, new FsPermission(fsPerm)); + } + } + + protected boolean checkExists(FileSystem fs, Path path, FsPermission fsPerm) + throws IOException { + boolean exists = true; + try { + FileStatus appDirStatus = fs.getFileStatus(path); + if (!APP_DIR_PERMISSIONS.equals(appDirStatus.getPermission())) { + fs.setPermission(path, APP_DIR_PERMISSIONS); + } + } catch (FileNotFoundException fnfe) { + exists = false; + } + return exists; + } + + /** + * Get the remote aggregated log path. + * @param appId the ApplicationId + * @param user the Application Owner + * @param nodeId the NodeManager Id + * @return the remote aggregated log path + */ + public Path getRemoteNodeLogFileForApp(ApplicationId appId, String user, + NodeId nodeId) { + return LogAggregationUtils.getRemoteNodeLogFileForApp( + getRemoteRootLogDir(), appId, user, nodeId, + getRemoteRootLogDirSuffix()); + } + + /** + * Get the remote application directory for log aggregation. + * @param appId the Application ID + * @param appOwner the Application Owner + * @return the remote application directory + * @throws IOException if can not find the remote application directory + */ + public Path getRemoteAppLogDir(ApplicationId appId, String appOwner) + throws IOException { + return LogAggregationUtils.getRemoteAppLogDir(conf, appId, appOwner, + this.remoteRootLogDir, this.remoteRootLogDirSuffix); + } + + protected void cleanOldLogs(Path remoteNodeLogFileForApp, + final NodeId nodeId, UserGroupInformation userUgi) { + try { + final FileSystem remoteFS = remoteNodeLogFileForApp.getFileSystem(conf); + Path appDir = remoteNodeLogFileForApp.getParent().makeQualified( + remoteFS.getUri(), remoteFS.getWorkingDirectory()); + Set status = + new HashSet(Arrays.asList(remoteFS.listStatus(appDir))); + + Iterable mask = + Iterables.filter(status, new Predicate() { + @Override + public boolean apply(FileStatus next) { + return next.getPath().getName() + .contains(LogAggregationUtils.getNodeString(nodeId)) + && !next.getPath().getName().endsWith( + LogAggregationUtils.TMP_FILE_SUFFIX); + } + }); + status = Sets.newHashSet(mask); + // Normally, we just need to delete one oldest log + // before we upload a new log. + // If we can not delete the older logs in this cycle, + // we will delete them in next cycle. + if (status.size() >= this.retentionSize) { + // sort by the lastModificationTime ascending + List statusList = new ArrayList(status); + Collections.sort(statusList, new Comparator() { + public int compare(FileStatus s1, FileStatus s2) { + return s1.getModificationTime() < s2.getModificationTime() ? -1 + : s1.getModificationTime() > s2.getModificationTime() ? 1 : 0; + } + }); + for (int i = 0; i <= statusList.size() - this.retentionSize; i++) { + final FileStatus remove = statusList.get(i); + try { + userUgi.doAs(new PrivilegedExceptionAction() { + @Override + public Object run() throws Exception { + remoteFS.delete(remove.getPath(), false); + return null; + } + }); + } catch (Exception e) { + LOG.error("Failed to delete " + remove.getPath(), e); + } + } + } + } catch (Exception e) { + LOG.error("Failed to clean old logs", e); + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileControllerContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileControllerContext.java new file mode 100644 index 00000000000..32128bc5ba0 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileControllerContext.java @@ -0,0 +1,130 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +package org.apache.hadoop.yarn.logaggregation.filecontroller; + +import java.util.Map; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.records.ApplicationAccessType; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.NodeId; + +/** + * {@code LogAggregationFileControllerContext} is a record used in + * the log aggregation process. + */ +@Private +@Unstable +public class LogAggregationFileControllerContext { + private final boolean logAggregationInRolling; + private final long rollingMonitorInterval; + private final Path remoteNodeLogFileForApp; + private final NodeId nodeId; + private final UserGroupInformation userUgi; + private final ApplicationId appId; + private final Path remoteNodeTmpLogFileForApp; + private final Map appAcls; + private int logAggregationTimes = 0; + private int cleanOldLogsTimes = 0; + + private boolean uploadedLogsInThisCycle; + private long logUploadedTimeStamp; + + public LogAggregationFileControllerContext(Path remoteNodeLogFileForApp, + Path remoteNodeTmpLogFileForApp, + boolean logAggregationInRolling, + long rollingMonitorInterval, + ApplicationId appId, + Map appAcls, + NodeId nodeId, UserGroupInformation userUgi) { + this.remoteNodeLogFileForApp = remoteNodeLogFileForApp; + this.remoteNodeTmpLogFileForApp = remoteNodeTmpLogFileForApp; + this.logAggregationInRolling = logAggregationInRolling; + this.rollingMonitorInterval = rollingMonitorInterval; + this.nodeId = nodeId; + this.appId = appId; + this.appAcls = appAcls; + this.userUgi = userUgi; + } + + public boolean isUploadedLogsInThisCycle() { + return uploadedLogsInThisCycle; + } + + public void setUploadedLogsInThisCycle(boolean uploadedLogsInThisCycle) { + this.uploadedLogsInThisCycle = uploadedLogsInThisCycle; + } + + public Path getRemoteNodeLogFileForApp() { + return remoteNodeLogFileForApp; + } + + public long getRollingMonitorInterval() { + return rollingMonitorInterval; + } + + public boolean isLogAggregationInRolling() { + return logAggregationInRolling; + } + + public long getLogUploadTimeStamp() { + return logUploadedTimeStamp; + } + + public void setLogUploadTimeStamp(long uploadTimeStamp) { + this.logUploadedTimeStamp = uploadTimeStamp; + } + + public NodeId getNodeId() { + return nodeId; + } + + public UserGroupInformation getUserUgi() { + return userUgi; + } + + public ApplicationId getAppId() { + return appId; + } + + public Path getRemoteNodeTmpLogFileForApp() { + return remoteNodeTmpLogFileForApp; + } + + public void increLogAggregationTimes() { + this.logAggregationTimes++; + } + + public void increcleanupOldLogTimes() { + this.cleanOldLogsTimes++; + } + + public int getLogAggregationTimes() { + return logAggregationTimes; + } + + public int getCleanOldLogsTimes() { + return cleanOldLogsTimes; + } + + public Map getAppAcls() { + return appAcls; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileControllerFactory.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileControllerFactory.java new file mode 100644 index 00000000000..746bf5a4c11 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileControllerFactory.java @@ -0,0 +1,195 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.logaggregation.filecontroller; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.regex.Pattern; +import org.apache.commons.lang.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileContext; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; + +/** + * Use {@code LogAggregationFileControllerFactory} to get the correct + * {@link LogAggregationFileController} for write and read. + * + */ +@Private +@Unstable +public class LogAggregationFileControllerFactory { + + private static final Log LOG = LogFactory.getLog( + LogAggregationFileControllerFactory.class); + private final Pattern p = Pattern.compile( + "^[A-Za-z_]+[A-Za-z0-9_]*$"); + private LinkedList controllers + = new LinkedList<>(); + private Configuration conf; + + /** + * Construct the LogAggregationFileControllerFactory object. + * @param conf the Configuration + */ + public LogAggregationFileControllerFactory(Configuration conf) { + this.conf = conf; + Collection fileControllers = conf.getStringCollection( + YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS); + List controllerClassName = new ArrayList<>(); + + Map controllerChecker = new HashMap<>(); + + for (String fileController : fileControllers) { + Preconditions.checkArgument(validateAggregatedFileControllerName( + fileController), "The FileControllerName: " + fileController + + " set in " + YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS + +" is invalid." + "The valid File Controller name should only " + + "contain a-zA-Z0-9_ and can not start with numbers"); + + String remoteDirStr = String.format( + YarnConfiguration.LOG_AGGREGATION_REMOTE_APP_LOG_DIR_FMT, + fileController); + String remoteDir = conf.get(remoteDirStr); + boolean defaultRemoteDir = false; + if (remoteDir == null || remoteDir.isEmpty()) { + remoteDir = conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, + YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR); + defaultRemoteDir = true; + } + String suffixStr = String.format( + YarnConfiguration.LOG_AGGREGATION_REMOTE_APP_LOG_DIR_SUFFIX_FMT, + fileController); + String suffix = conf.get(suffixStr); + boolean defaultSuffix = false; + if (suffix == null || suffix.isEmpty()) { + suffix = conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, + YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR_SUFFIX); + defaultSuffix = true; + } + String dirSuffix = remoteDir + "-" + suffix; + if (controllerChecker.containsKey(dirSuffix)) { + if (defaultRemoteDir && defaultSuffix) { + String fileControllerStr = controllerChecker.get(dirSuffix); + List controllersList = new ArrayList<>(); + controllersList.add(fileControllerStr); + controllersList.add(fileController); + fileControllerStr = StringUtils.join(controllersList, ","); + controllerChecker.put(dirSuffix, fileControllerStr); + } else { + String conflictController = controllerChecker.get(dirSuffix); + throw new RuntimeException("The combined value of " + remoteDirStr + + " and " + suffixStr + " should not be the same as the value" + + " set for " + conflictController); + } + } else { + controllerChecker.put(dirSuffix, fileController); + } + String classKey = String.format( + YarnConfiguration.LOG_AGGREGATION_FILE_CONTROLLER_FMT, + fileController); + String className = conf.get(classKey); + if (className == null || className.isEmpty()) { + throw new RuntimeException("No class configured for " + + fileController); + } + controllerClassName.add(className); + Class sClass = conf.getClass( + classKey, null, LogAggregationFileController.class); + if (sClass == null) { + throw new RuntimeException("No class defined for " + fileController); + } + LogAggregationFileController s = ReflectionUtils.newInstance( + sClass, conf); + if (s == null) { + throw new RuntimeException("No object created for " + + controllerClassName); + } + s.initialize(conf, fileController); + controllers.add(s); + } + } + + /** + * Get {@link LogAggregationFileController} to write. + * @return the LogAggregationFileController instance + */ + public LogAggregationFileController getFileControllerForWrite() { + return controllers.getFirst(); + } + + /** + * Get {@link LogAggregationFileController} to read the aggregated logs + * for this application. + * @param appId the ApplicationId + * @param appOwner the Application Owner + * @return the LogAggregationFileController instance + * @throws IOException if can not find any log aggregation file controller + */ + public LogAggregationFileController getFileControllerForRead( + ApplicationId appId, String appOwner) throws IOException { + StringBuilder diagnosis = new StringBuilder(); + for(LogAggregationFileController fileController : controllers) { + try { + Path remoteAppLogDir = fileController.getRemoteAppLogDir( + appId, appOwner); + Path qualifiedLogDir = FileContext.getFileContext(conf).makeQualified( + remoteAppLogDir); + RemoteIterator nodeFiles = FileContext.getFileContext( + qualifiedLogDir.toUri(), conf).listStatus(remoteAppLogDir); + if (nodeFiles.hasNext()) { + return fileController; + } + } catch (Exception ex) { + diagnosis.append(ex.getMessage() + "\n"); + continue; + } + } + throw new IOException(diagnosis.toString()); + } + + private boolean validateAggregatedFileControllerName(String name) { + if (name == null || name.trim().isEmpty()) { + return false; + } + return p.matcher(name).matches(); + } + + @Private + @VisibleForTesting + public LinkedList + getConfiguredLogAggregationFileControllerList() { + return this.controllers; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationTFileController.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationTFileController.java new file mode 100644 index 00000000000..9e0c66d73cc --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationTFileController.java @@ -0,0 +1,127 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.logaggregation.filecontroller; + +import java.io.IOException; +import java.security.PrivilegedExceptionAction; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey; +import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue; +import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogWriter; +import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils; +import org.apache.hadoop.yarn.util.Times; + +/** + * The TFile log aggregation file Controller implementation. + */ +@Private +@Unstable +public class LogAggregationTFileController + extends LogAggregationFileController { + + private static final Log LOG = LogFactory.getLog( + LogAggregationTFileController.class); + + private LogWriter writer; + + public LogAggregationTFileController(){} + + @Override + public void initInternal(Configuration conf) { + this.remoteRootLogDir = new Path( + conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, + YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR)); + this.remoteRootLogDirSuffix = + conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, + YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR_SUFFIX); + } + + @Override + public void initializeWriter(LogAggregationFileControllerContext context) + throws IOException { + this.writer = new LogWriter(); + writer.initialize(this.conf, context.getRemoteNodeTmpLogFileForApp(), + context.getUserUgi()); + // Write ACLs once when the writer is created. + writer.writeApplicationACLs(context.getAppAcls()); + writer.writeApplicationOwner(context.getUserUgi().getShortUserName()); + } + + @Override + public void closeWriter() { + this.writer.close(); + } + + @Override + public void write(LogKey logKey, LogValue logValue) throws IOException { + this.writer.append(logKey, logValue); + } + + @Override + public void postWrite(final LogAggregationFileControllerContext record) + throws Exception { + // Before upload logs, make sure the number of existing logs + // is smaller than the configured NM log aggregation retention size. + if (record.isUploadedLogsInThisCycle() && + record.isLogAggregationInRolling()) { + cleanOldLogs(record.getRemoteNodeLogFileForApp(), record.getNodeId(), + record.getUserUgi()); + record.increcleanupOldLogTimes(); + } + + final Path renamedPath = record.getRollingMonitorInterval() <= 0 + ? record.getRemoteNodeLogFileForApp() : new Path( + record.getRemoteNodeLogFileForApp().getParent(), + record.getRemoteNodeLogFileForApp().getName() + "_" + + record.getLogUploadTimeStamp()); + final boolean rename = record.isUploadedLogsInThisCycle(); + try { + record.getUserUgi().doAs(new PrivilegedExceptionAction() { + @Override + public Object run() throws Exception { + FileSystem remoteFS = record.getRemoteNodeLogFileForApp() + .getFileSystem(conf); + if (rename) { + remoteFS.rename(record.getRemoteNodeTmpLogFileForApp(), + renamedPath); + } else { + remoteFS.delete(record.getRemoteNodeTmpLogFileForApp(), false); + } + return null; + } + }); + } catch (Exception e) { + LOG.error( + "Failed to move temporary log file to final location: [" + + record.getRemoteNodeTmpLogFileForApp() + "] to [" + + renamedPath + "]", e); + throw new Exception("Log uploaded failed for Application: " + + record.getAppId() + " in NodeManager: " + + LogAggregationUtils.getNodeString(record.getNodeId()) + " at " + + Times.format(record.getLogUploadTimeStamp()) + "\n"); + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/package-info.java new file mode 100644 index 00000000000..cad238a9a42 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/package-info.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +@InterfaceAudience.Public +package org.apache.hadoop.yarn.logaggregation.filecontroller; +import org.apache.hadoop.classification.InterfaceAudience; + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index f93de4460ee..0823dfe7531 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -1166,6 +1166,25 @@ -1 + + Specify which log file controllers we will support. The first + file controller we add will be used to write the aggregated logs. + This comma separated configuration will work with the configuration: + yarn.log-aggregation.file-controller.%s.class which defines the supported + file controller's class. By default, the TFile controller would be used. + The user could override this configuration by adding more file controllers. + To support back-ward compatibility, make sure that we always + add TFile file controller. + yarn.log-aggregation.file-formats + TFile + + + + Class that supports TFile read and write operations. + yarn.log-aggregation.file-controller.TFile.class + org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationTFileController + + How long for ResourceManager to wait for NodeManager to report its diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogsBlock.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogsBlock.java index 1e71b3cdca8..3dd7de3a0ad 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogsBlock.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogsBlock.java @@ -40,10 +40,14 @@ import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptIdPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController; +import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerContext; +import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerFactory; import org.apache.hadoop.yarn.webapp.YarnWebParams; import org.apache.hadoop.yarn.webapp.log.AggregatedLogsBlockForTest; import org.apache.hadoop.yarn.webapp.view.BlockForTest; @@ -249,7 +253,7 @@ public class TestAggregatedLogsBlock { private Configuration getConfiguration() { - Configuration configuration = new Configuration(); + Configuration configuration = new YarnConfiguration(); configuration.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true); configuration.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, "target/logs"); configuration.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true); @@ -295,19 +299,25 @@ public class TestAggregatedLogsBlock { List rootLogDirs = Arrays.asList("target/logs/logs"); UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); - try (AggregatedLogFormat.LogWriter writer = - new AggregatedLogFormat.LogWriter()) { - writer.initialize(configuration, new Path(path), ugi); - writer.writeApplicationOwner(ugi.getUserName()); - + LogAggregationFileControllerFactory factory + = new LogAggregationFileControllerFactory(configuration); + LogAggregationFileController fileController = factory + .getFileControllerForWrite(); + try { Map appAcls = new HashMap<>(); appAcls.put(ApplicationAccessType.VIEW_APP, ugi.getUserName()); - writer.writeApplicationACLs(appAcls); - - writer.append( + NodeId nodeId = NodeId.newInstance("localhost", 1234); + LogAggregationFileControllerContext context + = new LogAggregationFileControllerContext( + new Path(path), new Path(path), false, 3600, + appId, appAcls, nodeId, ugi); + fileController.initializeWriter(context); + fileController.write( new AggregatedLogFormat.LogKey("container_0_0001_01_000001"), new AggregatedLogFormat.LogValue(rootLogDirs, containerId, UserGroupInformation.getCurrentUser().getShortUserName())); + } finally { + fileController.closeWriter(); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestContainerLogsUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestContainerLogsUtils.java index 8b665e03f6e..a12e2a152b0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestContainerLogsUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestContainerLogsUtils.java @@ -24,15 +24,21 @@ import java.io.FileWriter; import java.io.IOException; import java.io.Writer; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController; +import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerContext; +import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerFactory; /** * This class contains several utility functions for log aggregation tests. @@ -110,14 +116,25 @@ public final class TestContainerLogsUtils { ContainerId containerId, Path appDir, FileSystem fs) throws IOException { Path path = new Path(appDir, LogAggregationUtils.getNodeString(nodeId)); - try (AggregatedLogFormat.LogWriter writer = - new AggregatedLogFormat.LogWriter()) { - writer.initialize(configuration, path, ugi); - writer.writeApplicationOwner(ugi.getUserName()); - - writer.append(new AggregatedLogFormat.LogKey(containerId), + LogAggregationFileControllerFactory factory + = new LogAggregationFileControllerFactory(configuration); + LogAggregationFileController fileController = factory + .getFileControllerForWrite(); + try { + Map appAcls = new HashMap<>(); + appAcls.put(ApplicationAccessType.VIEW_APP, ugi.getUserName()); + ApplicationId appId = containerId.getApplicationAttemptId() + .getApplicationId(); + LogAggregationFileControllerContext context + = new LogAggregationFileControllerContext( + path, path, true, 1000, + appId, appAcls, nodeId, ugi); + fileController.initializeWriter(context); + fileController.write(new AggregatedLogFormat.LogKey(containerId), new AggregatedLogFormat.LogValue(rootLogDirs, containerId, ugi.getShortUserName())); + } finally { + fileController.closeWriter(); } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/filecontroller/TestLogAggregationFileControllerFactory.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/filecontroller/TestLogAggregationFileControllerFactory.java new file mode 100644 index 00000000000..45f18c10481 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/filecontroller/TestLogAggregationFileControllerFactory.java @@ -0,0 +1,171 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.logaggregation.filecontroller; + +import static org.junit.Assert.*; + +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.io.Writer; +import java.util.LinkedList; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey; +import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue; +import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController; +import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerContext; +import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerFactory; +import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationTFileController; +import org.junit.Test; + +/** + * Test LogAggregationFileControllerFactory. + * + */ +public class TestLogAggregationFileControllerFactory { + + @Test(timeout = 10000) + public void testLogAggregationFileControllerFactory() throws Exception { + ApplicationId appId = ApplicationId.newInstance( + System.currentTimeMillis(), 1); + String appOwner = "test"; + String remoteLogRootDir = "target/app-logs/"; + Configuration conf = new YarnConfiguration(); + conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true); + conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteLogRootDir); + conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, "log"); + FileSystem fs = FileSystem.get(conf); + + LogAggregationFileControllerFactory factory = + new LogAggregationFileControllerFactory(conf); + LinkedList list = factory + .getConfiguredLogAggregationFileControllerList(); + assertTrue(list.size() == 1); + assertTrue(list.getFirst() instanceof LogAggregationTFileController); + assertTrue(factory.getFileControllerForWrite() + instanceof LogAggregationTFileController); + Path logPath = list.getFirst().getRemoteAppLogDir(appId, appOwner); + try { + if (fs.exists(logPath)) { + fs.delete(logPath, true); + } + assertTrue(fs.mkdirs(logPath)); + Writer writer = + new FileWriter(new File(logPath.toString(), "testLog")); + writer.write("test"); + writer.close(); + assertTrue(factory.getFileControllerForRead(appId, appOwner) + instanceof LogAggregationTFileController); + } finally { + fs.delete(logPath, true); + } + + conf.set(YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS, + "TestLogAggregationFileController"); + // Did not set class for TestLogAggregationFileController, + // should get the exception. + try { + factory = + new LogAggregationFileControllerFactory(conf); + fail(); + } catch (Exception ex) { + // should get exception + } + + conf.set(YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS, + "TestLogAggregationFileController,TFile"); + conf.setClass( + "yarn.log-aggregation.file-controller.TestLogAggregationFileController" + + ".class", TestLogAggregationFileController.class, + LogAggregationFileController.class); + + conf.set( + "yarn.log-aggregation.TestLogAggregationFileController" + + ".remote-app-log-dir", remoteLogRootDir); + conf.set( + "yarn.log-aggregation.TestLogAggregationFileController" + + ".remote-app-log-dir-suffix", "testLog"); + + factory = new LogAggregationFileControllerFactory(conf); + list = factory.getConfiguredLogAggregationFileControllerList(); + assertTrue(list.size() == 2); + assertTrue(list.getFirst() instanceof TestLogAggregationFileController); + assertTrue(list.getLast() instanceof LogAggregationTFileController); + assertTrue(factory.getFileControllerForWrite() + instanceof TestLogAggregationFileController); + + logPath = list.getFirst().getRemoteAppLogDir(appId, appOwner); + try { + if (fs.exists(logPath)) { + fs.delete(logPath, true); + } + assertTrue(fs.mkdirs(logPath)); + Writer writer = + new FileWriter(new File(logPath.toString(), "testLog")); + writer.write("test"); + writer.close(); + assertTrue(factory.getFileControllerForRead(appId, appOwner) + instanceof TestLogAggregationFileController); + } finally { + fs.delete(logPath, true); + } + } + + private static class TestLogAggregationFileController + extends LogAggregationFileController { + + @Override + public void initInternal(Configuration conf) { + String remoteDirStr = String.format( + YarnConfiguration.LOG_AGGREGATION_REMOTE_APP_LOG_DIR_FMT, + this.fileControllerName); + this.remoteRootLogDir = new Path(conf.get(remoteDirStr)); + String suffix = String.format( + YarnConfiguration.LOG_AGGREGATION_REMOTE_APP_LOG_DIR_SUFFIX_FMT, + this.fileControllerName); + this.remoteRootLogDirSuffix = conf.get(suffix); + } + + @Override + public void closeWriter() { + // Do Nothing + } + + @Override + public void write(LogKey logKey, LogValue logValue) throws IOException { + // Do Nothing + } + + @Override + public void postWrite(LogAggregationFileControllerContext record) + throws Exception { + // Do Nothing + } + + @Override + public void initializeWriter(LogAggregationFileControllerContext context) + throws IOException { + // Do Nothing + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java index 1601c3fb0c6..51c63c4f01c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java @@ -19,11 +19,7 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation; import java.io.IOException; -import java.security.PrivilegedExceptionAction; import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -38,8 +34,6 @@ import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileContext; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.UnsupportedFileSystemException; import org.apache.hadoop.security.Credentials; @@ -57,7 +51,9 @@ import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat; import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey; import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue; -import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogWriter; +import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController; +import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerContext; +import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationTFileController; import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils; import org.apache.hadoop.yarn.server.api.ContainerLogAggregationPolicy; import org.apache.hadoop.yarn.server.api.ContainerLogContext; @@ -71,7 +67,6 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Ap import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.DeletionTask; import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.FileDeletionTask; -import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.Times; @@ -86,18 +81,6 @@ public class AppLogAggregatorImpl implements AppLogAggregator { private static final Logger LOG = LoggerFactory.getLogger(AppLogAggregatorImpl.class); private static final int THREAD_SLEEP_TIME = 1000; - // This is temporary solution. The configuration will be deleted once - // we find a more scalable method to only write a single log file per LRS. - private static final String NM_LOG_AGGREGATION_NUM_LOG_FILES_SIZE_PER_APP - = YarnConfiguration.NM_PREFIX + "log-aggregation.num-log-files-per-app"; - private static final int - DEFAULT_NM_LOG_AGGREGATION_NUM_LOG_FILES_SIZE_PER_APP = 30; - - // This configuration is for debug and test purpose. By setting - // this configuration as true. We can break the lower bound of - // NM_LOG_AGGREGATION_ROLL_MONITORING_INTERVAL_SECONDS. - private static final String NM_LOG_AGGREGATION_DEBUG_ENABLED - = YarnConfiguration.NM_PREFIX + "log-aggregation.debug-enabled"; private final LocalDirsHandlerService dirsHandler; private final Dispatcher dispatcher; @@ -118,10 +101,8 @@ public class AppLogAggregatorImpl implements AppLogAggregator { private final FileContext lfs; private final LogAggregationContext logAggregationContext; private final Context context; - private final int retentionSize; - private final long rollingMonitorInterval; - private final boolean logAggregationInRolling; private final NodeId nodeId; + private final LogAggregationFileControllerContext logControllerContext; // These variables are only for testing private final AtomicBoolean waiting = new AtomicBoolean(false); @@ -134,6 +115,8 @@ public class AppLogAggregatorImpl implements AppLogAggregator { new HashMap(); private final ContainerLogAggregationPolicy logAggPolicy; + private final LogAggregationFileController logAggregationFileController; + /** * The value recovered from state store to determine the age of application @@ -151,7 +134,7 @@ public class AppLogAggregatorImpl implements AppLogAggregator { FileContext lfs, long rollingMonitorInterval) { this(dispatcher, deletionService, conf, appId, userUgi, nodeId, dirsHandler, remoteNodeLogFileForApp, appAcls, - logAggregationContext, context, lfs, rollingMonitorInterval, -1); + logAggregationContext, context, lfs, rollingMonitorInterval, -1, null); } public AppLogAggregatorImpl(Dispatcher dispatcher, @@ -162,6 +145,21 @@ public class AppLogAggregatorImpl implements AppLogAggregator { LogAggregationContext logAggregationContext, Context context, FileContext lfs, long rollingMonitorInterval, long recoveredLogInitedTime) { + this(dispatcher, deletionService, conf, appId, userUgi, nodeId, + dirsHandler, remoteNodeLogFileForApp, appAcls, + logAggregationContext, context, lfs, rollingMonitorInterval, + recoveredLogInitedTime, null); + } + + public AppLogAggregatorImpl(Dispatcher dispatcher, + DeletionService deletionService, Configuration conf, + ApplicationId appId, UserGroupInformation userUgi, NodeId nodeId, + LocalDirsHandlerService dirsHandler, Path remoteNodeLogFileForApp, + Map appAcls, + LogAggregationContext logAggregationContext, Context context, + FileContext lfs, long rollingMonitorInterval, + long recoveredLogInitedTime, + LogAggregationFileController logAggregationFileController) { this.dispatcher = dispatcher; this.conf = conf; this.delService = deletionService; @@ -169,31 +167,41 @@ public class AppLogAggregatorImpl implements AppLogAggregator { this.applicationId = appId.toString(); this.userUgi = userUgi; this.dirsHandler = dirsHandler; - this.remoteNodeLogFileForApp = remoteNodeLogFileForApp; - this.remoteNodeTmpLogFileForApp = getRemoteNodeTmpLogFileForApp(); this.pendingContainers = new LinkedBlockingQueue(); this.appAcls = appAcls; this.lfs = lfs; this.logAggregationContext = logAggregationContext; this.context = context; this.nodeId = nodeId; - int configuredRentionSize = - conf.getInt(NM_LOG_AGGREGATION_NUM_LOG_FILES_SIZE_PER_APP, - DEFAULT_NM_LOG_AGGREGATION_NUM_LOG_FILES_SIZE_PER_APP); - if (configuredRentionSize <= 0) { - this.retentionSize = - DEFAULT_NM_LOG_AGGREGATION_NUM_LOG_FILES_SIZE_PER_APP; - } else { - this.retentionSize = configuredRentionSize; - } - this.rollingMonitorInterval = rollingMonitorInterval; - this.logAggregationInRolling = - this.rollingMonitorInterval <= 0 || this.logAggregationContext == null - || this.logAggregationContext.getRolledLogsIncludePattern() == null - || this.logAggregationContext.getRolledLogsIncludePattern() - .isEmpty() ? false : true; this.logAggPolicy = getLogAggPolicy(conf); this.recoveredLogInitedTime = recoveredLogInitedTime; + if (logAggregationFileController == null) { + // by default, use T-File Controller + this.logAggregationFileController = new LogAggregationTFileController(); + this.logAggregationFileController.initialize(conf, "TFile"); + this.logAggregationFileController.verifyAndCreateRemoteLogDir(); + this.logAggregationFileController.createAppDir( + this.userUgi.getShortUserName(), appId, userUgi); + this.remoteNodeLogFileForApp = this.logAggregationFileController + .getRemoteNodeLogFileForApp(appId, + this.userUgi.getShortUserName(), nodeId); + this.remoteNodeTmpLogFileForApp = getRemoteNodeTmpLogFileForApp(); + } else { + this.logAggregationFileController = logAggregationFileController; + this.remoteNodeLogFileForApp = remoteNodeLogFileForApp; + this.remoteNodeTmpLogFileForApp = getRemoteNodeTmpLogFileForApp(); + } + boolean logAggregationInRolling = + rollingMonitorInterval <= 0 || this.logAggregationContext == null + || this.logAggregationContext.getRolledLogsIncludePattern() == null + || this.logAggregationContext.getRolledLogsIncludePattern() + .isEmpty() ? false : true; + logControllerContext = new LogAggregationFileControllerContext( + this.remoteNodeLogFileForApp, + this.remoteNodeTmpLogFileForApp, + logAggregationInRolling, + rollingMonitorInterval, + this.appId, this.appAcls, this.nodeId, this.userUgi); } private ContainerLogAggregationPolicy getLogAggPolicy(Configuration conf) { @@ -293,14 +301,9 @@ public class AppLogAggregatorImpl implements AppLogAggregator { logAggregationTimes++; String diagnosticMessage = ""; boolean logAggregationSucceedInThisCycle = true; - try (LogWriter writer = createLogWriter()) { + try { try { - writer.initialize(this.conf, this.remoteNodeTmpLogFileForApp, - this.userUgi); - // Write ACLs once when the writer is created. - writer.writeApplicationACLs(appAcls); - writer.writeApplicationOwner(this.userUgi.getShortUserName()); - + logAggregationFileController.initializeWriter(logControllerContext); } catch (IOException e1) { logAggregationSucceedInThisCycle = false; LOG.error("Cannot create writer for app " + this.applicationId @@ -318,8 +321,8 @@ public class AppLogAggregatorImpl implements AppLogAggregator { containerLogAggregators.put(container, aggregator); } Set uploadedFilePathsInThisCycle = - aggregator.doContainerLogAggregation(writer, appFinished, - finishedContainers.contains(container)); + aggregator.doContainerLogAggregation(logAggregationFileController, + appFinished, finishedContainers.contains(container)); if (uploadedFilePathsInThisCycle.size() > 0) { uploadedLogsInThisCycle = true; List uploadedFilePathsInThisCycleList = new ArrayList<>(); @@ -337,60 +340,28 @@ public class AppLogAggregatorImpl implements AppLogAggregator { } } - // Before upload logs, make sure the number of existing logs - // is smaller than the configured NM log aggregation retention size. - if (uploadedLogsInThisCycle && logAggregationInRolling) { - cleanOldLogs(); - cleanupOldLogTimes++; - } - - long currentTime = System.currentTimeMillis(); - final Path renamedPath = getRenamedPath(currentTime); - - final boolean rename = uploadedLogsInThisCycle; + logControllerContext.setUploadedLogsInThisCycle(uploadedLogsInThisCycle); + logControllerContext.setLogUploadTimeStamp(System.currentTimeMillis()); + logControllerContext.increLogAggregationTimes(); try { - userUgi.doAs(new PrivilegedExceptionAction() { - @Override - public Object run() throws Exception { - FileSystem remoteFS = remoteNodeLogFileForApp.getFileSystem(conf); - if (rename) { - remoteFS.rename(remoteNodeTmpLogFileForApp, renamedPath); - } else { - remoteFS.delete(remoteNodeTmpLogFileForApp, false); - } - return null; - } - }); - diagnosticMessage = - "Log uploaded successfully for Application: " + appId - + " in NodeManager: " - + LogAggregationUtils.getNodeString(nodeId) + " at " - + Times.format(currentTime) + "\n"; + this.logAggregationFileController.postWrite(logControllerContext); + diagnosticMessage = "Log uploaded successfully for Application: " + + appId + " in NodeManager: " + + LogAggregationUtils.getNodeString(nodeId) + " at " + + Times.format(logControllerContext.getLogUploadTimeStamp()) + + "\n"; } catch (Exception e) { - LOG.error( - "Failed to move temporary log file to final location: [" - + remoteNodeTmpLogFileForApp + "] to [" - + renamedPath + "]", e); - diagnosticMessage = - "Log uploaded failed for Application: " + appId - + " in NodeManager: " - + LogAggregationUtils.getNodeString(nodeId) + " at " - + Times.format(currentTime) + "\n"; + diagnosticMessage = e.getMessage(); renameTemporaryLogFileFailed = true; logAggregationSucceedInThisCycle = false; } } finally { sendLogAggregationReport(logAggregationSucceedInThisCycle, diagnosticMessage, appFinished); + logAggregationFileController.closeWriter(); } } - private Path getRenamedPath(long currentTime) { - return this.rollingMonitorInterval <= 0 ? remoteNodeLogFileForApp - : new Path(remoteNodeLogFileForApp.getParent(), - remoteNodeLogFileForApp.getName() + "_" + currentTime); - } - private void addCredentials() { if (UserGroupInformation.isSecurityEnabled()) { Credentials systemCredentials = @@ -407,11 +378,6 @@ public class AppLogAggregatorImpl implements AppLogAggregator { } } - @VisibleForTesting - protected LogWriter createLogWriter() { - return new LogWriter(); - } - private void sendLogAggregationReport( boolean logAggregationSucceedInThisCycle, String diagnosticMessage, boolean appFinished) { @@ -442,60 +408,6 @@ public class AppLogAggregatorImpl implements AppLogAggregator { this.context.getLogAggregationStatusForApps().add(report); } - private void cleanOldLogs() { - try { - final FileSystem remoteFS = - this.remoteNodeLogFileForApp.getFileSystem(conf); - Path appDir = - this.remoteNodeLogFileForApp.getParent().makeQualified( - remoteFS.getUri(), remoteFS.getWorkingDirectory()); - Set status = - new HashSet(Arrays.asList(remoteFS.listStatus(appDir))); - - Iterable mask = - Iterables.filter(status, new Predicate() { - @Override - public boolean apply(FileStatus next) { - return next.getPath().getName() - .contains(LogAggregationUtils.getNodeString(nodeId)) - && !next.getPath().getName().endsWith( - LogAggregationUtils.TMP_FILE_SUFFIX); - } - }); - status = Sets.newHashSet(mask); - // Normally, we just need to delete one oldest log - // before we upload a new log. - // If we can not delete the older logs in this cycle, - // we will delete them in next cycle. - if (status.size() >= this.retentionSize) { - // sort by the lastModificationTime ascending - List statusList = new ArrayList(status); - Collections.sort(statusList, new Comparator() { - public int compare(FileStatus s1, FileStatus s2) { - return s1.getModificationTime() < s2.getModificationTime() ? -1 - : s1.getModificationTime() > s2.getModificationTime() ? 1 : 0; - } - }); - for (int i = 0 ; i <= statusList.size() - this.retentionSize; i++) { - final FileStatus remove = statusList.get(i); - try { - userUgi.doAs(new PrivilegedExceptionAction() { - @Override - public Object run() throws Exception { - remoteFS.delete(remove.getPath(), false); - return null; - } - }); - } catch (Exception e) { - LOG.error("Failed to delete " + remove.getPath(), e); - } - } - } - } catch (Exception e) { - LOG.error("Failed to clean old logs", e); - } - } - @SuppressWarnings("unchecked") @Override public void run() { @@ -523,8 +435,8 @@ public class AppLogAggregatorImpl implements AppLogAggregator { synchronized(this) { try { waiting.set(true); - if (logAggregationInRolling) { - wait(this.rollingMonitorInterval * 1000); + if (logControllerContext.isLogAggregationInRolling()) { + wait(logControllerContext.getRollingMonitorInterval() * 1000); if (this.appFinishing.get() || this.aborted.get()) { break; } @@ -653,7 +565,8 @@ public class AppLogAggregatorImpl implements AppLogAggregator { recoveredLogInitedTime, logRetentionSecs * 1000); } - public Set doContainerLogAggregation(LogWriter writer, + public Set doContainerLogAggregation( + LogAggregationFileController logAggregationFileController, boolean appFinished, boolean containerFinished) { LOG.info("Uploading logs for container " + containerId + ". Current good log dirs are " @@ -665,7 +578,7 @@ public class AppLogAggregatorImpl implements AppLogAggregator { this.uploadedFileMeta, retentionContext, appFinished, containerFinished); try { - writer.append(logKey, logValue); + logAggregationFileController.write(logKey, logValue); } catch (Exception e) { LOG.error("Couldn't upload logs for " + containerId + ". Skipping this container.", e); @@ -708,4 +621,15 @@ public class AppLogAggregatorImpl implements AppLogAggregator { int getCleanupOldLogTimes() { return this.cleanupOldLogTimes; } + + @VisibleForTesting + public LogAggregationFileController getLogAggregationFileController() { + return this.logAggregationFileController; + } + + @VisibleForTesting + public LogAggregationFileControllerContext + getLogAggregationFileControllerContext() { + return this.logControllerContext; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java index aafd7d8e03b..1a59e4589aa 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java @@ -18,9 +18,7 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation; -import java.io.FileNotFoundException; import java.io.IOException; -import java.security.PrivilegedExceptionAction; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -32,10 +30,7 @@ import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileContext; -import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.service.AbstractService; @@ -48,7 +43,8 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; -import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils; +import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController; +import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerFactory; import org.apache.hadoop.yarn.server.api.ContainerLogContext; import org.apache.hadoop.yarn.server.api.ContainerType; import org.apache.hadoop.yarn.server.nodemanager.Context; @@ -79,36 +75,14 @@ public class LogAggregationService extends AbstractService implements = YarnConfiguration.NM_PREFIX + "log-aggregation.debug-enabled"; private long rollingMonitorInterval; - /* - * Expected deployment TLD will be 1777, owner=, group= App dirs will be created as 770, - * owner=, group=: so that the owner and can - * access / modify the files. - * should obviously be a limited access group. - */ - /** - * Permissions for the top level directory under which app directories will be - * created. - */ - private static final FsPermission TLDIR_PERMISSIONS = FsPermission - .createImmutable((short) 01777); - /** - * Permissions for the Application directory. - */ - private static final FsPermission APP_DIR_PERMISSIONS = FsPermission - .createImmutable((short) 0770); - private final Context context; private final DeletionService deletionService; private final Dispatcher dispatcher; private LocalDirsHandlerService dirsHandler; - Path remoteRootLogDir; - String remoteRootLogDirSuffix; private NodeId nodeId; private final ConcurrentMap appLogAggregators; - private boolean logPermError = true; @VisibleForTesting ExecutorService threadPool; @@ -125,12 +99,6 @@ public class LogAggregationService extends AbstractService implements } protected void serviceInit(Configuration conf) throws Exception { - this.remoteRootLogDir = - new Path(conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, - YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR)); - this.remoteRootLogDirSuffix = - conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, - YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR_SUFFIX); int threadPoolSize = getAggregatorThreadPoolSize(conf); this.threadPool = HadoopExecutors.newFixedThreadPool(threadPoolSize, new ThreadFactoryBuilder() @@ -218,158 +186,6 @@ public class LogAggregationService extends AbstractService implements } } - protected FileSystem getFileSystem(Configuration conf) throws IOException { - return this.remoteRootLogDir.getFileSystem(conf); - } - - void verifyAndCreateRemoteLogDir(Configuration conf) { - // Checking the existence of the TLD - FileSystem remoteFS = null; - try { - remoteFS = getFileSystem(conf); - } catch (IOException e) { - throw new YarnRuntimeException("Unable to get Remote FileSystem instance", e); - } - boolean remoteExists = true; - try { - FsPermission perms = - remoteFS.getFileStatus(this.remoteRootLogDir).getPermission(); - if (!perms.equals(TLDIR_PERMISSIONS) && logPermError) { - LOG.warn("Remote Root Log Dir [" + this.remoteRootLogDir - + "] already exist, but with incorrect permissions. " - + "Expected: [" + TLDIR_PERMISSIONS + "], Found: [" + perms - + "]." + " The cluster may have problems with multiple users."); - logPermError = false; - } else { - logPermError = true; - } - } catch (FileNotFoundException e) { - remoteExists = false; - } catch (IOException e) { - throw new YarnRuntimeException( - "Failed to check permissions for dir [" - + this.remoteRootLogDir + "]", e); - } - if (!remoteExists) { - LOG.warn("Remote Root Log Dir [" + this.remoteRootLogDir - + "] does not exist. Attempting to create it."); - try { - Path qualified = - this.remoteRootLogDir.makeQualified(remoteFS.getUri(), - remoteFS.getWorkingDirectory()); - remoteFS.mkdirs(qualified, new FsPermission(TLDIR_PERMISSIONS)); - remoteFS.setPermission(qualified, new FsPermission(TLDIR_PERMISSIONS)); - - UserGroupInformation loginUser = UserGroupInformation.getLoginUser(); - String primaryGroupName = null; - try { - primaryGroupName = loginUser.getPrimaryGroupName(); - } catch (IOException e) { - LOG.warn("No primary group found. The remote root log directory" + - " will be created with the HDFS superuser being its group " + - "owner. JobHistoryServer may be unable to read the directory."); - } - // set owner on the remote directory only if the primary group exists - if (primaryGroupName != null) { - remoteFS.setOwner(qualified, - loginUser.getShortUserName(), primaryGroupName); - } - } catch (IOException e) { - throw new YarnRuntimeException("Failed to create remoteLogDir [" - + this.remoteRootLogDir + "]", e); - } - } - } - - Path getRemoteNodeLogFileForApp(ApplicationId appId, String user) { - return LogAggregationUtils.getRemoteNodeLogFileForApp( - this.remoteRootLogDir, appId, user, this.nodeId, - this.remoteRootLogDirSuffix); - } - - Path getRemoteAppLogDir(ApplicationId appId, String user) { - return LogAggregationUtils.getRemoteAppLogDir(this.remoteRootLogDir, appId, - user, this.remoteRootLogDirSuffix); - } - - private void createDir(FileSystem fs, Path path, FsPermission fsPerm) - throws IOException { - FsPermission dirPerm = new FsPermission(fsPerm); - fs.mkdirs(path, dirPerm); - FsPermission umask = FsPermission.getUMask(fs.getConf()); - if (!dirPerm.equals(dirPerm.applyUMask(umask))) { - fs.setPermission(path, new FsPermission(fsPerm)); - } - } - - private boolean checkExists(FileSystem fs, Path path, FsPermission fsPerm) - throws IOException { - boolean exists = true; - try { - FileStatus appDirStatus = fs.getFileStatus(path); - if (!APP_DIR_PERMISSIONS.equals(appDirStatus.getPermission())) { - fs.setPermission(path, APP_DIR_PERMISSIONS); - } - } catch (FileNotFoundException fnfe) { - exists = false; - } - return exists; - } - - protected void createAppDir(final String user, final ApplicationId appId, - UserGroupInformation userUgi) { - try { - userUgi.doAs(new PrivilegedExceptionAction() { - @Override - public Object run() throws Exception { - try { - // TODO: Reuse FS for user? - FileSystem remoteFS = getFileSystem(getConfig()); - - // Only creating directories if they are missing to avoid - // unnecessary load on the filesystem from all of the nodes - Path appDir = LogAggregationUtils.getRemoteAppLogDir( - LogAggregationService.this.remoteRootLogDir, appId, user, - LogAggregationService.this.remoteRootLogDirSuffix); - appDir = appDir.makeQualified(remoteFS.getUri(), - remoteFS.getWorkingDirectory()); - - if (!checkExists(remoteFS, appDir, APP_DIR_PERMISSIONS)) { - Path suffixDir = LogAggregationUtils.getRemoteLogSuffixedDir( - LogAggregationService.this.remoteRootLogDir, user, - LogAggregationService.this.remoteRootLogDirSuffix); - suffixDir = suffixDir.makeQualified(remoteFS.getUri(), - remoteFS.getWorkingDirectory()); - - if (!checkExists(remoteFS, suffixDir, APP_DIR_PERMISSIONS)) { - Path userDir = LogAggregationUtils.getRemoteLogUserDir( - LogAggregationService.this.remoteRootLogDir, user); - userDir = userDir.makeQualified(remoteFS.getUri(), - remoteFS.getWorkingDirectory()); - - if (!checkExists(remoteFS, userDir, APP_DIR_PERMISSIONS)) { - createDir(remoteFS, userDir, APP_DIR_PERMISSIONS); - } - - createDir(remoteFS, suffixDir, APP_DIR_PERMISSIONS); - } - - createDir(remoteFS, appDir, APP_DIR_PERMISSIONS); - } - - } catch (IOException e) { - LOG.error("Failed to setup application log directory for " - + appId, e); - throw e; - } - return null; - } - }); - } catch (Exception e) { - throw new YarnRuntimeException(e); - } - } - @SuppressWarnings("unchecked") private void initApp(final ApplicationId appId, String user, Credentials credentials, Map appAcls, @@ -377,7 +193,6 @@ public class LogAggregationService extends AbstractService implements long recoveredLogInitedTime) { ApplicationEvent eventResponse; try { - verifyAndCreateRemoteLogDir(getConfig()); initAppAggregator(appId, user, credentials, appAcls, logAggregationContext, recoveredLogInitedTime); eventResponse = new ApplicationEvent(appId, @@ -410,14 +225,17 @@ public class LogAggregationService extends AbstractService implements userUgi.addCredentials(credentials); } + LogAggregationFileController logAggregationFileController + = getLogAggregationFileController(getConfig()); + logAggregationFileController.verifyAndCreateRemoteLogDir(); // New application final AppLogAggregator appLogAggregator = new AppLogAggregatorImpl(this.dispatcher, this.deletionService, getConfig(), appId, userUgi, this.nodeId, dirsHandler, - getRemoteNodeLogFileForApp(appId, user), - appAcls, logAggregationContext, this.context, + logAggregationFileController.getRemoteNodeLogFileForApp(appId, + user, nodeId), appAcls, logAggregationContext, this.context, getLocalFileContext(getConfig()), this.rollingMonitorInterval, - recoveredLogInitedTime); + recoveredLogInitedTime, logAggregationFileController); if (this.appLogAggregators.putIfAbsent(appId, appLogAggregator) != null) { throw new YarnRuntimeException("Duplicate initApp for " + appId); } @@ -425,7 +243,7 @@ public class LogAggregationService extends AbstractService implements YarnRuntimeException appDirException = null; try { // Create the app dir - createAppDir(user, appId, userUgi); + logAggregationFileController.createAppDir(user, appId, userUgi); } catch (Exception e) { appLogAggregator.disableLogAggregation(); if (!(e instanceof YarnRuntimeException)) { @@ -570,4 +388,14 @@ public class LogAggregationService extends AbstractService implements } return threadPoolSize; } + + @VisibleForTesting + public LogAggregationFileController getLogAggregationFileController( + Configuration conf) { + LogAggregationFileControllerFactory factory + = new LogAggregationFileControllerFactory(conf); + LogAggregationFileController logAggregationFileController = factory + .getFileControllerForWrite(); + return logAggregationFileController; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestAppLogAggregatorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestAppLogAggregatorImpl.java index b4bd9d7e884..bedad33ceab 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestAppLogAggregatorImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestAppLogAggregatorImpl.java @@ -35,7 +35,7 @@ import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey; import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue; -import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogWriter; +import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationTFileController; import org.apache.hadoop.yarn.server.api.ContainerLogContext; import org.apache.hadoop.yarn.server.api.ContainerType; import org.apache.hadoop.yarn.server.nodemanager.Context; @@ -241,8 +241,8 @@ public class TestAppLogAggregatorImpl { // verify uploaded files ArgumentCaptor logValCaptor = ArgumentCaptor.forClass(LogValue.class); - verify(appLogAggregator.logWriter).append(any(LogKey.class), - logValCaptor.capture()); + verify(appLogAggregator.getLogAggregationFileController()).write( + any(LogKey.class), logValCaptor.capture()); Set filesUploaded = new HashSet<>(); LogValue logValue = logValCaptor.getValue(); for(File file: logValue.getPendingLogFilesToUploadForThisContainer()) { @@ -287,11 +287,13 @@ public class TestAppLogAggregatorImpl { final Context context = createContext(config); final FileContext fakeLfs = mock(FileContext.class); final Path remoteLogDirForApp = new Path(REMOTE_LOG_FILE.getAbsolutePath()); - + LogAggregationTFileController format = spy( + new LogAggregationTFileController()); + format.initialize(config, "TFile"); return new AppLogAggregatorInTest(dispatcher, deletionService, config, applicationId, ugi, nodeId, dirsService, remoteLogDirForApp, appAcls, logAggregationContext, - context, fakeLfs, recoveredLogInitedTimeMillis); + context, fakeLfs, recoveredLogInitedTimeMillis, format); } /** @@ -402,7 +404,6 @@ public class TestAppLogAggregatorImpl { final DeletionService deletionService; final ApplicationId applicationId; - final LogWriter logWriter; final ArgumentCaptor logValue; public AppLogAggregatorInTest(Dispatcher dispatcher, @@ -411,19 +412,15 @@ public class TestAppLogAggregatorImpl { LocalDirsHandlerService dirsHandler, Path remoteNodeLogFileForApp, Map appAcls, LogAggregationContext logAggregationContext, Context context, - FileContext lfs, long recoveredLogInitedTime) throws IOException { + FileContext lfs, long recoveredLogInitedTime, + LogAggregationTFileController format) throws IOException { super(dispatcher, deletionService, conf, appId, ugi, nodeId, dirsHandler, remoteNodeLogFileForApp, appAcls, - logAggregationContext, context, lfs, -1, recoveredLogInitedTime); + logAggregationContext, context, lfs, -1, recoveredLogInitedTime, + format); this.applicationId = appId; this.deletionService = deletionService; - this.logWriter = spy(new LogWriter()); this.logValue = ArgumentCaptor.forClass(LogValue.class); } - - @Override - protected LogWriter createLogWriter() { - return this.logWriter; - } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java index 6383e83195d..0fa012c53ab 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java @@ -103,6 +103,9 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat; import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey; import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogReader; +import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController; +import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerFactory; +import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationTFileController; import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; @@ -161,11 +164,12 @@ public class TestLogAggregationService extends BaseContainerManagerTest { DrainDispatcher dispatcher; EventHandler appEventHandler; + private NodeId nodeId = NodeId.newInstance("0.0.0.0", 5555); + @Override @SuppressWarnings("unchecked") public void setup() throws IOException { super.setup(); - NodeId nodeId = NodeId.newInstance("0.0.0.0", 5555); ((NMContext)context).setNodeId(nodeId); dispatcher = createDispatcher(); appEventHandler = mock(EventHandler.class); @@ -246,9 +250,9 @@ public class TestLogAggregationService extends BaseContainerManagerTest { Assert.assertFalse("Directory [" + app1LogDir + "] was not deleted", app1LogDir.exists()); - Path logFilePath = - logAggregationService.getRemoteNodeLogFileForApp(application1, - this.user); + Path logFilePath = logAggregationService + .getLogAggregationFileController(conf) + .getRemoteNodeLogFileForApp(application1, this.user, nodeId); Assert.assertTrue("Log file [" + logFilePath + "] not found", new File( logFilePath.toUri().getPath()).exists()); @@ -369,9 +373,10 @@ public class TestLogAggregationService extends BaseContainerManagerTest { logAggregationService.stop(); assertEquals(0, logAggregationService.getNumAggregators()); - - Assert.assertFalse(new File(logAggregationService - .getRemoteNodeLogFileForApp(application1, this.user).toUri().getPath()) + LogAggregationFileController format1 = + logAggregationService.getLogAggregationFileController(conf); + Assert.assertFalse(new File(format1.getRemoteNodeLogFileForApp( + application1, this.user, this.nodeId).toUri().getPath()) .exists()); dispatcher.await(); @@ -541,26 +546,33 @@ public class TestLogAggregationService extends BaseContainerManagerTest { }; checkEvents(appEventHandler, expectedFinishedEvents, false, "getType", "getApplicationID"); } - + @Test public void testVerifyAndCreateRemoteDirsFailure() throws Exception { this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath()); this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, this.remoteRootLogDir.getAbsolutePath()); - + LogAggregationFileControllerFactory factory + = new LogAggregationFileControllerFactory(conf); + LogAggregationFileController logAggregationFileFormat = factory + .getFileControllerForWrite(); + LogAggregationFileController spyLogAggregationFileFormat = + spy(logAggregationFileFormat); + YarnRuntimeException e = new YarnRuntimeException("KABOOM!"); + doThrow(e).doNothing().when(spyLogAggregationFileFormat) + .verifyAndCreateRemoteLogDir(); LogAggregationService logAggregationService = spy( new LogAggregationService(dispatcher, this.context, this.delSrvc, - super.dirsHandler)); + super.dirsHandler) { + @Override + public LogAggregationFileController getLogAggregationFileController( + Configuration conf) { + return spyLogAggregationFileFormat; + } + }); logAggregationService.init(this.conf); - - YarnRuntimeException e = new YarnRuntimeException("KABOOM!"); - doThrow(e) - .when(logAggregationService).verifyAndCreateRemoteLogDir( - any(Configuration.class)); - logAggregationService.start(); - // Now try to start an application ApplicationId appId = BuilderUtils.newApplicationId(System.currentTimeMillis(), @@ -607,8 +619,7 @@ public class TestLogAggregationService extends BaseContainerManagerTest { logAggregationService.stop(); } - - + @Test public void testVerifyAndCreateRemoteDirNonExistence() throws Exception { @@ -621,14 +632,24 @@ public class TestLogAggregationService extends BaseContainerManagerTest { new LogAggregationService(dispatcher, this.context, this.delSrvc, super.dirsHandler)); logAggregationService.init(this.conf); + logAggregationService.start(); boolean existsBefore = aNewFile.exists(); assertTrue("The new file already exists!", !existsBefore); - logAggregationService.verifyAndCreateRemoteLogDir(this.conf); - + ApplicationId appId = ApplicationId.newInstance( + System.currentTimeMillis(), 1); + LogAggregationContext contextWithAMAndFailed = + Records.newRecord(LogAggregationContext.class); + contextWithAMAndFailed.setLogAggregationPolicyClassName( + AMOrFailedContainerLogAggregationPolicy.class.getName()); + logAggregationService.handle(new LogHandlerAppStartedEvent(appId, + this.user, null, this.acls, contextWithAMAndFailed)); + dispatcher.await(); + boolean existsAfter = aNewFile.exists(); assertTrue("The new aggregate file is not successfully created", existsAfter); aNewFile.delete(); //housekeeping + logAggregationService.stop(); } @Test @@ -641,7 +662,17 @@ public class TestLogAggregationService extends BaseContainerManagerTest { LogAggregationService logAggregationService = new LogAggregationService( dispatcher, this.context, this.delSrvc, super.dirsHandler); logAggregationService.init(this.conf); - logAggregationService.verifyAndCreateRemoteLogDir(this.conf); + logAggregationService.start(); + + ApplicationId appId = ApplicationId.newInstance( + System.currentTimeMillis(), 1); + LogAggregationContext contextWithAMAndFailed = + Records.newRecord(LogAggregationContext.class); + contextWithAMAndFailed.setLogAggregationPolicyClassName( + AMOrFailedContainerLogAggregationPolicy.class.getName()); + logAggregationService.handle(new LogHandlerAppStartedEvent(appId, + this.user, null, this.acls, contextWithAMAndFailed)); + dispatcher.await(); String targetGroup = UserGroupInformation.getLoginUser().getPrimaryGroupName(); @@ -651,6 +682,7 @@ public class TestLogAggregationService extends BaseContainerManagerTest { fileStatus.getGroup(), targetGroup); fs.delete(aNewFile, true); + logAggregationService.stop(); } @Test @@ -669,14 +701,23 @@ public class TestLogAggregationService extends BaseContainerManagerTest { FileSystem fs = FileSystem.get(this.conf); final FileSystem spyFs = spy(FileSystem.get(this.conf)); + final LogAggregationTFileController spyFileFormat + = new LogAggregationTFileController() { + @Override + public FileSystem getFileSystem(Configuration conf) + throws IOException { + return spyFs; + } + }; + spyFileFormat.initialize(conf, "TFile"); LogAggregationService aggSvc = new LogAggregationService(dispatcher, this.context, this.delSrvc, super.dirsHandler) { @Override - protected FileSystem getFileSystem(Configuration conf) { - return spyFs; + public LogAggregationFileController getLogAggregationFileController( + Configuration conf) { + return spyFileFormat; } }; - aggSvc.init(this.conf); aggSvc.start(); @@ -769,18 +810,36 @@ public class TestLogAggregationService extends BaseContainerManagerTest { @Test public void testLogAggregationCreateDirsFailsWithoutKillingNM() throws Exception { - - this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath()); + + this.conf.set(YarnConfiguration.NM_LOG_DIRS, + localLogDir.getAbsolutePath()); this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, this.remoteRootLogDir.getAbsolutePath()); DeletionService spyDelSrvc = spy(this.delSrvc); + LogAggregationFileControllerFactory factory + = new LogAggregationFileControllerFactory(conf); + LogAggregationFileController logAggregationFileFormat = factory + .getFileControllerForWrite(); + LogAggregationFileController spyLogAggregationFileFormat = + spy(logAggregationFileFormat); + Exception e = new RuntimeException("KABOOM!"); + doThrow(e).when(spyLogAggregationFileFormat) + .createAppDir(any(String.class), any(ApplicationId.class), + any(UserGroupInformation.class)); LogAggregationService logAggregationService = spy( new LogAggregationService(dispatcher, this.context, spyDelSrvc, - super.dirsHandler)); + super.dirsHandler){ + @Override + public LogAggregationFileController getLogAggregationFileController( + Configuration conf) { + return spyLogAggregationFileFormat; + } + }); + logAggregationService.init(this.conf); logAggregationService.start(); - + ApplicationId appId = BuilderUtils.newApplicationId(System.currentTimeMillis(), (int) (Math.random() * 1000)); @@ -789,10 +848,6 @@ public class TestLogAggregationService extends BaseContainerManagerTest { new File(localLogDir, appId.toString()); appLogDir.mkdir(); - Exception e = new RuntimeException("KABOOM!"); - doThrow(e) - .when(logAggregationService).createAppDir(any(String.class), - any(ApplicationId.class), any(UserGroupInformation.class)); LogAggregationContext contextWithAMAndFailed = Records.newRecord(LogAggregationContext.class); contextWithAMAndFailed.setLogAggregationPolicyClassName( @@ -867,7 +922,8 @@ public class TestLogAggregationService extends BaseContainerManagerTest { int minNumOfContainers, int maxNumOfContainers, String[] logFiles, int numOfLogsPerContainer, boolean multiLogs) throws IOException { - Path appLogDir = logAggregationService.getRemoteAppLogDir(appId, this.user); + Path appLogDir = logAggregationService.getLogAggregationFileController( + conf).getRemoteAppLogDir(appId, this.user); RemoteIterator nodeFiles = null; try { Path qualifiedLogDir = @@ -2108,7 +2164,7 @@ public class TestLogAggregationService extends BaseContainerManagerTest { } logAggregationService.handle(new LogHandlerAppStartedEvent(appId, this.user, null, this.acls, logAggContext)); - + dispatcher.await(); return logAggregationService; } @@ -2462,17 +2518,20 @@ public class TestLogAggregationService extends BaseContainerManagerTest { logAggregationService.stop(); assertEquals(expectedLogAggregationTimes, - aggregator.getLogAggregationTimes()); + aggregator.getLogAggregationFileControllerContext() + .getLogAggregationTimes()); assertEquals(expectedAggregationReportNum, this.context.getLogAggregationStatusForApps().size()); assertEquals(expectedCleanupOldLogsTimes, - aggregator.getCleanupOldLogTimes()); + aggregator.getLogAggregationFileControllerContext() + .getCleanOldLogsTimes()); } private int numOfLogsAvailable(LogAggregationService logAggregationService, ApplicationId appId, boolean sizeLimited, String lastLogFile) throws IOException { - Path appLogDir = logAggregationService.getRemoteAppLogDir(appId, this.user); + Path appLogDir = logAggregationService.getLogAggregationFileController( + conf).getRemoteAppLogDir(appId, this.user); RemoteIterator nodeFiles = null; try { Path qualifiedLogDir =