MAPREDUCE-6480. archive-logs tool may miss applications (rkanter)

This commit is contained in:
Robert Kanter 2015-09-25 14:59:49 -07:00
parent 83e99d06d0
commit d3c49e7662
5 changed files with 392 additions and 134 deletions

View File

@ -580,6 +580,8 @@ Release 2.8.0 - UNRELEASED
MAPREDUCE-6484. Yarn Client uses local address instead of RM address as
token renewer in a secure cluster when RM HA is enabled. (Zhihai Xu)
MAPREDUCE-6480. archive-logs tool may miss applications (rkanter)
Release 2.7.2 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -0,0 +1,32 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<FindBugsFilter>
<!--
Ignore warnings for usage of System.exit. These are appropriate.
-->
<Match>
<Class name="org.apache.hadoop.tools.HadoopArchiveLogs" />
<Method name="handleOpts" />
<Bug pattern="DM_EXIT" />
</Match>
<Match>
<Class name="org.apache.hadoop.tools.HadoopArchiveLogs" />
<Method name="run" />
<Bug pattern="DM_EXIT" />
</Match>
</FindBugsFilter>

View File

@ -118,6 +118,12 @@
<scope>test</scope>
</dependency>
<!-- 'mvn dependency:analyze' fails to detect use of this dependency -->
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<scope>test</scope>
</dependency>
<!-- 'mvn dependency:analyze' fails to detect use of this dependency -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
@ -166,6 +172,18 @@
</archive>
</configuration>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>findbugs-maven-plugin</artifactId>
<configuration>
<findbugsXmlOutput>true</findbugsXmlOutput>
<xmlOutput>true</xmlOutput>
<excludeFilterFile>
${basedir}/dev-support/findbugs-exclude.xml
</excludeFilterFile>
<effort>Max</effort>
</configuration>
</plugin>
</plugins>
</build>
</project>

View File

@ -26,12 +26,14 @@ import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.io.output.FileWriterWithEncoding;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.mapred.JobConf;
@ -43,13 +45,15 @@ import org.apache.hadoop.yarn.applications.distributedshell.ApplicationMaster;
import org.apache.hadoop.yarn.applications.distributedshell.Client;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
import org.apache.hadoop.yarn.util.ConverterUtils;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
@ -71,6 +75,7 @@ public class HadoopArchiveLogs implements Tool {
private static final String MIN_NUM_LOG_FILES_OPTION = "minNumberLogFiles";
private static final String MAX_TOTAL_LOGS_SIZE_OPTION = "maxTotalLogsSize";
private static final String MEMORY_OPTION = "memory";
private static final String VERBOSE_OPTION = "verbose";
private static final int DEFAULT_MAX_ELIGIBLE = -1;
private static final int DEFAULT_MIN_NUM_LOG_FILES = 20;
@ -85,9 +90,10 @@ public class HadoopArchiveLogs implements Tool {
long maxTotalLogsSize = DEFAULT_MAX_TOTAL_LOGS_SIZE * 1024L * 1024L;
@VisibleForTesting
long memory = DEFAULT_MEMORY;
private boolean verbose = false;
@VisibleForTesting
Set<ApplicationReport> eligibleApplications;
Set<AppInfo> eligibleApplications;
private JobConf conf;
@ -122,17 +128,20 @@ public class HadoopArchiveLogs implements Tool {
public int run(String[] args) throws Exception {
handleOpts(args);
findAggregatedApps();
FileSystem fs = null;
Path remoteRootLogDir = new Path(conf.get(
YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
String suffix = LogAggregationUtils.getRemoteNodeLogDirSuffix(conf);
Path workingDir = new Path(remoteRootLogDir, "archive-logs-work");
if (verbose) {
LOG.info("Remote Log Dir Root: " + remoteRootLogDir);
LOG.info("Log Suffix: " + suffix);
LOG.info("Working Dir: " + workingDir);
}
try {
fs = FileSystem.get(conf);
checkFiles(fs, remoteRootLogDir, suffix);
checkFilesAndSeedApps(fs, remoteRootLogDir, suffix);
// Prepare working directory
if (fs.exists(workingDir)) {
@ -147,6 +156,8 @@ public class HadoopArchiveLogs implements Tool {
}
}
filterAppsByAggregatedStatus();
checkMaxEligible();
if (eligibleApplications.isEmpty()) {
@ -156,8 +167,8 @@ public class HadoopArchiveLogs implements Tool {
StringBuilder sb =
new StringBuilder("Will process the following applications:");
for (ApplicationReport report : eligibleApplications) {
sb.append("\n\t").append(report.getApplicationId());
for (AppInfo app : eligibleApplications) {
sb.append("\n\t").append(app.getAppId());
}
LOG.info(sb.toString());
@ -189,11 +200,14 @@ public class HadoopArchiveLogs implements Tool {
"The amount of memory (in megabytes) for each container (default: "
+ DEFAULT_MEMORY + ")");
memoryOpt.setArgName("megabytes");
Option verboseOpt = new Option(VERBOSE_OPTION, false,
"Print more details.");
opts.addOption(helpOpt);
opts.addOption(maxEligibleOpt);
opts.addOption(minNumLogFilesOpt);
opts.addOption(maxTotalLogsSizeOpt);
opts.addOption(memoryOpt);
opts.addOption(verboseOpt);
try {
CommandLineParser parser = new GnuParser();
@ -225,6 +239,9 @@ public class HadoopArchiveLogs implements Tool {
if (commandLine.hasOption(MEMORY_OPTION)) {
memory = Long.parseLong(commandLine.getOptionValue(MEMORY_OPTION));
}
if (commandLine.hasOption(VERBOSE_OPTION)) {
verbose = true;
}
} catch (ParseException pe) {
HelpFormatter formatter = new HelpFormatter();
formatter.printHelp("yarn archive-logs", opts);
@ -233,17 +250,39 @@ public class HadoopArchiveLogs implements Tool {
}
@VisibleForTesting
void findAggregatedApps() throws IOException, YarnException {
void filterAppsByAggregatedStatus() throws IOException, YarnException {
YarnClient client = YarnClient.createYarnClient();
try {
client.init(getConf());
client.start();
List<ApplicationReport> reports = client.getApplications();
for (ApplicationReport report : reports) {
for (Iterator<AppInfo> it = eligibleApplications.iterator();
it.hasNext();) {
AppInfo app = it.next();
try {
ApplicationReport report = client.getApplicationReport(
ConverterUtils.toApplicationId(app.getAppId()));
LogAggregationStatus aggStatus = report.getLogAggregationStatus();
if (aggStatus.equals(LogAggregationStatus.SUCCEEDED) ||
if (aggStatus.equals(LogAggregationStatus.RUNNING) ||
aggStatus.equals(LogAggregationStatus.RUNNING_WITH_FAILURE) ||
aggStatus.equals(LogAggregationStatus.NOT_START) ||
aggStatus.equals(LogAggregationStatus.DISABLED) ||
aggStatus.equals(LogAggregationStatus.FAILED)) {
eligibleApplications.add(report);
if (verbose) {
LOG.info("Skipping " + app.getAppId() +
" due to aggregation status being " + aggStatus);
}
it.remove();
} else {
if (verbose) {
LOG.info(app.getAppId() + " has aggregation status " + aggStatus);
}
app.setFinishTime(report.getFinishTime());
}
} catch (ApplicationNotFoundException e) {
// Assume the aggregation has finished
if (verbose) {
LOG.info(app.getAppId() + " not in the ResourceManager");
}
}
}
} finally {
@ -254,33 +293,71 @@ public class HadoopArchiveLogs implements Tool {
}
@VisibleForTesting
void checkFiles(FileSystem fs, Path remoteRootLogDir, String suffix) {
for (Iterator<ApplicationReport> reportIt = eligibleApplications.iterator();
reportIt.hasNext(); ) {
ApplicationReport report = reportIt.next();
long totalFileSize = 0L;
void checkFilesAndSeedApps(FileSystem fs, Path remoteRootLogDir,
String suffix) throws IOException {
for (RemoteIterator<FileStatus> userIt =
fs.listStatusIterator(remoteRootLogDir); userIt.hasNext();) {
Path userLogPath = userIt.next().getPath();
try {
FileStatus[] files = fs.listStatus(
LogAggregationUtils.getRemoteAppLogDir(remoteRootLogDir,
report.getApplicationId(), report.getUser(), suffix));
if (files.length < minNumLogFiles) {
reportIt.remove();
} else {
for (RemoteIterator<FileStatus> appIt =
fs.listStatusIterator(new Path(userLogPath, suffix));
appIt.hasNext();) {
Path appLogPath = appIt.next().getPath();
try {
FileStatus[] files = fs.listStatus(appLogPath);
if (files.length >= minNumLogFiles) {
boolean eligible = true;
long totalFileSize = 0L;
for (FileStatus file : files) {
if (file.getPath().getName().equals(report.getApplicationId()
if (file.getPath().getName().equals(appLogPath.getName()
+ ".har")) {
reportIt.remove();
eligible = false;
if (verbose) {
LOG.info("Skipping " + appLogPath.getName() +
" due to existing .har file");
}
break;
}
totalFileSize += file.getLen();
}
if (totalFileSize > maxTotalLogsSize) {
reportIt.remove();
eligible = false;
if (verbose) {
LOG.info("Skipping " + appLogPath.getName() + " due to " +
"total file size being too large (" + totalFileSize +
" > " + maxTotalLogsSize + ")");
}
break;
}
}
if (eligible) {
if (verbose) {
LOG.info("Adding " + appLogPath.getName() + " for user " +
userLogPath.getName());
}
eligibleApplications.add(
new AppInfo(appLogPath.getName(), userLogPath.getName()));
}
} else {
if (verbose) {
LOG.info("Skipping " + appLogPath.getName() + " due to not " +
"having enough log files (" + files.length + " < " +
minNumLogFiles + ")");
}
}
} catch (IOException ioe) {
// If the user doesn't have permission or it doesn't exist, then skip it
reportIt.remove();
// Ignore any apps we can't read
if (verbose) {
LOG.info("Skipping logs under " + appLogPath + " due to " +
ioe.getMessage());
}
}
}
} catch (IOException ioe) {
// Ignore any apps we can't read
if (verbose) {
LOG.info("Skipping all logs under " + userLogPath + " due to " +
ioe.getMessage());
}
}
}
}
@ -289,15 +366,26 @@ public class HadoopArchiveLogs implements Tool {
void checkMaxEligible() {
// If we have too many eligible apps, remove the newest ones first
if (maxEligible > 0 && eligibleApplications.size() > maxEligible) {
List<ApplicationReport> sortedApplications =
new ArrayList<ApplicationReport>(eligibleApplications);
Collections.sort(sortedApplications, new Comparator<ApplicationReport>() {
if (verbose) {
LOG.info("Too many applications (" + eligibleApplications.size() +
" > " + maxEligible + ")");
}
List<AppInfo> sortedApplications =
new ArrayList<AppInfo>(eligibleApplications);
Collections.sort(sortedApplications, new Comparator<AppInfo>() {
@Override
public int compare(ApplicationReport o1, ApplicationReport o2) {
return Long.compare(o1.getFinishTime(), o2.getFinishTime());
public int compare(AppInfo o1, AppInfo o2) {
int lCompare = Long.compare(o1.getFinishTime(), o2.getFinishTime());
if (lCompare == 0) {
return o1.getAppId().compareTo(o2.getAppId());
}
return lCompare;
}
});
for (int i = maxEligible; i < sortedApplications.size(); i++) {
if (verbose) {
LOG.info("Removing " + sortedApplications.get(i));
}
eligibleApplications.remove(sortedApplications.get(i));
}
}
@ -325,24 +413,26 @@ public class HadoopArchiveLogs implements Tool {
@VisibleForTesting
void generateScript(File localScript, Path workingDir,
Path remoteRootLogDir, String suffix) throws IOException {
if (verbose) {
LOG.info("Generating script at: " + localScript.getAbsolutePath());
}
String halrJarPath = HadoopArchiveLogsRunner.class.getProtectionDomain()
.getCodeSource().getLocation().getPath();
String harJarPath = HadoopArchives.class.getProtectionDomain()
.getCodeSource().getLocation().getPath();
String classpath = halrJarPath + File.pathSeparator + harJarPath;
FileWriter fw = null;
FileWriterWithEncoding fw = null;
try {
fw = new FileWriter(localScript);
fw = new FileWriterWithEncoding(localScript, "UTF-8");
fw.write("#!/bin/bash\nset -e\nset -x\n");
int containerCount = 1;
for (ApplicationReport report : eligibleApplications) {
for (AppInfo app : eligibleApplications) {
fw.write("if [ \"$YARN_SHELL_ID\" == \"");
fw.write(Integer.toString(containerCount));
fw.write("\" ]; then\n\tappId=\"");
fw.write(report.getApplicationId().toString());
fw.write(app.getAppId());
fw.write("\"\n\tuser=\"");
fw.write(report.getUser());
fw.write(app.getUser());
fw.write("\"\nel");
containerCount++;
}
@ -382,6 +472,10 @@ public class HadoopArchiveLogs implements Tool {
"--shell_script",
localScript.getAbsolutePath()
};
if (verbose) {
LOG.info("Running Distributed Shell with arguments: " +
Arrays.toString(dsArgs));
}
final Client dsClient = new Client(new Configuration(conf));
dsClient.init(dsArgs);
return dsClient.run();
@ -400,4 +494,59 @@ public class HadoopArchiveLogs implements Tool {
public Configuration getConf() {
return this.conf;
}
@VisibleForTesting
static class AppInfo {
private String appId;
private String user;
private long finishTime;
AppInfo(String appId, String user) {
this.appId = appId;
this.user = user;
this.finishTime = 0L;
}
public String getAppId() {
return appId;
}
public String getUser() {
return user;
}
public long getFinishTime() {
return finishTime;
}
public void setFinishTime(long finishTime) {
this.finishTime = finishTime;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
AppInfo appInfo = (AppInfo) o;
if (appId != null
? !appId.equals(appInfo.appId) : appInfo.appId != null) {
return false;
}
return !(user != null
? !user.equals(appInfo.user) : appInfo.user != null);
}
@Override
public int hashCode() {
int result = appId != null ? appId.hashCode() : 0;
result = 31 * result + (user != null ? user.hashCode() : 0);
return result;
}
}
}

View File

@ -23,15 +23,12 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.MiniYARNCluster;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
@ -47,6 +44,7 @@ import java.util.Random;
public class TestHadoopArchiveLogs {
private static final long CLUSTER_TIMESTAMP = System.currentTimeMillis();
private static final String USER = System.getProperty("user.name");
private static final int FILE_SIZE_INCREMENT = 4096;
private static final byte[] DUMMY_DATA = new byte[FILE_SIZE_INCREMENT];
static {
@ -54,96 +52,117 @@ public class TestHadoopArchiveLogs {
}
@Test(timeout = 10000)
public void testCheckFiles() throws Exception {
public void testCheckFilesAndSeedApps() throws Exception {
Configuration conf = new Configuration();
HadoopArchiveLogs hal = new HadoopArchiveLogs(conf);
FileSystem fs = FileSystem.getLocal(conf);
Path rootLogDir = new Path("target", "logs");
String suffix = "logs";
Path logDir = new Path(rootLogDir,
new Path(System.getProperty("user.name"), suffix));
Path logDir = new Path(rootLogDir, new Path(USER, suffix));
fs.mkdirs(logDir);
Assert.assertEquals(0, hal.eligibleApplications.size());
ApplicationReport app1 = createAppReport(1); // no files found
ApplicationReport app2 = createAppReport(2); // too few files
Path app2Path = new Path(logDir, app2.getApplicationId().toString());
// no files found
ApplicationId appId1 = ApplicationId.newInstance(CLUSTER_TIMESTAMP, 1);
Path app1Path = new Path(logDir, appId1.toString());
fs.mkdirs(app1Path);
// too few files
ApplicationId appId2 = ApplicationId.newInstance(CLUSTER_TIMESTAMP, 2);
Path app2Path = new Path(logDir, appId2.toString());
fs.mkdirs(app2Path);
createFile(fs, new Path(app2Path, "file1"), 1);
hal.minNumLogFiles = 2;
ApplicationReport app3 = createAppReport(3); // too large
Path app3Path = new Path(logDir, app3.getApplicationId().toString());
// too large
ApplicationId appId3 = ApplicationId.newInstance(CLUSTER_TIMESTAMP, 3);
Path app3Path = new Path(logDir, appId3.toString());
fs.mkdirs(app3Path);
createFile(fs, new Path(app3Path, "file1"), 2);
createFile(fs, new Path(app3Path, "file2"), 5);
hal.maxTotalLogsSize = FILE_SIZE_INCREMENT * 6;
ApplicationReport app4 = createAppReport(4); // has har already
Path app4Path = new Path(logDir, app4.getApplicationId().toString());
// has har already
ApplicationId appId4 = ApplicationId.newInstance(CLUSTER_TIMESTAMP, 4);
Path app4Path = new Path(logDir, appId4.toString());
fs.mkdirs(app4Path);
createFile(fs, new Path(app4Path, app4.getApplicationId() + ".har"), 1);
ApplicationReport app5 = createAppReport(5); // just right
Path app5Path = new Path(logDir, app5.getApplicationId().toString());
createFile(fs, new Path(app4Path, appId4 + ".har"), 1);
// just right
ApplicationId appId5 = ApplicationId.newInstance(CLUSTER_TIMESTAMP, 5);
Path app5Path = new Path(logDir, appId5.toString());
fs.mkdirs(app5Path);
createFile(fs, new Path(app5Path, "file1"), 2);
createFile(fs, new Path(app5Path, "file2"), 3);
hal.eligibleApplications.add(app1);
hal.eligibleApplications.add(app2);
hal.eligibleApplications.add(app3);
hal.eligibleApplications.add(app4);
hal.eligibleApplications.add(app5);
hal.checkFiles(fs, rootLogDir, suffix);
Assert.assertEquals(0, hal.eligibleApplications.size());
hal.checkFilesAndSeedApps(fs, rootLogDir, suffix);
Assert.assertEquals(1, hal.eligibleApplications.size());
Assert.assertEquals(app5, hal.eligibleApplications.iterator().next());
Assert.assertEquals(appId5.toString(),
hal.eligibleApplications.iterator().next().getAppId());
}
@Test(timeout = 10000)
public void testCheckMaxEligible() throws Exception {
Configuration conf = new Configuration();
HadoopArchiveLogs hal = new HadoopArchiveLogs(conf);
ApplicationReport app1 = createAppReport(1);
HadoopArchiveLogs.AppInfo app1 = new HadoopArchiveLogs.AppInfo(
ApplicationId.newInstance(CLUSTER_TIMESTAMP, 1).toString(), USER);
app1.setFinishTime(CLUSTER_TIMESTAMP - 5);
ApplicationReport app2 = createAppReport(2);
HadoopArchiveLogs.AppInfo app2 = new HadoopArchiveLogs.AppInfo(
ApplicationId.newInstance(CLUSTER_TIMESTAMP, 2).toString(), USER);
app2.setFinishTime(CLUSTER_TIMESTAMP - 10);
ApplicationReport app3 = createAppReport(3);
app3.setFinishTime(CLUSTER_TIMESTAMP + 5);
ApplicationReport app4 = createAppReport(4);
app4.setFinishTime(CLUSTER_TIMESTAMP + 10);
ApplicationReport app5 = createAppReport(5);
app5.setFinishTime(CLUSTER_TIMESTAMP);
HadoopArchiveLogs.AppInfo app3 = new HadoopArchiveLogs.AppInfo(
ApplicationId.newInstance(CLUSTER_TIMESTAMP, 3).toString(), USER);
// app3 has no finish time set
HadoopArchiveLogs.AppInfo app4 = new HadoopArchiveLogs.AppInfo(
ApplicationId.newInstance(CLUSTER_TIMESTAMP, 4).toString(), USER);
app4.setFinishTime(CLUSTER_TIMESTAMP + 5);
HadoopArchiveLogs.AppInfo app5 = new HadoopArchiveLogs.AppInfo(
ApplicationId.newInstance(CLUSTER_TIMESTAMP, 5).toString(), USER);
app5.setFinishTime(CLUSTER_TIMESTAMP + 10);
HadoopArchiveLogs.AppInfo app6 = new HadoopArchiveLogs.AppInfo(
ApplicationId.newInstance(CLUSTER_TIMESTAMP, 6).toString(), USER);
// app6 has no finish time set
HadoopArchiveLogs.AppInfo app7 = new HadoopArchiveLogs.AppInfo(
ApplicationId.newInstance(CLUSTER_TIMESTAMP, 7).toString(), USER);
app7.setFinishTime(CLUSTER_TIMESTAMP);
HadoopArchiveLogs hal = new HadoopArchiveLogs(conf);
Assert.assertEquals(0, hal.eligibleApplications.size());
hal.eligibleApplications.add(app1);
hal.eligibleApplications.add(app2);
hal.eligibleApplications.add(app3);
hal.eligibleApplications.add(app4);
hal.eligibleApplications.add(app5);
hal.eligibleApplications.add(app6);
hal.eligibleApplications.add(app7);
Assert.assertEquals(7, hal.eligibleApplications.size());
hal.maxEligible = -1;
hal.checkMaxEligible();
Assert.assertEquals(7, hal.eligibleApplications.size());
hal.maxEligible = 6;
hal.checkMaxEligible();
Assert.assertEquals(6, hal.eligibleApplications.size());
Assert.assertFalse(hal.eligibleApplications.contains(app5));
hal.maxEligible = 5;
hal.checkMaxEligible();
Assert.assertEquals(5, hal.eligibleApplications.size());
Assert.assertFalse(hal.eligibleApplications.contains(app4));
hal.maxEligible = 4;
hal.checkMaxEligible();
Assert.assertEquals(4, hal.eligibleApplications.size());
Assert.assertFalse(hal.eligibleApplications.contains(app4));
Assert.assertFalse(hal.eligibleApplications.contains(app7));
hal.maxEligible = 3;
hal.checkMaxEligible();
Assert.assertEquals(3, hal.eligibleApplications.size());
Assert.assertFalse(hal.eligibleApplications.contains(app3));
Assert.assertFalse(hal.eligibleApplications.contains(app1));
hal.maxEligible = 2;
hal.checkMaxEligible();
Assert.assertEquals(2, hal.eligibleApplications.size());
Assert.assertFalse(hal.eligibleApplications.contains(app5));
Assert.assertFalse(hal.eligibleApplications.contains(app2));
hal.maxEligible = 1;
hal.checkMaxEligible();
Assert.assertEquals(1, hal.eligibleApplications.size());
Assert.assertFalse(hal.eligibleApplications.contains(app1));
Assert.assertFalse(hal.eligibleApplications.contains(app6));
Assert.assertTrue(hal.eligibleApplications.contains(app3));
}
@Test(timeout = 10000)
public void testFindAggregatedApps() throws Exception {
public void testFilterAppsByAggregatedStatus() throws Exception {
MiniYARNCluster yarnCluster = null;
try {
Configuration conf = new Configuration();
@ -156,32 +175,66 @@ public class TestHadoopArchiveLogs {
conf = yarnCluster.getConfig();
RMContext rmContext = yarnCluster.getResourceManager().getRMContext();
RMAppImpl app1 = (RMAppImpl)createRMApp(1, conf, rmContext,
RMAppImpl appImpl1 = (RMAppImpl)createRMApp(1, conf, rmContext,
LogAggregationStatus.DISABLED);
RMAppImpl app2 = (RMAppImpl)createRMApp(2, conf, rmContext,
RMAppImpl appImpl2 = (RMAppImpl)createRMApp(2, conf, rmContext,
LogAggregationStatus.FAILED);
RMAppImpl app3 = (RMAppImpl)createRMApp(3, conf, rmContext,
RMAppImpl appImpl3 = (RMAppImpl)createRMApp(3, conf, rmContext,
LogAggregationStatus.NOT_START);
RMAppImpl app4 = (RMAppImpl)createRMApp(4, conf, rmContext,
RMAppImpl appImpl4 = (RMAppImpl)createRMApp(4, conf, rmContext,
LogAggregationStatus.SUCCEEDED);
RMAppImpl app5 = (RMAppImpl)createRMApp(5, conf, rmContext,
RMAppImpl appImpl5 = (RMAppImpl)createRMApp(5, conf, rmContext,
LogAggregationStatus.RUNNING);
RMAppImpl app6 = (RMAppImpl)createRMApp(6, conf, rmContext,
RMAppImpl appImpl6 = (RMAppImpl)createRMApp(6, conf, rmContext,
LogAggregationStatus.RUNNING_WITH_FAILURE);
RMAppImpl app7 = (RMAppImpl)createRMApp(7, conf, rmContext,
RMAppImpl appImpl7 = (RMAppImpl)createRMApp(7, conf, rmContext,
LogAggregationStatus.TIME_OUT);
rmContext.getRMApps().put(app1.getApplicationId(), app1);
rmContext.getRMApps().put(app2.getApplicationId(), app2);
rmContext.getRMApps().put(app3.getApplicationId(), app3);
rmContext.getRMApps().put(app4.getApplicationId(), app4);
rmContext.getRMApps().put(app5.getApplicationId(), app5);
rmContext.getRMApps().put(app6.getApplicationId(), app6);
rmContext.getRMApps().put(app7.getApplicationId(), app7);
RMAppImpl appImpl8 = (RMAppImpl)createRMApp(8, conf, rmContext,
LogAggregationStatus.SUCCEEDED);
rmContext.getRMApps().put(appImpl1.getApplicationId(), appImpl1);
rmContext.getRMApps().put(appImpl2.getApplicationId(), appImpl2);
rmContext.getRMApps().put(appImpl3.getApplicationId(), appImpl3);
rmContext.getRMApps().put(appImpl4.getApplicationId(), appImpl4);
rmContext.getRMApps().put(appImpl5.getApplicationId(), appImpl5);
rmContext.getRMApps().put(appImpl6.getApplicationId(), appImpl6);
rmContext.getRMApps().put(appImpl7.getApplicationId(), appImpl7);
// appImpl8 is not in the RM
HadoopArchiveLogs hal = new HadoopArchiveLogs(conf);
Assert.assertEquals(0, hal.eligibleApplications.size());
hal.findAggregatedApps();
Assert.assertEquals(2, hal.eligibleApplications.size());
hal.eligibleApplications.add(
new HadoopArchiveLogs.AppInfo(appImpl1.getApplicationId().toString(),
USER));
hal.eligibleApplications.add(
new HadoopArchiveLogs.AppInfo(appImpl2.getApplicationId().toString(),
USER));
hal.eligibleApplications.add(
new HadoopArchiveLogs.AppInfo(appImpl3.getApplicationId().toString(),
USER));
HadoopArchiveLogs.AppInfo app4 =
new HadoopArchiveLogs.AppInfo(appImpl4.getApplicationId().toString(),
USER);
hal.eligibleApplications.add(app4);
hal.eligibleApplications.add(
new HadoopArchiveLogs.AppInfo(appImpl5.getApplicationId().toString(),
USER));
hal.eligibleApplications.add(
new HadoopArchiveLogs.AppInfo(appImpl6.getApplicationId().toString(),
USER));
HadoopArchiveLogs.AppInfo app7 =
new HadoopArchiveLogs.AppInfo(appImpl7.getApplicationId().toString(),
USER);
hal.eligibleApplications.add(app7);
HadoopArchiveLogs.AppInfo app8 =
new HadoopArchiveLogs.AppInfo(appImpl8.getApplicationId().toString(),
USER);
hal.eligibleApplications.add(app8);
Assert.assertEquals(8, hal.eligibleApplications.size());
hal.filterAppsByAggregatedStatus();
Assert.assertEquals(3, hal.eligibleApplications.size());
Assert.assertTrue(hal.eligibleApplications.contains(app4));
Assert.assertTrue(hal.eligibleApplications.contains(app7));
Assert.assertTrue(hal.eligibleApplications.contains(app8));
} finally {
if (yarnCluster != null) {
yarnCluster.stop();
@ -193,10 +246,12 @@ public class TestHadoopArchiveLogs {
public void testGenerateScript() throws Exception {
Configuration conf = new Configuration();
HadoopArchiveLogs hal = new HadoopArchiveLogs(conf);
ApplicationReport app1 = createAppReport(1);
ApplicationReport app2 = createAppReport(2);
hal.eligibleApplications.add(app1);
hal.eligibleApplications.add(app2);
ApplicationId app1 = ApplicationId.newInstance(CLUSTER_TIMESTAMP, 1);
ApplicationId app2 = ApplicationId.newInstance(CLUSTER_TIMESTAMP, 2);
hal.eligibleApplications.add(new HadoopArchiveLogs.AppInfo(app1.toString(),
USER));
hal.eligibleApplications.add(new HadoopArchiveLogs.AppInfo(app2.toString(),
USER));
File localScript = new File("target", "script.sh");
Path workingDir = new Path("/tmp", "working");
@ -213,22 +268,16 @@ public class TestHadoopArchiveLogs {
Assert.assertEquals("set -e", lines[1]);
Assert.assertEquals("set -x", lines[2]);
Assert.assertEquals("if [ \"$YARN_SHELL_ID\" == \"1\" ]; then", lines[3]);
if (lines[4].contains(app1.getApplicationId().toString())) {
Assert.assertEquals("\tappId=\"" + app1.getApplicationId().toString()
+ "\"", lines[4]);
Assert.assertEquals("\tappId=\"" + app2.getApplicationId().toString()
+ "\"", lines[7]);
if (lines[4].contains(app1.toString())) {
Assert.assertEquals("\tappId=\"" + app1.toString() + "\"", lines[4]);
Assert.assertEquals("\tappId=\"" + app2.toString() + "\"", lines[7]);
} else {
Assert.assertEquals("\tappId=\"" + app2.getApplicationId().toString()
+ "\"", lines[4]);
Assert.assertEquals("\tappId=\"" + app1.getApplicationId().toString()
+ "\"", lines[7]);
Assert.assertEquals("\tappId=\"" + app2.toString() + "\"", lines[4]);
Assert.assertEquals("\tappId=\"" + app1.toString() + "\"", lines[7]);
}
Assert.assertEquals("\tuser=\"" + System.getProperty("user.name") + "\"",
lines[5]);
Assert.assertEquals("\tuser=\"" + USER + "\"", lines[5]);
Assert.assertEquals("elif [ \"$YARN_SHELL_ID\" == \"2\" ]; then", lines[6]);
Assert.assertEquals("\tuser=\"" + System.getProperty("user.name") + "\"",
lines[8]);
Assert.assertEquals("\tuser=\"" + USER + "\"", lines[8]);
Assert.assertEquals("else", lines[9]);
Assert.assertEquals("\techo \"Unknown Mapping!\"", lines[10]);
Assert.assertEquals("\texit 1", lines[11]);
@ -241,15 +290,23 @@ public class TestHadoopArchiveLogs {
remoteRootLogDir.toString() + " -suffix " + suffix, lines[15]);
}
private static ApplicationReport createAppReport(int id) {
ApplicationId appId = ApplicationId.newInstance(CLUSTER_TIMESTAMP, id);
return ApplicationReport.newInstance(
appId,
ApplicationAttemptId.newInstance(appId, 1),
System.getProperty("user.name"),
null, null, null, 0, null, YarnApplicationState.FINISHED, null,
null, 0L, 0L, FinalApplicationStatus.SUCCEEDED, null, null, 100f,
null, null);
/**
* If this test failes, then a new Log Aggregation Status was added. Make
* sure that {@link HadoopArchiveLogs#filterAppsByAggregatedStatus()} and this test
* are updated as well, if necessary.
* @throws Exception
*/
@Test(timeout = 5000)
public void testStatuses() throws Exception {
LogAggregationStatus[] statuses = new LogAggregationStatus[7];
statuses[0] = LogAggregationStatus.DISABLED;
statuses[1] = LogAggregationStatus.NOT_START;
statuses[2] = LogAggregationStatus.RUNNING;
statuses[3] = LogAggregationStatus.RUNNING_WITH_FAILURE;
statuses[4] = LogAggregationStatus.SUCCEEDED;
statuses[5] = LogAggregationStatus.FAILED;
statuses[6] = LogAggregationStatus.TIME_OUT;
Assert.assertArrayEquals(statuses, LogAggregationStatus.values());
}
private static void createFile(FileSystem fs, Path p, long sizeMultiple)
@ -265,6 +322,7 @@ public class TestHadoopArchiveLogs {
out.close();
}
}
Assert.assertTrue(fs.exists(p));
}
private static RMApp createRMApp(int id, Configuration conf, RMContext rmContext,
@ -272,11 +330,10 @@ public class TestHadoopArchiveLogs {
ApplicationId appId = ApplicationId.newInstance(CLUSTER_TIMESTAMP, id);
ApplicationSubmissionContext submissionContext =
ApplicationSubmissionContext.newInstance(appId, "test", "default",
Priority.newInstance(0), null, false, true,
Priority.newInstance(0), null, true, true,
2, Resource.newInstance(10, 2), "test");
return new RMAppImpl(appId, rmContext, conf, "test",
System.getProperty("user.name"), "default", submissionContext,
rmContext.getScheduler(),
USER, "default", submissionContext, rmContext.getScheduler(),
rmContext.getApplicationMasterService(),
System.currentTimeMillis(), "test",
null, null) {