MAPREDUCE-6480. archive-logs tool may miss applications (rkanter)

(cherry picked from commit 8fec882cfaac047af1892615fe32d8c3ca02e7f9)
Robert Kanter 2015-09-25 14:59:49 -07:00
parent aca014ea67
commit d8a5d2b2fc
5 changed files with 392 additions and 134 deletions
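
In short: the tool used to seed its set of eligible applications from YarnClient#getApplications(), so any application the ResourceManager no longer reported by the time the tool ran was silently skipped. After this change, candidates are seeded by walking the aggregated-log directories on the filesystem (checkFilesAndSeedApps), and the RM is consulted only per application to drop those whose log aggregation has not finished (filterAppsByAggregatedStatus); an ApplicationNotFoundException from the RM is treated as aggregation having already finished.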

hadoop-mapreduce-project/CHANGES.txt

@@ -311,6 +311,8 @@ Release 2.8.0 - UNRELEASED
     MAPREDUCE-6484. Yarn Client uses local address instead of RM address as
     token renewer in a secure cluster when RM HA is enabled. (Zhihai Xu)
 
+    MAPREDUCE-6480. archive-logs tool may miss applications (rkanter)
+
 Release 2.7.2 - UNRELEASED
 
   INCOMPATIBLE CHANGES

hadoop-tools/hadoop-archive-logs/dev-support/findbugs-exclude.xml (new file)

@@ -0,0 +1,32 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+<FindBugsFilter>
+  <!--
+    Ignore warnings for usage of System.exit. These are appropriate.
+  -->
+  <Match>
+    <Class name="org.apache.hadoop.tools.HadoopArchiveLogs" />
+    <Method name="handleOpts" />
+    <Bug pattern="DM_EXIT" />
+  </Match>
+  <Match>
+    <Class name="org.apache.hadoop.tools.HadoopArchiveLogs" />
+    <Method name="run" />
+    <Bug pattern="DM_EXIT" />
+  </Match>
+</FindBugsFilter>
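
For background, DM_EXIT is the FindBugs pattern that flags calls to System.exit(), which is usually wrong in library code but is normal for a command-line tool. A hypothetical sketch (not the tool's actual source) of the kind of code these two matches are meant to whitelist:

    // Hypothetical example: a CLI entry point that exits directly.
    // FindBugs reports DM_EXIT here, but for a tool invoked as
    // "yarn archive-logs" this is expected behavior, hence the filter.
    public class ExitingTool {
      private static void handleOpts(String[] args) {
        if (args.length > 0 && "-help".equals(args[0])) {
          System.out.println("usage: yarn archive-logs [options]");
          System.exit(0);  // deliberate: stop the JVM after printing help
        }
      }

      public static void main(String[] args) {
        handleOpts(args);
      }
    }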

hadoop-tools/hadoop-archive-logs/pom.xml

@@ -118,6 +118,12 @@
       <scope>test</scope>
     </dependency>
     <!-- 'mvn dependency:analyze' fails to detect use of this dependency -->
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <!-- 'mvn dependency:analyze' fails to detect use of this dependency -->
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-common</artifactId>
@@ -166,6 +172,18 @@
           </archive>
         </configuration>
       </plugin>
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>findbugs-maven-plugin</artifactId>
+        <configuration>
+          <findbugsXmlOutput>true</findbugsXmlOutput>
+          <xmlOutput>true</xmlOutput>
+          <excludeFilterFile>
+            ${basedir}/dev-support/findbugs-exclude.xml
+          </excludeFilterFile>
+          <effort>Max</effort>
+        </configuration>
+      </plugin>
     </plugins>
   </build>
</project>

hadoop-tools/hadoop-archive-logs/src/main/java/org/apache/hadoop/tools/HadoopArchiveLogs.java

@@ -26,12 +26,14 @@
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
+import org.apache.commons.io.output.FileWriterWithEncoding;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.mapred.JobConf;
@@ -43,13 +45,15 @@
 import org.apache.hadoop.yarn.applications.distributedshell.Client;
 import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
+import org.apache.hadoop.yarn.util.ConverterUtils;
 
 import java.io.File;
-import java.io.FileWriter;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashSet;
@@ -71,6 +75,7 @@ public class HadoopArchiveLogs implements Tool {
   private static final String MIN_NUM_LOG_FILES_OPTION = "minNumberLogFiles";
   private static final String MAX_TOTAL_LOGS_SIZE_OPTION = "maxTotalLogsSize";
   private static final String MEMORY_OPTION = "memory";
+  private static final String VERBOSE_OPTION = "verbose";
 
   private static final int DEFAULT_MAX_ELIGIBLE = -1;
   private static final int DEFAULT_MIN_NUM_LOG_FILES = 20;
@@ -85,9 +90,10 @@ public class HadoopArchiveLogs implements Tool {
   long maxTotalLogsSize = DEFAULT_MAX_TOTAL_LOGS_SIZE * 1024L * 1024L;
   @VisibleForTesting
   long memory = DEFAULT_MEMORY;
+  private boolean verbose = false;
 
   @VisibleForTesting
-  Set<ApplicationReport> eligibleApplications;
+  Set<AppInfo> eligibleApplications;
 
   private JobConf conf;
@@ -122,17 +128,20 @@ public static void main(String[] args) {
   public int run(String[] args) throws Exception {
     handleOpts(args);
 
-    findAggregatedApps();
-
     FileSystem fs = null;
     Path remoteRootLogDir = new Path(conf.get(
         YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
         YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
     String suffix = LogAggregationUtils.getRemoteNodeLogDirSuffix(conf);
     Path workingDir = new Path(remoteRootLogDir, "archive-logs-work");
+    if (verbose) {
+      LOG.info("Remote Log Dir Root: " + remoteRootLogDir);
+      LOG.info("Log Suffix: " + suffix);
+      LOG.info("Working Dir: " + workingDir);
+    }
     try {
       fs = FileSystem.get(conf);
-      checkFiles(fs, remoteRootLogDir, suffix);
+      checkFilesAndSeedApps(fs, remoteRootLogDir, suffix);
 
       // Prepare working directory
       if (fs.exists(workingDir)) {
@@ -147,6 +156,8 @@ public int run(String[] args) throws Exception {
         }
       }
 
+      filterAppsByAggregatedStatus();
+
       checkMaxEligible();
 
       if (eligibleApplications.isEmpty()) {
@@ -156,8 +167,8 @@ public int run(String[] args) throws Exception {
 
       StringBuilder sb =
           new StringBuilder("Will process the following applications:");
-      for (ApplicationReport report : eligibleApplications) {
-        sb.append("\n\t").append(report.getApplicationId());
+      for (AppInfo app : eligibleApplications) {
+        sb.append("\n\t").append(app.getAppId());
       }
       LOG.info(sb.toString());
@@ -189,11 +200,14 @@ private void handleOpts(String[] args) throws ParseException {
         "The amount of memory (in megabytes) for each container (default: "
             + DEFAULT_MEMORY + ")");
     memoryOpt.setArgName("megabytes");
+    Option verboseOpt = new Option(VERBOSE_OPTION, false,
+        "Print more details.");
     opts.addOption(helpOpt);
     opts.addOption(maxEligibleOpt);
     opts.addOption(minNumLogFilesOpt);
     opts.addOption(maxTotalLogsSizeOpt);
     opts.addOption(memoryOpt);
+    opts.addOption(verboseOpt);
 
     try {
       CommandLineParser parser = new GnuParser();
@@ -225,6 +239,9 @@ private void handleOpts(String[] args) throws ParseException {
       if (commandLine.hasOption(MEMORY_OPTION)) {
         memory = Long.parseLong(commandLine.getOptionValue(MEMORY_OPTION));
       }
+      if (commandLine.hasOption(VERBOSE_OPTION)) {
+        verbose = true;
+      }
     } catch (ParseException pe) {
       HelpFormatter formatter = new HelpFormatter();
       formatter.printHelp("yarn archive-logs", opts);
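
With the flag registered and parsed above, verbose output can be requested as, for example, `yarn archive-logs -verbose` (matching the command name the help formatter prints); the LOG.info diagnostics added throughout this patch are gated on this flag.
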
@@ -233,17 +250,39 @@ private void handleOpts(String[] args) throws ParseException {
   }
 
   @VisibleForTesting
-  void findAggregatedApps() throws IOException, YarnException {
+  void filterAppsByAggregatedStatus() throws IOException, YarnException {
     YarnClient client = YarnClient.createYarnClient();
     try {
       client.init(getConf());
       client.start();
-      List<ApplicationReport> reports = client.getApplications();
-      for (ApplicationReport report : reports) {
-        LogAggregationStatus aggStatus = report.getLogAggregationStatus();
-        if (aggStatus.equals(LogAggregationStatus.SUCCEEDED) ||
-            aggStatus.equals(LogAggregationStatus.FAILED)) {
-          eligibleApplications.add(report);
+      for (Iterator<AppInfo> it = eligibleApplications.iterator();
+           it.hasNext();) {
+        AppInfo app = it.next();
+        try {
+          ApplicationReport report = client.getApplicationReport(
+              ConverterUtils.toApplicationId(app.getAppId()));
+          LogAggregationStatus aggStatus = report.getLogAggregationStatus();
+          if (aggStatus.equals(LogAggregationStatus.RUNNING) ||
+              aggStatus.equals(LogAggregationStatus.RUNNING_WITH_FAILURE) ||
+              aggStatus.equals(LogAggregationStatus.NOT_START) ||
+              aggStatus.equals(LogAggregationStatus.DISABLED) ||
+              aggStatus.equals(LogAggregationStatus.FAILED)) {
+            if (verbose) {
+              LOG.info("Skipping " + app.getAppId() +
+                  " due to aggregation status being " + aggStatus);
+            }
+            it.remove();
+          } else {
+            if (verbose) {
+              LOG.info(app.getAppId() + " has aggregation status " + aggStatus);
+            }
+            app.setFinishTime(report.getFinishTime());
+          }
+        } catch (ApplicationNotFoundException e) {
+          // Assume the aggregation has finished
+          if (verbose) {
+            LOG.info(app.getAppId() + " not in the ResourceManager");
+          }
         }
       }
     } finally {
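
The lookup above leans on directory names doubling as application IDs: each seeded AppInfo's ID string is parsed back into an ApplicationId before asking the RM about that one application. A minimal sketch of the round trip, with a made-up ID:

    import org.apache.hadoop.yarn.api.records.ApplicationId;
    import org.apache.hadoop.yarn.util.ConverterUtils;

    public class AppIdRoundTrip {
      public static void main(String[] args) {
        // Made-up ID for illustration; aggregated-log directories are named
        // after stringified ApplicationIds of this form.
        ApplicationId id =
            ConverterUtils.toApplicationId("application_1443216000000_0001");
        System.out.println(id);  // prints application_1443216000000_0001
        // Passing such an ID to YarnClient#getApplicationReport() throws
        // ApplicationNotFoundException once the RM has forgotten the app,
        // which the loop above treats as "aggregation already finished".
      }
    }
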
@@ -254,33 +293,71 @@ void findAggregatedApps() throws IOException, YarnException {
   }
 
   @VisibleForTesting
-  void checkFiles(FileSystem fs, Path remoteRootLogDir, String suffix) {
-    for (Iterator<ApplicationReport> reportIt = eligibleApplications.iterator();
-         reportIt.hasNext(); ) {
-      ApplicationReport report = reportIt.next();
-      long totalFileSize = 0L;
-      try {
-        FileStatus[] files = fs.listStatus(
-            LogAggregationUtils.getRemoteAppLogDir(remoteRootLogDir,
-                report.getApplicationId(), report.getUser(), suffix));
-        if (files.length < minNumLogFiles) {
-          reportIt.remove();
-        } else {
-          for (FileStatus file : files) {
-            if (file.getPath().getName().equals(report.getApplicationId()
-                + ".har")) {
-              reportIt.remove();
-              break;
-            }
-            totalFileSize += file.getLen();
-          }
-          if (totalFileSize > maxTotalLogsSize) {
-            reportIt.remove();
-          }
-        }
-      } catch (IOException ioe) {
-        // If the user doesn't have permission or it doesn't exist, then skip it
-        reportIt.remove();
-      }
-    }
-  }
+  void checkFilesAndSeedApps(FileSystem fs, Path remoteRootLogDir,
+      String suffix) throws IOException {
+    for (RemoteIterator<FileStatus> userIt =
+         fs.listStatusIterator(remoteRootLogDir); userIt.hasNext();) {
+      Path userLogPath = userIt.next().getPath();
+      try {
+        for (RemoteIterator<FileStatus> appIt =
+             fs.listStatusIterator(new Path(userLogPath, suffix));
+             appIt.hasNext();) {
+          Path appLogPath = appIt.next().getPath();
+          try {
+            FileStatus[] files = fs.listStatus(appLogPath);
+            if (files.length >= minNumLogFiles) {
+              boolean eligible = true;
+              long totalFileSize = 0L;
+              for (FileStatus file : files) {
+                if (file.getPath().getName().equals(appLogPath.getName()
+                    + ".har")) {
+                  eligible = false;
+                  if (verbose) {
+                    LOG.info("Skipping " + appLogPath.getName() +
+                        " due to existing .har file");
+                  }
+                  break;
+                }
+                totalFileSize += file.getLen();
+                if (totalFileSize > maxTotalLogsSize) {
+                  eligible = false;
+                  if (verbose) {
+                    LOG.info("Skipping " + appLogPath.getName() + " due to " +
+                        "total file size being too large (" + totalFileSize +
+                        " > " + maxTotalLogsSize + ")");
+                  }
+                  break;
+                }
+              }
+              if (eligible) {
+                if (verbose) {
+                  LOG.info("Adding " + appLogPath.getName() + " for user " +
+                      userLogPath.getName());
+                }
+                eligibleApplications.add(
+                    new AppInfo(appLogPath.getName(), userLogPath.getName()));
+              }
+            } else {
+              if (verbose) {
+                LOG.info("Skipping " + appLogPath.getName() + " due to not " +
+                    "having enough log files (" + files.length + " < " +
+                    minNumLogFiles + ")");
+              }
+            }
+          } catch (IOException ioe) {
+            // Ignore any apps we can't read
+            if (verbose) {
+              LOG.info("Skipping logs under " + appLogPath + " due to " +
+                  ioe.getMessage());
+            }
+          }
+        }
+      } catch (IOException ioe) {
+        // Ignore any apps we can't read
+        if (verbose) {
+          LOG.info("Skipping all logs under " + userLogPath + " due to " +
+              ioe.getMessage());
+        }
+      }
+    }
+  }
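
The walk above mirrors the on-disk layout of aggregated logs, <remoteRootLogDir>/<user>/<suffix>/<appId>/, which with stock settings comes out to /tmp/logs/<user>/logs/application_*. Seeding candidates from the filesystem rather than from YarnClient#getApplications() is the core of this fix: an application whose logs are still on disk is found even if the ResourceManager no longer reports it.
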
@@ -289,15 +366,26 @@ void checkFiles(FileSystem fs, Path remoteRootLogDir, String suffix) {
   void checkMaxEligible() {
     // If we have too many eligible apps, remove the newest ones first
     if (maxEligible > 0 && eligibleApplications.size() > maxEligible) {
-      List<ApplicationReport> sortedApplications =
-          new ArrayList<ApplicationReport>(eligibleApplications);
-      Collections.sort(sortedApplications, new Comparator<ApplicationReport>() {
+      if (verbose) {
+        LOG.info("Too many applications (" + eligibleApplications.size() +
+            " > " + maxEligible + ")");
+      }
+      List<AppInfo> sortedApplications =
+          new ArrayList<AppInfo>(eligibleApplications);
+      Collections.sort(sortedApplications, new Comparator<AppInfo>() {
         @Override
-        public int compare(ApplicationReport o1, ApplicationReport o2) {
-          return Long.compare(o1.getFinishTime(), o2.getFinishTime());
+        public int compare(AppInfo o1, AppInfo o2) {
+          int lCompare = Long.compare(o1.getFinishTime(), o2.getFinishTime());
+          if (lCompare == 0) {
+            return o1.getAppId().compareTo(o2.getAppId());
+          }
+          return lCompare;
         }
       });
       for (int i = maxEligible; i < sortedApplications.size(); i++) {
+        if (verbose) {
+          LOG.info("Removing " + sortedApplications.get(i));
+        }
         eligibleApplications.remove(sortedApplications.get(i));
       }
     }
@@ -325,24 +413,26 @@ public int compare(ApplicationReport o1, ApplicationReport o2) {
   @VisibleForTesting
   void generateScript(File localScript, Path workingDir,
       Path remoteRootLogDir, String suffix) throws IOException {
-    LOG.info("Generating script at: " + localScript.getAbsolutePath());
+    if (verbose) {
+      LOG.info("Generating script at: " + localScript.getAbsolutePath());
+    }
     String halrJarPath = HadoopArchiveLogsRunner.class.getProtectionDomain()
         .getCodeSource().getLocation().getPath();
     String harJarPath = HadoopArchives.class.getProtectionDomain()
         .getCodeSource().getLocation().getPath();
     String classpath = halrJarPath + File.pathSeparator + harJarPath;
-    FileWriter fw = null;
+    FileWriterWithEncoding fw = null;
     try {
-      fw = new FileWriter(localScript);
+      fw = new FileWriterWithEncoding(localScript, "UTF-8");
       fw.write("#!/bin/bash\nset -e\nset -x\n");
       int containerCount = 1;
-      for (ApplicationReport report : eligibleApplications) {
+      for (AppInfo app : eligibleApplications) {
         fw.write("if [ \"$YARN_SHELL_ID\" == \"");
         fw.write(Integer.toString(containerCount));
         fw.write("\" ]; then\n\tappId=\"");
-        fw.write(report.getApplicationId().toString());
+        fw.write(app.getAppId());
         fw.write("\"\n\tuser=\"");
-        fw.write(report.getUser());
+        fw.write(app.getUser());
         fw.write("\"\nel");
         containerCount++;
       }
@@ -382,6 +472,10 @@ private boolean runDistributedShell(File localScript) throws Exception {
         "--shell_script",
         localScript.getAbsolutePath()
     };
+    if (verbose) {
+      LOG.info("Running Distributed Shell with arguments: " +
+          Arrays.toString(dsArgs));
+    }
     final Client dsClient = new Client(new Configuration(conf));
     dsClient.init(dsArgs);
     return dsClient.run();
@@ -400,4 +494,59 @@ public void setConf(Configuration conf) {
   public Configuration getConf() {
     return this.conf;
   }
+
+  @VisibleForTesting
+  static class AppInfo {
+    private String appId;
+    private String user;
+    private long finishTime;
+
+    AppInfo(String appId, String user) {
+      this.appId = appId;
+      this.user = user;
+      this.finishTime = 0L;
+    }
+
+    public String getAppId() {
+      return appId;
+    }
+
+    public String getUser() {
+      return user;
+    }
+
+    public long getFinishTime() {
+      return finishTime;
+    }
+
+    public void setFinishTime(long finishTime) {
+      this.finishTime = finishTime;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      AppInfo appInfo = (AppInfo) o;
+      if (appId != null
+          ? !appId.equals(appInfo.appId) : appInfo.appId != null) {
+        return false;
+      }
+      return !(user != null
+          ? !user.equals(appInfo.user) : appInfo.user != null);
+    }
+
+    @Override
+    public int hashCode() {
+      int result = appId != null ? appId.hashCode() : 0;
+      result = 31 * result + (user != null ? user.hashCode() : 0);
+      return result;
+    }
+  }
 }
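
A note on the equals()/hashCode() pair above: eligibleApplications is a Set, and identity is deliberately limited to (appId, user), leaving finishTime out. A small sketch of that contract, assuming the code sits in the org.apache.hadoop.tools package so the package-private AppInfo is visible:

    package org.apache.hadoop.tools;

    import java.util.HashSet;
    import java.util.Set;

    public class AppInfoSetDemo {
      public static void main(String[] args) {
        Set<HadoopArchiveLogs.AppInfo> apps =
            new HashSet<HadoopArchiveLogs.AppInfo>();
        // Made-up ID/user. Because finishTime is excluded from
        // equals()/hashCode(), re-adding the same app after its finish
        // time is learned from the RM still yields a single entry.
        apps.add(new HadoopArchiveLogs.AppInfo(
            "application_1443216000000_0001", "alice"));
        apps.add(new HadoopArchiveLogs.AppInfo(
            "application_1443216000000_0001", "alice"));
        System.out.println(apps.size());  // 1
      }
    }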

hadoop-tools/hadoop-archive-logs/src/test/java/org/apache/hadoop/tools/TestHadoopArchiveLogs.java

@@ -23,15 +23,12 @@
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
-import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.MiniYARNCluster;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
@@ -47,6 +44,7 @@
 public class TestHadoopArchiveLogs {
 
   private static final long CLUSTER_TIMESTAMP = System.currentTimeMillis();
+  private static final String USER = System.getProperty("user.name");
   private static final int FILE_SIZE_INCREMENT = 4096;
   private static final byte[] DUMMY_DATA = new byte[FILE_SIZE_INCREMENT];
   static {
@@ -54,96 +52,117 @@ public class TestHadoopArchiveLogs {
   }
 
   @Test(timeout = 10000)
-  public void testCheckFiles() throws Exception {
+  public void testCheckFilesAndSeedApps() throws Exception {
     Configuration conf = new Configuration();
     HadoopArchiveLogs hal = new HadoopArchiveLogs(conf);
     FileSystem fs = FileSystem.getLocal(conf);
     Path rootLogDir = new Path("target", "logs");
     String suffix = "logs";
-    Path logDir = new Path(rootLogDir,
-        new Path(System.getProperty("user.name"), suffix));
+    Path logDir = new Path(rootLogDir, new Path(USER, suffix));
     fs.mkdirs(logDir);
-    Assert.assertEquals(0, hal.eligibleApplications.size());
-    ApplicationReport app1 = createAppReport(1);  // no files found
-    ApplicationReport app2 = createAppReport(2);  // too few files
-    Path app2Path = new Path(logDir, app2.getApplicationId().toString());
+    // no files found
+    ApplicationId appId1 = ApplicationId.newInstance(CLUSTER_TIMESTAMP, 1);
+    Path app1Path = new Path(logDir, appId1.toString());
+    fs.mkdirs(app1Path);
+    // too few files
+    ApplicationId appId2 = ApplicationId.newInstance(CLUSTER_TIMESTAMP, 2);
+    Path app2Path = new Path(logDir, appId2.toString());
     fs.mkdirs(app2Path);
     createFile(fs, new Path(app2Path, "file1"), 1);
     hal.minNumLogFiles = 2;
-    ApplicationReport app3 = createAppReport(3);  // too large
-    Path app3Path = new Path(logDir, app3.getApplicationId().toString());
+    // too large
+    ApplicationId appId3 = ApplicationId.newInstance(CLUSTER_TIMESTAMP, 3);
+    Path app3Path = new Path(logDir, appId3.toString());
     fs.mkdirs(app3Path);
     createFile(fs, new Path(app3Path, "file1"), 2);
     createFile(fs, new Path(app3Path, "file2"), 5);
     hal.maxTotalLogsSize = FILE_SIZE_INCREMENT * 6;
-    ApplicationReport app4 = createAppReport(4);  // has har already
-    Path app4Path = new Path(logDir, app4.getApplicationId().toString());
+    // has har already
+    ApplicationId appId4 = ApplicationId.newInstance(CLUSTER_TIMESTAMP, 4);
+    Path app4Path = new Path(logDir, appId4.toString());
     fs.mkdirs(app4Path);
-    createFile(fs, new Path(app4Path, app4.getApplicationId() + ".har"), 1);
-    ApplicationReport app5 = createAppReport(5);  // just right
-    Path app5Path = new Path(logDir, app5.getApplicationId().toString());
+    createFile(fs, new Path(app4Path, appId4 + ".har"), 1);
+    // just right
+    ApplicationId appId5 = ApplicationId.newInstance(CLUSTER_TIMESTAMP, 5);
+    Path app5Path = new Path(logDir, appId5.toString());
     fs.mkdirs(app5Path);
     createFile(fs, new Path(app5Path, "file1"), 2);
     createFile(fs, new Path(app5Path, "file2"), 3);
-    hal.eligibleApplications.add(app1);
-    hal.eligibleApplications.add(app2);
-    hal.eligibleApplications.add(app3);
-    hal.eligibleApplications.add(app4);
-    hal.eligibleApplications.add(app5);
 
-    hal.checkFiles(fs, rootLogDir, suffix);
+    Assert.assertEquals(0, hal.eligibleApplications.size());
+    hal.checkFilesAndSeedApps(fs, rootLogDir, suffix);
     Assert.assertEquals(1, hal.eligibleApplications.size());
-    Assert.assertEquals(app5, hal.eligibleApplications.iterator().next());
+    Assert.assertEquals(appId5.toString(),
+        hal.eligibleApplications.iterator().next().getAppId());
   }
 
   @Test(timeout = 10000)
   public void testCheckMaxEligible() throws Exception {
     Configuration conf = new Configuration();
-    HadoopArchiveLogs hal = new HadoopArchiveLogs(conf);
-    ApplicationReport app1 = createAppReport(1);
+    HadoopArchiveLogs.AppInfo app1 = new HadoopArchiveLogs.AppInfo(
+        ApplicationId.newInstance(CLUSTER_TIMESTAMP, 1).toString(), USER);
     app1.setFinishTime(CLUSTER_TIMESTAMP - 5);
-    ApplicationReport app2 = createAppReport(2);
+    HadoopArchiveLogs.AppInfo app2 = new HadoopArchiveLogs.AppInfo(
+        ApplicationId.newInstance(CLUSTER_TIMESTAMP, 2).toString(), USER);
     app2.setFinishTime(CLUSTER_TIMESTAMP - 10);
-    ApplicationReport app3 = createAppReport(3);
-    app3.setFinishTime(CLUSTER_TIMESTAMP + 5);
-    ApplicationReport app4 = createAppReport(4);
-    app4.setFinishTime(CLUSTER_TIMESTAMP + 10);
-    ApplicationReport app5 = createAppReport(5);
-    app5.setFinishTime(CLUSTER_TIMESTAMP);
+    HadoopArchiveLogs.AppInfo app3 = new HadoopArchiveLogs.AppInfo(
+        ApplicationId.newInstance(CLUSTER_TIMESTAMP, 3).toString(), USER);
+    // app3 has no finish time set
+    HadoopArchiveLogs.AppInfo app4 = new HadoopArchiveLogs.AppInfo(
+        ApplicationId.newInstance(CLUSTER_TIMESTAMP, 4).toString(), USER);
+    app4.setFinishTime(CLUSTER_TIMESTAMP + 5);
+    HadoopArchiveLogs.AppInfo app5 = new HadoopArchiveLogs.AppInfo(
+        ApplicationId.newInstance(CLUSTER_TIMESTAMP, 5).toString(), USER);
+    app5.setFinishTime(CLUSTER_TIMESTAMP + 10);
+    HadoopArchiveLogs.AppInfo app6 = new HadoopArchiveLogs.AppInfo(
+        ApplicationId.newInstance(CLUSTER_TIMESTAMP, 6).toString(), USER);
+    // app6 has no finish time set
+    HadoopArchiveLogs.AppInfo app7 = new HadoopArchiveLogs.AppInfo(
+        ApplicationId.newInstance(CLUSTER_TIMESTAMP, 7).toString(), USER);
+    app7.setFinishTime(CLUSTER_TIMESTAMP);
+    HadoopArchiveLogs hal = new HadoopArchiveLogs(conf);
     Assert.assertEquals(0, hal.eligibleApplications.size());
     hal.eligibleApplications.add(app1);
     hal.eligibleApplications.add(app2);
     hal.eligibleApplications.add(app3);
     hal.eligibleApplications.add(app4);
     hal.eligibleApplications.add(app5);
+    hal.eligibleApplications.add(app6);
+    hal.eligibleApplications.add(app7);
+    Assert.assertEquals(7, hal.eligibleApplications.size());
     hal.maxEligible = -1;
     hal.checkMaxEligible();
+    Assert.assertEquals(7, hal.eligibleApplications.size());
+    hal.maxEligible = 6;
+    hal.checkMaxEligible();
+    Assert.assertEquals(6, hal.eligibleApplications.size());
+    Assert.assertFalse(hal.eligibleApplications.contains(app5));
+    hal.maxEligible = 5;
+    hal.checkMaxEligible();
     Assert.assertEquals(5, hal.eligibleApplications.size());
+    Assert.assertFalse(hal.eligibleApplications.contains(app4));
     hal.maxEligible = 4;
     hal.checkMaxEligible();
     Assert.assertEquals(4, hal.eligibleApplications.size());
-    Assert.assertFalse(hal.eligibleApplications.contains(app4));
+    Assert.assertFalse(hal.eligibleApplications.contains(app7));
     hal.maxEligible = 3;
     hal.checkMaxEligible();
     Assert.assertEquals(3, hal.eligibleApplications.size());
-    Assert.assertFalse(hal.eligibleApplications.contains(app3));
+    Assert.assertFalse(hal.eligibleApplications.contains(app1));
     hal.maxEligible = 2;
     hal.checkMaxEligible();
     Assert.assertEquals(2, hal.eligibleApplications.size());
-    Assert.assertFalse(hal.eligibleApplications.contains(app5));
+    Assert.assertFalse(hal.eligibleApplications.contains(app2));
     hal.maxEligible = 1;
     hal.checkMaxEligible();
     Assert.assertEquals(1, hal.eligibleApplications.size());
-    Assert.assertFalse(hal.eligibleApplications.contains(app1));
+    Assert.assertFalse(hal.eligibleApplications.contains(app6));
+    Assert.assertTrue(hal.eligibleApplications.contains(app3));
   }
 
   @Test(timeout = 10000)
-  public void testFindAggregatedApps() throws Exception {
+  public void testFilterAppsByAggregatedStatus() throws Exception {
     MiniYARNCluster yarnCluster = null;
     try {
       Configuration conf = new Configuration();
@@ -156,32 +175,66 @@ public void testFindAggregatedApps() throws Exception {
       conf = yarnCluster.getConfig();
 
       RMContext rmContext = yarnCluster.getResourceManager().getRMContext();
-      RMAppImpl app1 = (RMAppImpl)createRMApp(1, conf, rmContext,
-          LogAggregationStatus.DISABLED);
-      RMAppImpl app2 = (RMAppImpl)createRMApp(2, conf, rmContext,
-          LogAggregationStatus.FAILED);
-      RMAppImpl app3 = (RMAppImpl)createRMApp(3, conf, rmContext,
-          LogAggregationStatus.NOT_START);
-      RMAppImpl app4 = (RMAppImpl)createRMApp(4, conf, rmContext,
-          LogAggregationStatus.SUCCEEDED);
-      RMAppImpl app5 = (RMAppImpl)createRMApp(5, conf, rmContext,
-          LogAggregationStatus.RUNNING);
-      RMAppImpl app6 = (RMAppImpl)createRMApp(6, conf, rmContext,
-          LogAggregationStatus.RUNNING_WITH_FAILURE);
-      RMAppImpl app7 = (RMAppImpl)createRMApp(7, conf, rmContext,
-          LogAggregationStatus.TIME_OUT);
-      rmContext.getRMApps().put(app1.getApplicationId(), app1);
-      rmContext.getRMApps().put(app2.getApplicationId(), app2);
-      rmContext.getRMApps().put(app3.getApplicationId(), app3);
-      rmContext.getRMApps().put(app4.getApplicationId(), app4);
-      rmContext.getRMApps().put(app5.getApplicationId(), app5);
-      rmContext.getRMApps().put(app6.getApplicationId(), app6);
-      rmContext.getRMApps().put(app7.getApplicationId(), app7);
+      RMAppImpl appImpl1 = (RMAppImpl)createRMApp(1, conf, rmContext,
+          LogAggregationStatus.DISABLED);
+      RMAppImpl appImpl2 = (RMAppImpl)createRMApp(2, conf, rmContext,
+          LogAggregationStatus.FAILED);
+      RMAppImpl appImpl3 = (RMAppImpl)createRMApp(3, conf, rmContext,
+          LogAggregationStatus.NOT_START);
+      RMAppImpl appImpl4 = (RMAppImpl)createRMApp(4, conf, rmContext,
+          LogAggregationStatus.SUCCEEDED);
+      RMAppImpl appImpl5 = (RMAppImpl)createRMApp(5, conf, rmContext,
+          LogAggregationStatus.RUNNING);
+      RMAppImpl appImpl6 = (RMAppImpl)createRMApp(6, conf, rmContext,
+          LogAggregationStatus.RUNNING_WITH_FAILURE);
+      RMAppImpl appImpl7 = (RMAppImpl)createRMApp(7, conf, rmContext,
+          LogAggregationStatus.TIME_OUT);
+      RMAppImpl appImpl8 = (RMAppImpl)createRMApp(8, conf, rmContext,
+          LogAggregationStatus.SUCCEEDED);
+      rmContext.getRMApps().put(appImpl1.getApplicationId(), appImpl1);
+      rmContext.getRMApps().put(appImpl2.getApplicationId(), appImpl2);
+      rmContext.getRMApps().put(appImpl3.getApplicationId(), appImpl3);
+      rmContext.getRMApps().put(appImpl4.getApplicationId(), appImpl4);
+      rmContext.getRMApps().put(appImpl5.getApplicationId(), appImpl5);
+      rmContext.getRMApps().put(appImpl6.getApplicationId(), appImpl6);
+      rmContext.getRMApps().put(appImpl7.getApplicationId(), appImpl7);
+      // appImpl8 is not in the RM
       HadoopArchiveLogs hal = new HadoopArchiveLogs(conf);
       Assert.assertEquals(0, hal.eligibleApplications.size());
-      hal.findAggregatedApps();
-      Assert.assertEquals(2, hal.eligibleApplications.size());
+      hal.eligibleApplications.add(
+          new HadoopArchiveLogs.AppInfo(appImpl1.getApplicationId().toString(),
+              USER));
+      hal.eligibleApplications.add(
+          new HadoopArchiveLogs.AppInfo(appImpl2.getApplicationId().toString(),
+              USER));
+      hal.eligibleApplications.add(
+          new HadoopArchiveLogs.AppInfo(appImpl3.getApplicationId().toString(),
+              USER));
+      HadoopArchiveLogs.AppInfo app4 =
+          new HadoopArchiveLogs.AppInfo(appImpl4.getApplicationId().toString(),
+              USER);
+      hal.eligibleApplications.add(app4);
+      hal.eligibleApplications.add(
+          new HadoopArchiveLogs.AppInfo(appImpl5.getApplicationId().toString(),
+              USER));
+      hal.eligibleApplications.add(
+          new HadoopArchiveLogs.AppInfo(appImpl6.getApplicationId().toString(),
+              USER));
+      HadoopArchiveLogs.AppInfo app7 =
+          new HadoopArchiveLogs.AppInfo(appImpl7.getApplicationId().toString(),
+              USER);
+      hal.eligibleApplications.add(app7);
+      HadoopArchiveLogs.AppInfo app8 =
+          new HadoopArchiveLogs.AppInfo(appImpl8.getApplicationId().toString(),
+              USER);
+      hal.eligibleApplications.add(app8);
+      Assert.assertEquals(8, hal.eligibleApplications.size());
+      hal.filterAppsByAggregatedStatus();
+      Assert.assertEquals(3, hal.eligibleApplications.size());
+      Assert.assertTrue(hal.eligibleApplications.contains(app4));
+      Assert.assertTrue(hal.eligibleApplications.contains(app7));
+      Assert.assertTrue(hal.eligibleApplications.contains(app8));
     } finally {
       if (yarnCluster != null) {
         yarnCluster.stop();
@@ -193,10 +246,12 @@ public void testFindAggregatedApps() throws Exception {
   public void testGenerateScript() throws Exception {
     Configuration conf = new Configuration();
     HadoopArchiveLogs hal = new HadoopArchiveLogs(conf);
-    ApplicationReport app1 = createAppReport(1);
-    ApplicationReport app2 = createAppReport(2);
-    hal.eligibleApplications.add(app1);
-    hal.eligibleApplications.add(app2);
+    ApplicationId app1 = ApplicationId.newInstance(CLUSTER_TIMESTAMP, 1);
+    ApplicationId app2 = ApplicationId.newInstance(CLUSTER_TIMESTAMP, 2);
+    hal.eligibleApplications.add(new HadoopArchiveLogs.AppInfo(app1.toString(),
+        USER));
+    hal.eligibleApplications.add(new HadoopArchiveLogs.AppInfo(app2.toString(),
+        USER));
 
     File localScript = new File("target", "script.sh");
     Path workingDir = new Path("/tmp", "working");
Assert.assertEquals("set -e", lines[1]); Assert.assertEquals("set -e", lines[1]);
Assert.assertEquals("set -x", lines[2]); Assert.assertEquals("set -x", lines[2]);
Assert.assertEquals("if [ \"$YARN_SHELL_ID\" == \"1\" ]; then", lines[3]); Assert.assertEquals("if [ \"$YARN_SHELL_ID\" == \"1\" ]; then", lines[3]);
if (lines[4].contains(app1.getApplicationId().toString())) { if (lines[4].contains(app1.toString())) {
Assert.assertEquals("\tappId=\"" + app1.getApplicationId().toString() Assert.assertEquals("\tappId=\"" + app1.toString() + "\"", lines[4]);
+ "\"", lines[4]); Assert.assertEquals("\tappId=\"" + app2.toString() + "\"", lines[7]);
Assert.assertEquals("\tappId=\"" + app2.getApplicationId().toString()
+ "\"", lines[7]);
} else { } else {
Assert.assertEquals("\tappId=\"" + app2.getApplicationId().toString() Assert.assertEquals("\tappId=\"" + app2.toString() + "\"", lines[4]);
+ "\"", lines[4]); Assert.assertEquals("\tappId=\"" + app1.toString() + "\"", lines[7]);
Assert.assertEquals("\tappId=\"" + app1.getApplicationId().toString()
+ "\"", lines[7]);
} }
Assert.assertEquals("\tuser=\"" + System.getProperty("user.name") + "\"", Assert.assertEquals("\tuser=\"" + USER + "\"", lines[5]);
lines[5]);
Assert.assertEquals("elif [ \"$YARN_SHELL_ID\" == \"2\" ]; then", lines[6]); Assert.assertEquals("elif [ \"$YARN_SHELL_ID\" == \"2\" ]; then", lines[6]);
Assert.assertEquals("\tuser=\"" + System.getProperty("user.name") + "\"", Assert.assertEquals("\tuser=\"" + USER + "\"", lines[8]);
lines[8]);
Assert.assertEquals("else", lines[9]); Assert.assertEquals("else", lines[9]);
Assert.assertEquals("\techo \"Unknown Mapping!\"", lines[10]); Assert.assertEquals("\techo \"Unknown Mapping!\"", lines[10]);
Assert.assertEquals("\texit 1", lines[11]); Assert.assertEquals("\texit 1", lines[11]);
@@ -241,15 +290,23 @@ public void testGenerateScript() throws Exception {
         remoteRootLogDir.toString() + " -suffix " + suffix, lines[15]);
   }
 
-  private static ApplicationReport createAppReport(int id) {
-    ApplicationId appId = ApplicationId.newInstance(CLUSTER_TIMESTAMP, id);
-    return ApplicationReport.newInstance(
-        appId,
-        ApplicationAttemptId.newInstance(appId, 1),
-        System.getProperty("user.name"),
-        null, null, null, 0, null, YarnApplicationState.FINISHED, null,
-        null, 0L, 0L, FinalApplicationStatus.SUCCEEDED, null, null, 100f,
-        null, null);
+  /**
+   * If this test fails, then a new LogAggregationStatus was added. Make sure
+   * that {@link HadoopArchiveLogs#filterAppsByAggregatedStatus()} and this
+   * test are updated as well, if necessary.
+   * @throws Exception
+   */
+  @Test(timeout = 5000)
+  public void testStatuses() throws Exception {
+    LogAggregationStatus[] statuses = new LogAggregationStatus[7];
+    statuses[0] = LogAggregationStatus.DISABLED;
+    statuses[1] = LogAggregationStatus.NOT_START;
+    statuses[2] = LogAggregationStatus.RUNNING;
+    statuses[3] = LogAggregationStatus.RUNNING_WITH_FAILURE;
+    statuses[4] = LogAggregationStatus.SUCCEEDED;
+    statuses[5] = LogAggregationStatus.FAILED;
+    statuses[6] = LogAggregationStatus.TIME_OUT;
+    Assert.assertArrayEquals(statuses, LogAggregationStatus.values());
   }
 
   private static void createFile(FileSystem fs, Path p, long sizeMultiple)
@@ -265,6 +322,7 @@ private static void createFile(FileSystem fs, Path p, long sizeMultiple)
         out.close();
       }
     }
+    Assert.assertTrue(fs.exists(p));
   }
 
   private static RMApp createRMApp(int id, Configuration conf, RMContext rmContext,
@@ -272,11 +330,10 @@ private static RMApp createRMApp(int id, Configuration conf, RMContext rmContext
     ApplicationId appId = ApplicationId.newInstance(CLUSTER_TIMESTAMP, id);
     ApplicationSubmissionContext submissionContext =
         ApplicationSubmissionContext.newInstance(appId, "test", "default",
-            Priority.newInstance(0), null, false, true,
+            Priority.newInstance(0), null, true, true,
             2, Resource.newInstance(10, 2), "test");
     return new RMAppImpl(appId, rmContext, conf, "test",
-        System.getProperty("user.name"), "default", submissionContext,
-        rmContext.getScheduler(),
+        USER, "default", submissionContext, rmContext.getScheduler(),
         rmContext.getApplicationMasterService(),
         System.currentTimeMillis(), "test",
         null, null) {