diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index ca21c6c7292..9e719a150ea 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -311,6 +311,8 @@ Release 2.8.0 - UNRELEASED
MAPREDUCE-6484. Yarn Client uses local address instead of RM address as
token renewer in a secure cluster when RM HA is enabled. (Zhihai Xu)
+ MAPREDUCE-6480. archive-logs tool may miss applications (rkanter)
+
Release 2.7.2 - UNRELEASED
INCOMPATIBLE CHANGES
diff --git a/hadoop-tools/hadoop-archive-logs/dev-support/findbugs-exclude.xml b/hadoop-tools/hadoop-archive-logs/dev-support/findbugs-exclude.xml
new file mode 100644
index 00000000000..7f2064ea24d
--- /dev/null
+++ b/hadoop-tools/hadoop-archive-logs/dev-support/findbugs-exclude.xml
@@ -0,0 +1,32 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/hadoop-tools/hadoop-archive-logs/pom.xml b/hadoop-tools/hadoop-archive-logs/pom.xml
index 4b53b7857fa..6ded278a39a 100644
--- a/hadoop-tools/hadoop-archive-logs/pom.xml
+++ b/hadoop-tools/hadoop-archive-logs/pom.xml
@@ -118,6 +118,12 @@
test
+
+ org.mockito
+ mockito-all
+ test
+
+
org.apache.hadoop
hadoop-common
@@ -166,6 +172,18 @@
+
+ org.codehaus.mojo
+ findbugs-maven-plugin
+
+ true
+ true
+
+ ${basedir}/dev-support/findbugs-exclude.xml
+
+ Max
+
+
diff --git a/hadoop-tools/hadoop-archive-logs/src/main/java/org/apache/hadoop/tools/HadoopArchiveLogs.java b/hadoop-tools/hadoop-archive-logs/src/main/java/org/apache/hadoop/tools/HadoopArchiveLogs.java
index 4778dcbdce0..0879d41f7fc 100644
--- a/hadoop-tools/hadoop-archive-logs/src/main/java/org/apache/hadoop/tools/HadoopArchiveLogs.java
+++ b/hadoop-tools/hadoop-archive-logs/src/main/java/org/apache/hadoop/tools/HadoopArchiveLogs.java
@@ -26,12 +26,14 @@
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
+import org.apache.commons.io.output.FileWriterWithEncoding;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.mapred.JobConf;
@@ -43,13 +45,15 @@
import org.apache.hadoop.yarn.applications.distributedshell.Client;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
+import org.apache.hadoop.yarn.util.ConverterUtils;
import java.io.File;
-import java.io.FileWriter;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
@@ -71,6 +75,7 @@ public class HadoopArchiveLogs implements Tool {
private static final String MIN_NUM_LOG_FILES_OPTION = "minNumberLogFiles";
private static final String MAX_TOTAL_LOGS_SIZE_OPTION = "maxTotalLogsSize";
private static final String MEMORY_OPTION = "memory";
+ private static final String VERBOSE_OPTION = "verbose";
private static final int DEFAULT_MAX_ELIGIBLE = -1;
private static final int DEFAULT_MIN_NUM_LOG_FILES = 20;
@@ -85,9 +90,10 @@ public class HadoopArchiveLogs implements Tool {
long maxTotalLogsSize = DEFAULT_MAX_TOTAL_LOGS_SIZE * 1024L * 1024L;
@VisibleForTesting
long memory = DEFAULT_MEMORY;
+ private boolean verbose = false;
@VisibleForTesting
- Set eligibleApplications;
+ Set eligibleApplications;
private JobConf conf;
@@ -122,17 +128,20 @@ public static void main(String[] args) {
public int run(String[] args) throws Exception {
handleOpts(args);
- findAggregatedApps();
-
FileSystem fs = null;
Path remoteRootLogDir = new Path(conf.get(
YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
String suffix = LogAggregationUtils.getRemoteNodeLogDirSuffix(conf);
Path workingDir = new Path(remoteRootLogDir, "archive-logs-work");
+ if (verbose) {
+ LOG.info("Remote Log Dir Root: " + remoteRootLogDir);
+ LOG.info("Log Suffix: " + suffix);
+ LOG.info("Working Dir: " + workingDir);
+ }
try {
fs = FileSystem.get(conf);
- checkFiles(fs, remoteRootLogDir, suffix);
+ checkFilesAndSeedApps(fs, remoteRootLogDir, suffix);
// Prepare working directory
if (fs.exists(workingDir)) {
@@ -147,6 +156,8 @@ public int run(String[] args) throws Exception {
}
}
+ filterAppsByAggregatedStatus();
+
checkMaxEligible();
if (eligibleApplications.isEmpty()) {
@@ -156,8 +167,8 @@ public int run(String[] args) throws Exception {
StringBuilder sb =
new StringBuilder("Will process the following applications:");
- for (ApplicationReport report : eligibleApplications) {
- sb.append("\n\t").append(report.getApplicationId());
+ for (AppInfo app : eligibleApplications) {
+ sb.append("\n\t").append(app.getAppId());
}
LOG.info(sb.toString());
@@ -189,11 +200,14 @@ private void handleOpts(String[] args) throws ParseException {
"The amount of memory (in megabytes) for each container (default: "
+ DEFAULT_MEMORY + ")");
memoryOpt.setArgName("megabytes");
+ Option verboseOpt = new Option(VERBOSE_OPTION, false,
+ "Print more details.");
opts.addOption(helpOpt);
opts.addOption(maxEligibleOpt);
opts.addOption(minNumLogFilesOpt);
opts.addOption(maxTotalLogsSizeOpt);
opts.addOption(memoryOpt);
+ opts.addOption(verboseOpt);
try {
CommandLineParser parser = new GnuParser();
@@ -225,6 +239,9 @@ private void handleOpts(String[] args) throws ParseException {
if (commandLine.hasOption(MEMORY_OPTION)) {
memory = Long.parseLong(commandLine.getOptionValue(MEMORY_OPTION));
}
+ if (commandLine.hasOption(VERBOSE_OPTION)) {
+ verbose = true;
+ }
} catch (ParseException pe) {
HelpFormatter formatter = new HelpFormatter();
formatter.printHelp("yarn archive-logs", opts);
@@ -233,17 +250,39 @@ private void handleOpts(String[] args) throws ParseException {
}
@VisibleForTesting
- void findAggregatedApps() throws IOException, YarnException {
+ void filterAppsByAggregatedStatus() throws IOException, YarnException {
YarnClient client = YarnClient.createYarnClient();
try {
client.init(getConf());
client.start();
- List reports = client.getApplications();
- for (ApplicationReport report : reports) {
- LogAggregationStatus aggStatus = report.getLogAggregationStatus();
- if (aggStatus.equals(LogAggregationStatus.SUCCEEDED) ||
- aggStatus.equals(LogAggregationStatus.FAILED)) {
- eligibleApplications.add(report);
+ for (Iterator it = eligibleApplications.iterator();
+ it.hasNext();) {
+ AppInfo app = it.next();
+ try {
+ ApplicationReport report = client.getApplicationReport(
+ ConverterUtils.toApplicationId(app.getAppId()));
+ LogAggregationStatus aggStatus = report.getLogAggregationStatus();
+ if (aggStatus.equals(LogAggregationStatus.RUNNING) ||
+ aggStatus.equals(LogAggregationStatus.RUNNING_WITH_FAILURE) ||
+ aggStatus.equals(LogAggregationStatus.NOT_START) ||
+ aggStatus.equals(LogAggregationStatus.DISABLED) ||
+ aggStatus.equals(LogAggregationStatus.FAILED)) {
+ if (verbose) {
+ LOG.info("Skipping " + app.getAppId() +
+ " due to aggregation status being " + aggStatus);
+ }
+ it.remove();
+ } else {
+ if (verbose) {
+ LOG.info(app.getAppId() + " has aggregation status " + aggStatus);
+ }
+ app.setFinishTime(report.getFinishTime());
+ }
+ } catch (ApplicationNotFoundException e) {
+ // Assume the aggregation has finished
+ if (verbose) {
+ LOG.info(app.getAppId() + " not in the ResourceManager");
+ }
}
}
} finally {
@@ -254,33 +293,71 @@ void findAggregatedApps() throws IOException, YarnException {
}
@VisibleForTesting
- void checkFiles(FileSystem fs, Path remoteRootLogDir, String suffix) {
- for (Iterator reportIt = eligibleApplications.iterator();
- reportIt.hasNext(); ) {
- ApplicationReport report = reportIt.next();
- long totalFileSize = 0L;
+ void checkFilesAndSeedApps(FileSystem fs, Path remoteRootLogDir,
+ String suffix) throws IOException {
+ for (RemoteIterator userIt =
+ fs.listStatusIterator(remoteRootLogDir); userIt.hasNext();) {
+ Path userLogPath = userIt.next().getPath();
try {
- FileStatus[] files = fs.listStatus(
- LogAggregationUtils.getRemoteAppLogDir(remoteRootLogDir,
- report.getApplicationId(), report.getUser(), suffix));
- if (files.length < minNumLogFiles) {
- reportIt.remove();
- } else {
- for (FileStatus file : files) {
- if (file.getPath().getName().equals(report.getApplicationId()
- + ".har")) {
- reportIt.remove();
- break;
+ for (RemoteIterator appIt =
+ fs.listStatusIterator(new Path(userLogPath, suffix));
+ appIt.hasNext();) {
+ Path appLogPath = appIt.next().getPath();
+ try {
+ FileStatus[] files = fs.listStatus(appLogPath);
+ if (files.length >= minNumLogFiles) {
+ boolean eligible = true;
+ long totalFileSize = 0L;
+ for (FileStatus file : files) {
+ if (file.getPath().getName().equals(appLogPath.getName()
+ + ".har")) {
+ eligible = false;
+ if (verbose) {
+ LOG.info("Skipping " + appLogPath.getName() +
+ " due to existing .har file");
+ }
+ break;
+ }
+ totalFileSize += file.getLen();
+ if (totalFileSize > maxTotalLogsSize) {
+ eligible = false;
+ if (verbose) {
+ LOG.info("Skipping " + appLogPath.getName() + " due to " +
+ "total file size being too large (" + totalFileSize +
+ " > " + maxTotalLogsSize + ")");
+ }
+ break;
+ }
+ }
+ if (eligible) {
+ if (verbose) {
+ LOG.info("Adding " + appLogPath.getName() + " for user " +
+ userLogPath.getName());
+ }
+ eligibleApplications.add(
+ new AppInfo(appLogPath.getName(), userLogPath.getName()));
+ }
+ } else {
+ if (verbose) {
+ LOG.info("Skipping " + appLogPath.getName() + " due to not " +
+ "having enough log files (" + files.length + " < " +
+ minNumLogFiles + ")");
+ }
+ }
+ } catch (IOException ioe) {
+ // Ignore any apps we can't read
+ if (verbose) {
+ LOG.info("Skipping logs under " + appLogPath + " due to " +
+ ioe.getMessage());
}
- totalFileSize += file.getLen();
- }
- if (totalFileSize > maxTotalLogsSize) {
- reportIt.remove();
}
}
} catch (IOException ioe) {
- // If the user doesn't have permission or it doesn't exist, then skip it
- reportIt.remove();
+ // Ignore any apps we can't read
+ if (verbose) {
+ LOG.info("Skipping all logs under " + userLogPath + " due to " +
+ ioe.getMessage());
+ }
}
}
}
@@ -289,15 +366,26 @@ void checkFiles(FileSystem fs, Path remoteRootLogDir, String suffix) {
void checkMaxEligible() {
// If we have too many eligible apps, remove the newest ones first
if (maxEligible > 0 && eligibleApplications.size() > maxEligible) {
- List sortedApplications =
- new ArrayList(eligibleApplications);
- Collections.sort(sortedApplications, new Comparator() {
+ if (verbose) {
+ LOG.info("Too many applications (" + eligibleApplications.size() +
+ " > " + maxEligible + ")");
+ }
+ List sortedApplications =
+ new ArrayList(eligibleApplications);
+ Collections.sort(sortedApplications, new Comparator() {
@Override
- public int compare(ApplicationReport o1, ApplicationReport o2) {
- return Long.compare(o1.getFinishTime(), o2.getFinishTime());
+ public int compare(AppInfo o1, AppInfo o2) {
+ int lCompare = Long.compare(o1.getFinishTime(), o2.getFinishTime());
+ if (lCompare == 0) {
+ return o1.getAppId().compareTo(o2.getAppId());
+ }
+ return lCompare;
}
});
for (int i = maxEligible; i < sortedApplications.size(); i++) {
+ if (verbose) {
+ LOG.info("Removing " + sortedApplications.get(i));
+ }
eligibleApplications.remove(sortedApplications.get(i));
}
}
@@ -325,24 +413,26 @@ public int compare(ApplicationReport o1, ApplicationReport o2) {
@VisibleForTesting
void generateScript(File localScript, Path workingDir,
Path remoteRootLogDir, String suffix) throws IOException {
- LOG.info("Generating script at: " + localScript.getAbsolutePath());
+ if (verbose) {
+ LOG.info("Generating script at: " + localScript.getAbsolutePath());
+ }
String halrJarPath = HadoopArchiveLogsRunner.class.getProtectionDomain()
.getCodeSource().getLocation().getPath();
String harJarPath = HadoopArchives.class.getProtectionDomain()
.getCodeSource().getLocation().getPath();
String classpath = halrJarPath + File.pathSeparator + harJarPath;
- FileWriter fw = null;
+ FileWriterWithEncoding fw = null;
try {
- fw = new FileWriter(localScript);
+ fw = new FileWriterWithEncoding(localScript, "UTF-8");
fw.write("#!/bin/bash\nset -e\nset -x\n");
int containerCount = 1;
- for (ApplicationReport report : eligibleApplications) {
+ for (AppInfo app : eligibleApplications) {
fw.write("if [ \"$YARN_SHELL_ID\" == \"");
fw.write(Integer.toString(containerCount));
fw.write("\" ]; then\n\tappId=\"");
- fw.write(report.getApplicationId().toString());
+ fw.write(app.getAppId());
fw.write("\"\n\tuser=\"");
- fw.write(report.getUser());
+ fw.write(app.getUser());
fw.write("\"\nel");
containerCount++;
}
@@ -382,6 +472,10 @@ private boolean runDistributedShell(File localScript) throws Exception {
"--shell_script",
localScript.getAbsolutePath()
};
+ if (verbose) {
+ LOG.info("Running Distributed Shell with arguments: " +
+ Arrays.toString(dsArgs));
+ }
final Client dsClient = new Client(new Configuration(conf));
dsClient.init(dsArgs);
return dsClient.run();
@@ -400,4 +494,59 @@ public void setConf(Configuration conf) {
public Configuration getConf() {
return this.conf;
}
+
+ @VisibleForTesting
+ static class AppInfo {
+ private String appId;
+ private String user;
+ private long finishTime;
+
+ AppInfo(String appId, String user) {
+ this.appId = appId;
+ this.user = user;
+ this.finishTime = 0L;
+ }
+
+ public String getAppId() {
+ return appId;
+ }
+
+ public String getUser() {
+ return user;
+ }
+
+ public long getFinishTime() {
+ return finishTime;
+ }
+
+ public void setFinishTime(long finishTime) {
+ this.finishTime = finishTime;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ AppInfo appInfo = (AppInfo) o;
+
+ if (appId != null
+ ? !appId.equals(appInfo.appId) : appInfo.appId != null) {
+ return false;
+ }
+ return !(user != null
+ ? !user.equals(appInfo.user) : appInfo.user != null);
+ }
+
+ @Override
+ public int hashCode() {
+ int result = appId != null ? appId.hashCode() : 0;
+ result = 31 * result + (user != null ? user.hashCode() : 0);
+ return result;
+ }
+ }
}
diff --git a/hadoop-tools/hadoop-archive-logs/src/test/java/org/apache/hadoop/tools/TestHadoopArchiveLogs.java b/hadoop-tools/hadoop-archive-logs/src/test/java/org/apache/hadoop/tools/TestHadoopArchiveLogs.java
index c8ff201946d..7423f7926bd 100644
--- a/hadoop-tools/hadoop-archive-logs/src/test/java/org/apache/hadoop/tools/TestHadoopArchiveLogs.java
+++ b/hadoop-tools/hadoop-archive-logs/src/test/java/org/apache/hadoop/tools/TestHadoopArchiveLogs.java
@@ -23,15 +23,12 @@
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
-import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.MiniYARNCluster;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
@@ -47,6 +44,7 @@
public class TestHadoopArchiveLogs {
private static final long CLUSTER_TIMESTAMP = System.currentTimeMillis();
+ private static final String USER = System.getProperty("user.name");
private static final int FILE_SIZE_INCREMENT = 4096;
private static final byte[] DUMMY_DATA = new byte[FILE_SIZE_INCREMENT];
static {
@@ -54,96 +52,117 @@ public class TestHadoopArchiveLogs {
}
@Test(timeout = 10000)
- public void testCheckFiles() throws Exception {
+ public void testCheckFilesAndSeedApps() throws Exception {
Configuration conf = new Configuration();
HadoopArchiveLogs hal = new HadoopArchiveLogs(conf);
FileSystem fs = FileSystem.getLocal(conf);
Path rootLogDir = new Path("target", "logs");
String suffix = "logs";
- Path logDir = new Path(rootLogDir,
- new Path(System.getProperty("user.name"), suffix));
+ Path logDir = new Path(rootLogDir, new Path(USER, suffix));
fs.mkdirs(logDir);
- Assert.assertEquals(0, hal.eligibleApplications.size());
- ApplicationReport app1 = createAppReport(1); // no files found
- ApplicationReport app2 = createAppReport(2); // too few files
- Path app2Path = new Path(logDir, app2.getApplicationId().toString());
+ // no files found
+ ApplicationId appId1 = ApplicationId.newInstance(CLUSTER_TIMESTAMP, 1);
+ Path app1Path = new Path(logDir, appId1.toString());
+ fs.mkdirs(app1Path);
+ // too few files
+ ApplicationId appId2 = ApplicationId.newInstance(CLUSTER_TIMESTAMP, 2);
+ Path app2Path = new Path(logDir, appId2.toString());
fs.mkdirs(app2Path);
createFile(fs, new Path(app2Path, "file1"), 1);
hal.minNumLogFiles = 2;
- ApplicationReport app3 = createAppReport(3); // too large
- Path app3Path = new Path(logDir, app3.getApplicationId().toString());
+ // too large
+ ApplicationId appId3 = ApplicationId.newInstance(CLUSTER_TIMESTAMP, 3);
+ Path app3Path = new Path(logDir, appId3.toString());
fs.mkdirs(app3Path);
createFile(fs, new Path(app3Path, "file1"), 2);
createFile(fs, new Path(app3Path, "file2"), 5);
hal.maxTotalLogsSize = FILE_SIZE_INCREMENT * 6;
- ApplicationReport app4 = createAppReport(4); // has har already
- Path app4Path = new Path(logDir, app4.getApplicationId().toString());
+ // has har already
+ ApplicationId appId4 = ApplicationId.newInstance(CLUSTER_TIMESTAMP, 4);
+ Path app4Path = new Path(logDir, appId4.toString());
fs.mkdirs(app4Path);
- createFile(fs, new Path(app4Path, app4.getApplicationId() + ".har"), 1);
- ApplicationReport app5 = createAppReport(5); // just right
- Path app5Path = new Path(logDir, app5.getApplicationId().toString());
+ createFile(fs, new Path(app4Path, appId4 + ".har"), 1);
+ // just right
+ ApplicationId appId5 = ApplicationId.newInstance(CLUSTER_TIMESTAMP, 5);
+ Path app5Path = new Path(logDir, appId5.toString());
fs.mkdirs(app5Path);
createFile(fs, new Path(app5Path, "file1"), 2);
createFile(fs, new Path(app5Path, "file2"), 3);
- hal.eligibleApplications.add(app1);
- hal.eligibleApplications.add(app2);
- hal.eligibleApplications.add(app3);
- hal.eligibleApplications.add(app4);
- hal.eligibleApplications.add(app5);
- hal.checkFiles(fs, rootLogDir, suffix);
+ Assert.assertEquals(0, hal.eligibleApplications.size());
+ hal.checkFilesAndSeedApps(fs, rootLogDir, suffix);
Assert.assertEquals(1, hal.eligibleApplications.size());
- Assert.assertEquals(app5, hal.eligibleApplications.iterator().next());
+ Assert.assertEquals(appId5.toString(),
+ hal.eligibleApplications.iterator().next().getAppId());
}
@Test(timeout = 10000)
public void testCheckMaxEligible() throws Exception {
Configuration conf = new Configuration();
- HadoopArchiveLogs hal = new HadoopArchiveLogs(conf);
- ApplicationReport app1 = createAppReport(1);
+ HadoopArchiveLogs.AppInfo app1 = new HadoopArchiveLogs.AppInfo(
+ ApplicationId.newInstance(CLUSTER_TIMESTAMP, 1).toString(), USER);
app1.setFinishTime(CLUSTER_TIMESTAMP - 5);
- ApplicationReport app2 = createAppReport(2);
+ HadoopArchiveLogs.AppInfo app2 = new HadoopArchiveLogs.AppInfo(
+ ApplicationId.newInstance(CLUSTER_TIMESTAMP, 2).toString(), USER);
app2.setFinishTime(CLUSTER_TIMESTAMP - 10);
- ApplicationReport app3 = createAppReport(3);
- app3.setFinishTime(CLUSTER_TIMESTAMP + 5);
- ApplicationReport app4 = createAppReport(4);
- app4.setFinishTime(CLUSTER_TIMESTAMP + 10);
- ApplicationReport app5 = createAppReport(5);
- app5.setFinishTime(CLUSTER_TIMESTAMP);
+ HadoopArchiveLogs.AppInfo app3 = new HadoopArchiveLogs.AppInfo(
+ ApplicationId.newInstance(CLUSTER_TIMESTAMP, 3).toString(), USER);
+ // app3 has no finish time set
+ HadoopArchiveLogs.AppInfo app4 = new HadoopArchiveLogs.AppInfo(
+ ApplicationId.newInstance(CLUSTER_TIMESTAMP, 4).toString(), USER);
+ app4.setFinishTime(CLUSTER_TIMESTAMP + 5);
+ HadoopArchiveLogs.AppInfo app5 = new HadoopArchiveLogs.AppInfo(
+ ApplicationId.newInstance(CLUSTER_TIMESTAMP, 5).toString(), USER);
+ app5.setFinishTime(CLUSTER_TIMESTAMP + 10);
+ HadoopArchiveLogs.AppInfo app6 = new HadoopArchiveLogs.AppInfo(
+ ApplicationId.newInstance(CLUSTER_TIMESTAMP, 6).toString(), USER);
+ // app6 has no finish time set
+ HadoopArchiveLogs.AppInfo app7 = new HadoopArchiveLogs.AppInfo(
+ ApplicationId.newInstance(CLUSTER_TIMESTAMP, 7).toString(), USER);
+ app7.setFinishTime(CLUSTER_TIMESTAMP);
+ HadoopArchiveLogs hal = new HadoopArchiveLogs(conf);
Assert.assertEquals(0, hal.eligibleApplications.size());
hal.eligibleApplications.add(app1);
hal.eligibleApplications.add(app2);
hal.eligibleApplications.add(app3);
hal.eligibleApplications.add(app4);
hal.eligibleApplications.add(app5);
+ hal.eligibleApplications.add(app6);
+ hal.eligibleApplications.add(app7);
+ Assert.assertEquals(7, hal.eligibleApplications.size());
hal.maxEligible = -1;
hal.checkMaxEligible();
+ Assert.assertEquals(7, hal.eligibleApplications.size());
+ hal.maxEligible = 6;
+ hal.checkMaxEligible();
+ Assert.assertEquals(6, hal.eligibleApplications.size());
+ Assert.assertFalse(hal.eligibleApplications.contains(app5));
+ hal.maxEligible = 5;
+ hal.checkMaxEligible();
Assert.assertEquals(5, hal.eligibleApplications.size());
-
+ Assert.assertFalse(hal.eligibleApplications.contains(app4));
hal.maxEligible = 4;
hal.checkMaxEligible();
Assert.assertEquals(4, hal.eligibleApplications.size());
- Assert.assertFalse(hal.eligibleApplications.contains(app4));
-
+ Assert.assertFalse(hal.eligibleApplications.contains(app7));
hal.maxEligible = 3;
hal.checkMaxEligible();
Assert.assertEquals(3, hal.eligibleApplications.size());
- Assert.assertFalse(hal.eligibleApplications.contains(app3));
-
+ Assert.assertFalse(hal.eligibleApplications.contains(app1));
hal.maxEligible = 2;
hal.checkMaxEligible();
Assert.assertEquals(2, hal.eligibleApplications.size());
- Assert.assertFalse(hal.eligibleApplications.contains(app5));
-
+ Assert.assertFalse(hal.eligibleApplications.contains(app2));
hal.maxEligible = 1;
hal.checkMaxEligible();
Assert.assertEquals(1, hal.eligibleApplications.size());
- Assert.assertFalse(hal.eligibleApplications.contains(app1));
+ Assert.assertFalse(hal.eligibleApplications.contains(app6));
+ Assert.assertTrue(hal.eligibleApplications.contains(app3));
}
@Test(timeout = 10000)
- public void testFindAggregatedApps() throws Exception {
+ public void testFilterAppsByAggregatedStatus() throws Exception {
MiniYARNCluster yarnCluster = null;
try {
Configuration conf = new Configuration();
@@ -156,32 +175,66 @@ public void testFindAggregatedApps() throws Exception {
conf = yarnCluster.getConfig();
RMContext rmContext = yarnCluster.getResourceManager().getRMContext();
- RMAppImpl app1 = (RMAppImpl)createRMApp(1, conf, rmContext,
+ RMAppImpl appImpl1 = (RMAppImpl)createRMApp(1, conf, rmContext,
LogAggregationStatus.DISABLED);
- RMAppImpl app2 = (RMAppImpl)createRMApp(2, conf, rmContext,
+ RMAppImpl appImpl2 = (RMAppImpl)createRMApp(2, conf, rmContext,
LogAggregationStatus.FAILED);
- RMAppImpl app3 = (RMAppImpl)createRMApp(3, conf, rmContext,
+ RMAppImpl appImpl3 = (RMAppImpl)createRMApp(3, conf, rmContext,
LogAggregationStatus.NOT_START);
- RMAppImpl app4 = (RMAppImpl)createRMApp(4, conf, rmContext,
+ RMAppImpl appImpl4 = (RMAppImpl)createRMApp(4, conf, rmContext,
LogAggregationStatus.SUCCEEDED);
- RMAppImpl app5 = (RMAppImpl)createRMApp(5, conf, rmContext,
+ RMAppImpl appImpl5 = (RMAppImpl)createRMApp(5, conf, rmContext,
LogAggregationStatus.RUNNING);
- RMAppImpl app6 = (RMAppImpl)createRMApp(6, conf, rmContext,
+ RMAppImpl appImpl6 = (RMAppImpl)createRMApp(6, conf, rmContext,
LogAggregationStatus.RUNNING_WITH_FAILURE);
- RMAppImpl app7 = (RMAppImpl)createRMApp(7, conf, rmContext,
+ RMAppImpl appImpl7 = (RMAppImpl)createRMApp(7, conf, rmContext,
LogAggregationStatus.TIME_OUT);
- rmContext.getRMApps().put(app1.getApplicationId(), app1);
- rmContext.getRMApps().put(app2.getApplicationId(), app2);
- rmContext.getRMApps().put(app3.getApplicationId(), app3);
- rmContext.getRMApps().put(app4.getApplicationId(), app4);
- rmContext.getRMApps().put(app5.getApplicationId(), app5);
- rmContext.getRMApps().put(app6.getApplicationId(), app6);
- rmContext.getRMApps().put(app7.getApplicationId(), app7);
+ RMAppImpl appImpl8 = (RMAppImpl)createRMApp(8, conf, rmContext,
+ LogAggregationStatus.SUCCEEDED);
+ rmContext.getRMApps().put(appImpl1.getApplicationId(), appImpl1);
+ rmContext.getRMApps().put(appImpl2.getApplicationId(), appImpl2);
+ rmContext.getRMApps().put(appImpl3.getApplicationId(), appImpl3);
+ rmContext.getRMApps().put(appImpl4.getApplicationId(), appImpl4);
+ rmContext.getRMApps().put(appImpl5.getApplicationId(), appImpl5);
+ rmContext.getRMApps().put(appImpl6.getApplicationId(), appImpl6);
+ rmContext.getRMApps().put(appImpl7.getApplicationId(), appImpl7);
+ // appImpl8 is not in the RM
HadoopArchiveLogs hal = new HadoopArchiveLogs(conf);
Assert.assertEquals(0, hal.eligibleApplications.size());
- hal.findAggregatedApps();
- Assert.assertEquals(2, hal.eligibleApplications.size());
+ hal.eligibleApplications.add(
+ new HadoopArchiveLogs.AppInfo(appImpl1.getApplicationId().toString(),
+ USER));
+ hal.eligibleApplications.add(
+ new HadoopArchiveLogs.AppInfo(appImpl2.getApplicationId().toString(),
+ USER));
+ hal.eligibleApplications.add(
+ new HadoopArchiveLogs.AppInfo(appImpl3.getApplicationId().toString(),
+ USER));
+ HadoopArchiveLogs.AppInfo app4 =
+ new HadoopArchiveLogs.AppInfo(appImpl4.getApplicationId().toString(),
+ USER);
+ hal.eligibleApplications.add(app4);
+ hal.eligibleApplications.add(
+ new HadoopArchiveLogs.AppInfo(appImpl5.getApplicationId().toString(),
+ USER));
+ hal.eligibleApplications.add(
+ new HadoopArchiveLogs.AppInfo(appImpl6.getApplicationId().toString(),
+ USER));
+ HadoopArchiveLogs.AppInfo app7 =
+ new HadoopArchiveLogs.AppInfo(appImpl7.getApplicationId().toString(),
+ USER);
+ hal.eligibleApplications.add(app7);
+ HadoopArchiveLogs.AppInfo app8 =
+ new HadoopArchiveLogs.AppInfo(appImpl8.getApplicationId().toString(),
+ USER);
+ hal.eligibleApplications.add(app8);
+ Assert.assertEquals(8, hal.eligibleApplications.size());
+ hal.filterAppsByAggregatedStatus();
+ Assert.assertEquals(3, hal.eligibleApplications.size());
+ Assert.assertTrue(hal.eligibleApplications.contains(app4));
+ Assert.assertTrue(hal.eligibleApplications.contains(app7));
+ Assert.assertTrue(hal.eligibleApplications.contains(app8));
} finally {
if (yarnCluster != null) {
yarnCluster.stop();
@@ -193,10 +246,12 @@ public void testFindAggregatedApps() throws Exception {
public void testGenerateScript() throws Exception {
Configuration conf = new Configuration();
HadoopArchiveLogs hal = new HadoopArchiveLogs(conf);
- ApplicationReport app1 = createAppReport(1);
- ApplicationReport app2 = createAppReport(2);
- hal.eligibleApplications.add(app1);
- hal.eligibleApplications.add(app2);
+ ApplicationId app1 = ApplicationId.newInstance(CLUSTER_TIMESTAMP, 1);
+ ApplicationId app2 = ApplicationId.newInstance(CLUSTER_TIMESTAMP, 2);
+ hal.eligibleApplications.add(new HadoopArchiveLogs.AppInfo(app1.toString(),
+ USER));
+ hal.eligibleApplications.add(new HadoopArchiveLogs.AppInfo(app2.toString(),
+ USER));
File localScript = new File("target", "script.sh");
Path workingDir = new Path("/tmp", "working");
@@ -213,22 +268,16 @@ public void testGenerateScript() throws Exception {
Assert.assertEquals("set -e", lines[1]);
Assert.assertEquals("set -x", lines[2]);
Assert.assertEquals("if [ \"$YARN_SHELL_ID\" == \"1\" ]; then", lines[3]);
- if (lines[4].contains(app1.getApplicationId().toString())) {
- Assert.assertEquals("\tappId=\"" + app1.getApplicationId().toString()
- + "\"", lines[4]);
- Assert.assertEquals("\tappId=\"" + app2.getApplicationId().toString()
- + "\"", lines[7]);
+ if (lines[4].contains(app1.toString())) {
+ Assert.assertEquals("\tappId=\"" + app1.toString() + "\"", lines[4]);
+ Assert.assertEquals("\tappId=\"" + app2.toString() + "\"", lines[7]);
} else {
- Assert.assertEquals("\tappId=\"" + app2.getApplicationId().toString()
- + "\"", lines[4]);
- Assert.assertEquals("\tappId=\"" + app1.getApplicationId().toString()
- + "\"", lines[7]);
+ Assert.assertEquals("\tappId=\"" + app2.toString() + "\"", lines[4]);
+ Assert.assertEquals("\tappId=\"" + app1.toString() + "\"", lines[7]);
}
- Assert.assertEquals("\tuser=\"" + System.getProperty("user.name") + "\"",
- lines[5]);
+ Assert.assertEquals("\tuser=\"" + USER + "\"", lines[5]);
Assert.assertEquals("elif [ \"$YARN_SHELL_ID\" == \"2\" ]; then", lines[6]);
- Assert.assertEquals("\tuser=\"" + System.getProperty("user.name") + "\"",
- lines[8]);
+ Assert.assertEquals("\tuser=\"" + USER + "\"", lines[8]);
Assert.assertEquals("else", lines[9]);
Assert.assertEquals("\techo \"Unknown Mapping!\"", lines[10]);
Assert.assertEquals("\texit 1", lines[11]);
@@ -241,15 +290,23 @@ public void testGenerateScript() throws Exception {
remoteRootLogDir.toString() + " -suffix " + suffix, lines[15]);
}
- private static ApplicationReport createAppReport(int id) {
- ApplicationId appId = ApplicationId.newInstance(CLUSTER_TIMESTAMP, id);
- return ApplicationReport.newInstance(
- appId,
- ApplicationAttemptId.newInstance(appId, 1),
- System.getProperty("user.name"),
- null, null, null, 0, null, YarnApplicationState.FINISHED, null,
- null, 0L, 0L, FinalApplicationStatus.SUCCEEDED, null, null, 100f,
- null, null);
+ /**
+ * If this test fails, then a new Log Aggregation Status was added. Make
+ * sure that {@link HadoopArchiveLogs#filterAppsByAggregatedStatus()} and this test
+ * are updated as well, if necessary.
+ * @throws Exception
+ */
+ @Test(timeout = 5000)
+ public void testStatuses() throws Exception {
+ LogAggregationStatus[] statuses = new LogAggregationStatus[7];
+ statuses[0] = LogAggregationStatus.DISABLED;
+ statuses[1] = LogAggregationStatus.NOT_START;
+ statuses[2] = LogAggregationStatus.RUNNING;
+ statuses[3] = LogAggregationStatus.RUNNING_WITH_FAILURE;
+ statuses[4] = LogAggregationStatus.SUCCEEDED;
+ statuses[5] = LogAggregationStatus.FAILED;
+ statuses[6] = LogAggregationStatus.TIME_OUT;
+ Assert.assertArrayEquals(statuses, LogAggregationStatus.values());
}
private static void createFile(FileSystem fs, Path p, long sizeMultiple)
@@ -265,6 +322,7 @@ private static void createFile(FileSystem fs, Path p, long sizeMultiple)
out.close();
}
}
+ Assert.assertTrue(fs.exists(p));
}
private static RMApp createRMApp(int id, Configuration conf, RMContext rmContext,
@@ -272,11 +330,10 @@ private static RMApp createRMApp(int id, Configuration conf, RMContext rmContext
ApplicationId appId = ApplicationId.newInstance(CLUSTER_TIMESTAMP, id);
ApplicationSubmissionContext submissionContext =
ApplicationSubmissionContext.newInstance(appId, "test", "default",
- Priority.newInstance(0), null, false, true,
+ Priority.newInstance(0), null, true, true,
2, Resource.newInstance(10, 2), "test");
return new RMAppImpl(appId, rmContext, conf, "test",
- System.getProperty("user.name"), "default", submissionContext,
- rmContext.getScheduler(),
+ USER, "default", submissionContext, rmContext.getScheduler(),
rmContext.getApplicationMasterService(),
System.currentTimeMillis(), "test",
null, null) {