diff --git a/MAPREDUCE-6415_branch-2.003.patch b/MAPREDUCE-6415_branch-2.003.patch
new file mode 100644
index 00000000000..e10be18eccf
--- /dev/null
+++ b/MAPREDUCE-6415_branch-2.003.patch
@@ -0,0 +1,1305 @@
+diff --git hadoop-assemblies/src/main/resources/assemblies/hadoop-tools.xml hadoop-assemblies/src/main/resources/assemblies/hadoop-tools.xml
+index fa55703..3f646e6 100644
+--- hadoop-assemblies/src/main/resources/assemblies/hadoop-tools.xml
++++ hadoop-assemblies/src/main/resources/assemblies/hadoop-tools.xml
+@@ -52,6 +52,13 @@
+       </includes>
+     </fileSet>
+     <fileSet>
++      <directory>../hadoop-archive-logs/target</directory>
++      <outputDirectory>/share/hadoop/${hadoop.component}/sources</outputDirectory>
++      <includes>
++        <include>*-sources.jar</include>
++      </includes>
++    </fileSet>
++    <fileSet>
+       <directory>../hadoop-datajoin/target</directory>
+       <outputDirectory>/share/hadoop/${hadoop.component}/sources</outputDirectory>
+       <includes>
+diff --git hadoop-mapreduce-project/bin/mapred hadoop-mapreduce-project/bin/mapred
+index 277dc96..6f3bc04 100755
+--- hadoop-mapreduce-project/bin/mapred
++++ hadoop-mapreduce-project/bin/mapred
+@@ -39,6 +39,7 @@ function print_usage(){
+ echo " historyserver run job history servers as a standalone daemon"
+ echo " distcp copy file or directories recursively"
+ echo " archive -archiveName NAME -p * create a hadoop archive"
++ echo " archive-logs combine aggregated logs into hadoop archives"
+ echo " hsadmin job history server admin interface"
+ echo ""
+ echo "Most commands print help when invoked w/o parameters."
+@@ -96,6 +97,10 @@ elif [ "$COMMAND" = "archive" ] ; then
+ CLASS=org.apache.hadoop.tools.HadoopArchives
+ CLASSPATH=${CLASSPATH}:${TOOL_PATH}
+ HADOOP_OPTS="$HADOOP_OPTS $HADOOP_CLIENT_OPTS"
++elif [ "$COMMAND" = "archive-logs" ] ; then
++ CLASS=org.apache.hadoop.tools.HadoopArchiveLogs
++ CLASSPATH=${CLASSPATH}:${TOOL_PATH}
++ HADOOP_OPTS="$HADOOP_OPTS $HADOOP_CLIENT_OPTS"
+ elif [ "$COMMAND" = "hsadmin" ] ; then
+ CLASS=org.apache.hadoop.mapreduce.v2.hs.client.HSAdmin
+ HADOOP_OPTS="$HADOOP_OPTS $HADOOP_CLIENT_OPTS"
+diff --git hadoop-project/pom.xml hadoop-project/pom.xml
+index 1a01d72..f7b90ce 100644
+--- hadoop-project/pom.xml
++++ hadoop-project/pom.xml
+@@ -324,6 +324,11 @@
+       </dependency>
+       <dependency>
+         <groupId>org.apache.hadoop</groupId>
++        <artifactId>hadoop-archive-logs</artifactId>
++        <version>${project.version}</version>
++      </dependency>
++      <dependency>
++        <groupId>org.apache.hadoop</groupId>
+         <artifactId>hadoop-distcp</artifactId>
+         <version>${project.version}</version>
+       </dependency>
+diff --git hadoop-tools/hadoop-archive-logs/pom.xml hadoop-tools/hadoop-archive-logs/pom.xml
+new file mode 100644
+index 0000000..4b53b78
+--- /dev/null
++++ hadoop-tools/hadoop-archive-logs/pom.xml
+@@ -0,0 +1,171 @@
++
++
++
++ 4.0.0
++
++ org.apache.hadoop
++ hadoop-project
++ 2.8.0-SNAPSHOT
++ ../../hadoop-project
++
++ org.apache.hadoop
++ hadoop-archive-logs
++ 2.8.0-SNAPSHOT
++ Apache Hadoop Archive Logs
++ Apache Hadoop Archive Logs
++ jar
++
++
++ ${project.build.directory}/log
++
++
++
++
++ junit
++ junit
++ test
++
++
++ org.apache.hadoop
++ hadoop-mapreduce-client-core
++ provided
++
++
++ org.apache.hadoop
++ hadoop-yarn-applications-distributedshell
++ provided
++
++
++ org.apache.hadoop
++ hadoop-common
++ provided
++
++
++ org.apache.hadoop
++ hadoop-hdfs
++ test
++ test-jar
++
++
++ org.apache.hadoop
++ hadoop-yarn-server-tests
++ test-jar
++ test
++
++
++ org.apache.hadoop
++ hadoop-archives
++
++
++ org.apache.hadoop
++ hadoop-yarn-common
++ provided
++
++
++ org.apache.hadoop
++ hadoop-yarn-api
++ provided
++
++
++ com.google.guava
++ guava
++ provided
++
++
++ commons-io
++ commons-io
++ provided
++
++
++ commons-logging
++ commons-logging
++ provided
++
++
++ commons-cli
++ commons-cli
++ provided
++
++
++ org.apache.hadoop
++ hadoop-yarn-client
++ provided
++
++
++ org.apache.hadoop
++ hadoop-yarn-server-resourcemanager
++ provided
++
++
++
++ org.apache.hadoop
++ hadoop-hdfs
++ test
++
++
++
++ org.apache.hadoop
++ hadoop-common
++ test
++ test-jar
++
++
++
++ org.apache.hadoop
++ hadoop-mapreduce-client-jobclient
++ test
++ test-jar
++
++
++
++
++
++
++ org.apache.maven.plugins
++ maven-antrun-plugin
++
++
++ create-log-dir
++ process-test-resources
++
++ run
++
++
++
++
++
++
++
++
++
++
++
++
++ org.apache.maven.plugins
++ maven-jar-plugin
++
++
++
++ org.apache.hadoop.tools.HadoopArchiveLogs
++
++
++
++
++
++
++
+diff --git hadoop-tools/hadoop-archive-logs/src/main/java/org/apache/hadoop/tools/HadoopArchiveLogs.java hadoop-tools/hadoop-archive-logs/src/main/java/org/apache/hadoop/tools/HadoopArchiveLogs.java
+new file mode 100644
+index 0000000..4778dcb
+--- /dev/null
++++ hadoop-tools/hadoop-archive-logs/src/main/java/org/apache/hadoop/tools/HadoopArchiveLogs.java
+@@ -0,0 +1,403 @@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
++package org.apache.hadoop.tools;
++
++import com.google.common.annotations.VisibleForTesting;
++import org.apache.commons.cli.CommandLine;
++import org.apache.commons.cli.CommandLineParser;
++import org.apache.commons.cli.GnuParser;
++import org.apache.commons.cli.HelpFormatter;
++import org.apache.commons.cli.Option;
++import org.apache.commons.cli.Options;
++import org.apache.commons.cli.ParseException;
++import org.apache.commons.logging.Log;
++import org.apache.commons.logging.LogFactory;
++import org.apache.hadoop.conf.Configuration;
++import org.apache.hadoop.fs.FileStatus;
++import org.apache.hadoop.fs.FileSystem;
++import org.apache.hadoop.fs.Path;
++import org.apache.hadoop.fs.permission.FsAction;
++import org.apache.hadoop.fs.permission.FsPermission;
++import org.apache.hadoop.mapred.JobConf;
++import org.apache.hadoop.util.Tool;
++import org.apache.hadoop.util.ToolRunner;
++import org.apache.hadoop.yarn.api.records.ApplicationReport;
++import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
++import org.apache.hadoop.yarn.applications.distributedshell.ApplicationMaster;
++import org.apache.hadoop.yarn.applications.distributedshell.Client;
++import org.apache.hadoop.yarn.client.api.YarnClient;
++import org.apache.hadoop.yarn.conf.YarnConfiguration;
++import org.apache.hadoop.yarn.exceptions.YarnException;
++import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
++
++import java.io.File;
++import java.io.FileWriter;
++import java.io.IOException;
++import java.util.ArrayList;
++import java.util.Collections;
++import java.util.Comparator;
++import java.util.HashSet;
++import java.util.Iterator;
++import java.util.List;
++import java.util.Set;
++
++
++/**
++ * This tool moves Aggregated Log files into HAR archives using the
++ * {@link HadoopArchives} tool and the Distributed Shell via the
++ * {@link HadoopArchiveLogsRunner}.
++ */
++public class HadoopArchiveLogs implements Tool {
++ private static final Log LOG = LogFactory.getLog(HadoopArchiveLogs.class);
++
++ private static final String HELP_OPTION = "help";
++ private static final String MAX_ELIGIBLE_APPS_OPTION = "maxEligibleApps";
++ private static final String MIN_NUM_LOG_FILES_OPTION = "minNumberLogFiles";
++ private static final String MAX_TOTAL_LOGS_SIZE_OPTION = "maxTotalLogsSize";
++ private static final String MEMORY_OPTION = "memory";
++
++ private static final int DEFAULT_MAX_ELIGIBLE = -1;
++ private static final int DEFAULT_MIN_NUM_LOG_FILES = 20;
++ private static final long DEFAULT_MAX_TOTAL_LOGS_SIZE = 1024L;
++ private static final long DEFAULT_MEMORY = 1024L;
++
++ @VisibleForTesting
++ int maxEligible = DEFAULT_MAX_ELIGIBLE;
++ @VisibleForTesting
++ int minNumLogFiles = DEFAULT_MIN_NUM_LOG_FILES;
++ @VisibleForTesting
++ long maxTotalLogsSize = DEFAULT_MAX_TOTAL_LOGS_SIZE * 1024L * 1024L;
++ @VisibleForTesting
++ long memory = DEFAULT_MEMORY;
++
++ @VisibleForTesting
++ Set<ApplicationReport> eligibleApplications;
++
++ private JobConf conf;
++
++ public HadoopArchiveLogs(Configuration conf) {
++ setConf(conf);
++ eligibleApplications = new HashSet<>();
++ }
++
++ public static void main(String[] args) {
++ JobConf job = new JobConf(HadoopArchiveLogs.class);
++
++ HadoopArchiveLogs hal = new HadoopArchiveLogs(job);
++ int ret = 0;
++
++ try{
++ ret = ToolRunner.run(hal, args);
++ } catch(Exception e) {
++ LOG.debug("Exception", e);
++ System.err.println(e.getClass().getSimpleName());
++ final String s = e.getLocalizedMessage();
++ if (s != null) {
++ System.err.println(s);
++ } else {
++ e.printStackTrace(System.err);
++ }
++ System.exit(1);
++ }
++ System.exit(ret);
++ }
++
++ @Override
++ public int run(String[] args) throws Exception {
++ handleOpts(args);
++
++ findAggregatedApps();
++
++ FileSystem fs = null;
++ Path remoteRootLogDir = new Path(conf.get(
++ YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
++ YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
++ String suffix = LogAggregationUtils.getRemoteNodeLogDirSuffix(conf);
++ Path workingDir = new Path(remoteRootLogDir, "archive-logs-work");
++ try {
++ fs = FileSystem.get(conf);
++ checkFiles(fs, remoteRootLogDir, suffix);
++
++ // Prepare working directory
++ if (fs.exists(workingDir)) {
++ fs.delete(workingDir, true);
++ }
++ fs.mkdirs(workingDir);
++ fs.setPermission(workingDir,
++ new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE));
++ } finally {
++ if (fs != null) {
++ fs.close();
++ }
++ }
++
++ checkMaxEligible();
++
++ if (eligibleApplications.isEmpty()) {
++ LOG.info("No eligible applications to process");
++ System.exit(0);
++ }
++
++ StringBuilder sb =
++ new StringBuilder("Will process the following applications:");
++ for (ApplicationReport report : eligibleApplications) {
++ sb.append("\n\t").append(report.getApplicationId());
++ }
++ LOG.info(sb.toString());
++
++ File localScript = File.createTempFile("hadoop-archive-logs-", ".sh");
++ generateScript(localScript, workingDir, remoteRootLogDir, suffix);
++
++ if (runDistributedShell(localScript)) {
++ return 0;
++ }
++ return -1;
++ }
++
++ private void handleOpts(String[] args) throws ParseException {
++ Options opts = new Options();
++ Option helpOpt = new Option(HELP_OPTION, false, "Prints this message");
++ Option maxEligibleOpt = new Option(MAX_ELIGIBLE_APPS_OPTION, true,
++ "The maximum number of eligible apps to process (default: "
++ + DEFAULT_MAX_ELIGIBLE + " (all))");
++ maxEligibleOpt.setArgName("n");
++ Option minNumLogFilesOpt = new Option(MIN_NUM_LOG_FILES_OPTION, true,
++ "The minimum number of log files required to be eligible (default: "
++ + DEFAULT_MIN_NUM_LOG_FILES + ")");
++ minNumLogFilesOpt.setArgName("n");
++ Option maxTotalLogsSizeOpt = new Option(MAX_TOTAL_LOGS_SIZE_OPTION, true,
++ "The maximum total logs size (in megabytes) required to be eligible" +
++ " (default: " + DEFAULT_MAX_TOTAL_LOGS_SIZE + ")");
++ maxTotalLogsSizeOpt.setArgName("megabytes");
++ Option memoryOpt = new Option(MEMORY_OPTION, true,
++ "The amount of memory (in megabytes) for each container (default: "
++ + DEFAULT_MEMORY + ")");
++ memoryOpt.setArgName("megabytes");
++ opts.addOption(helpOpt);
++ opts.addOption(maxEligibleOpt);
++ opts.addOption(minNumLogFilesOpt);
++ opts.addOption(maxTotalLogsSizeOpt);
++ opts.addOption(memoryOpt);
++
++ try {
++ CommandLineParser parser = new GnuParser();
++ CommandLine commandLine = parser.parse(opts, args);
++ if (commandLine.hasOption(HELP_OPTION)) {
++ HelpFormatter formatter = new HelpFormatter();
++ formatter.printHelp("yarn archive-logs", opts);
++ System.exit(0);
++ }
++ if (commandLine.hasOption(MAX_ELIGIBLE_APPS_OPTION)) {
++ maxEligible = Integer.parseInt(
++ commandLine.getOptionValue(MAX_ELIGIBLE_APPS_OPTION));
++ if (maxEligible == 0) {
++ LOG.info("Setting " + MAX_ELIGIBLE_APPS_OPTION + " to 0 accomplishes "
++ + "nothing. Please either set it to a negative value "
++ + "(default, all) or a more reasonable value.");
++ System.exit(0);
++ }
++ }
++ if (commandLine.hasOption(MIN_NUM_LOG_FILES_OPTION)) {
++ minNumLogFiles = Integer.parseInt(
++ commandLine.getOptionValue(MIN_NUM_LOG_FILES_OPTION));
++ }
++ if (commandLine.hasOption(MAX_TOTAL_LOGS_SIZE_OPTION)) {
++ maxTotalLogsSize = Long.parseLong(
++ commandLine.getOptionValue(MAX_TOTAL_LOGS_SIZE_OPTION));
++ maxTotalLogsSize *= 1024L * 1024L;
++ }
++ if (commandLine.hasOption(MEMORY_OPTION)) {
++ memory = Long.parseLong(commandLine.getOptionValue(MEMORY_OPTION));
++ }
++ } catch (ParseException pe) {
++ HelpFormatter formatter = new HelpFormatter();
++ formatter.printHelp("yarn archive-logs", opts);
++ throw pe;
++ }
++ }
++
++ @VisibleForTesting
++ void findAggregatedApps() throws IOException, YarnException {
++ YarnClient client = YarnClient.createYarnClient();
++ try {
++ client.init(getConf());
++ client.start();
++ List<ApplicationReport> reports = client.getApplications();
++ for (ApplicationReport report : reports) {
++ LogAggregationStatus aggStatus = report.getLogAggregationStatus();
++ if (aggStatus.equals(LogAggregationStatus.SUCCEEDED) ||
++ aggStatus.equals(LogAggregationStatus.FAILED)) {
++ eligibleApplications.add(report);
++ }
++ }
++ } finally {
++ if (client != null) {
++ client.stop();
++ }
++ }
++ }
++
++ @VisibleForTesting
++ void checkFiles(FileSystem fs, Path remoteRootLogDir, String suffix) {
++ for (Iterator<ApplicationReport> reportIt = eligibleApplications.iterator();
++ reportIt.hasNext(); ) {
++ ApplicationReport report = reportIt.next();
++ long totalFileSize = 0L;
++ try {
++ FileStatus[] files = fs.listStatus(
++ LogAggregationUtils.getRemoteAppLogDir(remoteRootLogDir,
++ report.getApplicationId(), report.getUser(), suffix));
++ if (files.length < minNumLogFiles) {
++ reportIt.remove();
++ } else {
++ for (FileStatus file : files) {
++ if (file.getPath().getName().equals(report.getApplicationId()
++ + ".har")) {
++ reportIt.remove();
++ break;
++ }
++ totalFileSize += file.getLen();
++ }
++ if (totalFileSize > maxTotalLogsSize) {
++ reportIt.remove();
++ }
++ }
++ } catch (IOException ioe) {
++ // If the user doesn't have permission or it doesn't exist, then skip it
++ reportIt.remove();
++ }
++ }
++ }
++
++ @VisibleForTesting
++ void checkMaxEligible() {
++ // If we have too many eligible apps, remove the newest ones first
++ if (maxEligible > 0 && eligibleApplications.size() > maxEligible) {
++ List<ApplicationReport> sortedApplications =
++ new ArrayList<ApplicationReport>(eligibleApplications);
++ Collections.sort(sortedApplications, new Comparator<ApplicationReport>() {
++ @Override
++ public int compare(ApplicationReport o1, ApplicationReport o2) {
++ return Long.compare(o1.getFinishTime(), o2.getFinishTime());
++ }
++ });
++ for (int i = maxEligible; i < sortedApplications.size(); i++) {
++ eligibleApplications.remove(sortedApplications.get(i));
++ }
++ }
++ }
++
++ /*
++ The generated script looks like this:
++ #!/bin/bash
++ set -e
++ set -x
++ if [ "$YARN_SHELL_ID" == "1" ]; then
++ appId="application_1440448768987_0001"
++ user="rkanter"
++ elif [ "$YARN_SHELL_ID" == "2" ]; then
++ appId="application_1440448768987_0002"
++ user="rkanter"
++ else
++ echo "Unknown Mapping!"
++ exit 1
++ fi
++ export HADOOP_CLIENT_OPTS="-Xmx1024m"
++ export HADOOP_CLASSPATH=/dist/share/hadoop/tools/lib/hadoop-archive-logs-2.8.0-SNAPSHOT.jar:/dist/share/hadoop/tools/lib/hadoop-archives-2.8.0-SNAPSHOT.jar
++ "$HADOOP_HOME"/bin/hadoop org.apache.hadoop.tools.HadoopArchiveLogsRunner -appId "$appId" -user "$user" -workingDir /tmp/logs/archive-logs-work -remoteRootLogDir /tmp/logs -suffix logs
++ */
++ @VisibleForTesting
++ void generateScript(File localScript, Path workingDir,
++ Path remoteRootLogDir, String suffix) throws IOException {
++ LOG.info("Generating script at: " + localScript.getAbsolutePath());
++ String halrJarPath = HadoopArchiveLogsRunner.class.getProtectionDomain()
++ .getCodeSource().getLocation().getPath();
++ String harJarPath = HadoopArchives.class.getProtectionDomain()
++ .getCodeSource().getLocation().getPath();
++ String classpath = halrJarPath + File.pathSeparator + harJarPath;
++ FileWriter fw = null;
++ try {
++ fw = new FileWriter(localScript);
++ fw.write("#!/bin/bash\nset -e\nset -x\n");
++ int containerCount = 1;
++ for (ApplicationReport report : eligibleApplications) {
++ fw.write("if [ \"$YARN_SHELL_ID\" == \"");
++ fw.write(Integer.toString(containerCount));
++ fw.write("\" ]; then\n\tappId=\"");
++ fw.write(report.getApplicationId().toString());
++ fw.write("\"\n\tuser=\"");
++ fw.write(report.getUser());
++ fw.write("\"\nel");
++ containerCount++;
++ }
++ fw.write("se\n\techo \"Unknown Mapping!\"\n\texit 1\nfi\n");
++ fw.write("export HADOOP_CLIENT_OPTS=\"-Xmx");
++ fw.write(Long.toString(memory));
++ fw.write("m\"\n");
++ fw.write("export HADOOP_CLASSPATH=");
++ fw.write(classpath);
++ fw.write("\n\"$HADOOP_HOME\"/bin/hadoop ");
++ fw.write(HadoopArchiveLogsRunner.class.getName());
++ fw.write(" -appId \"$appId\" -user \"$user\" -workingDir ");
++ fw.write(workingDir.toString());
++ fw.write(" -remoteRootLogDir ");
++ fw.write(remoteRootLogDir.toString());
++ fw.write(" -suffix ");
++ fw.write(suffix);
++ fw.write("\n");
++ } finally {
++ if (fw != null) {
++ fw.close();
++ }
++ }
++ }
++
++ private boolean runDistributedShell(File localScript) throws Exception {
++ String[] dsArgs = {
++ "--appname",
++ "ArchiveLogs",
++ "--jar",
++ ApplicationMaster.class.getProtectionDomain().getCodeSource()
++ .getLocation().getPath(),
++ "--num_containers",
++ Integer.toString(eligibleApplications.size()),
++ "--container_memory",
++ Long.toString(memory),
++ "--shell_script",
++ localScript.getAbsolutePath()
++ };
++ final Client dsClient = new Client(new Configuration(conf));
++ dsClient.init(dsArgs);
++ return dsClient.run();
++ }
++
++ @Override
++ public void setConf(Configuration conf) {
++ if (conf instanceof JobConf) {
++ this.conf = (JobConf) conf;
++ } else {
++ this.conf = new JobConf(conf, HadoopArchiveLogs.class);
++ }
++ }
++
++ @Override
++ public Configuration getConf() {
++ return this.conf;
++ }
++}
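The class above is wired into bin/mapred as the archive-logs subcommand (see the earlier hunk), which simply hands the tool to ToolRunner. A minimal sketch of driving it programmatically with the options handleOpts recognizes; the wrapper class name and the threshold values here are illustrative only, not taken from the patch:

```java
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.tools.HadoopArchiveLogs;
import org.apache.hadoop.util.ToolRunner;

// Hypothetical driver class; only the option names come from HadoopArchiveLogs.
public class ArchiveLogsDriver {
  public static void main(String[] args) throws Exception {
    // Same setup as HadoopArchiveLogs#main: a JobConf seeded with the tool class.
    JobConf conf = new JobConf(HadoopArchiveLogs.class);
    String[] toolArgs = {
        "-maxEligibleApps", "50",     // keep only the 50 oldest eligible apps
        "-minNumberLogFiles", "20",   // skip apps with fewer than 20 log files
        "-maxTotalLogsSize", "1024",  // skip apps whose logs exceed 1024 MB
        "-memory", "1024"             // container memory (MB) for each shell
    };
    System.exit(ToolRunner.run(new HadoopArchiveLogs(conf), toolArgs));
  }
}
```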
+diff --git hadoop-tools/hadoop-archive-logs/src/main/java/org/apache/hadoop/tools/HadoopArchiveLogsRunner.java hadoop-tools/hadoop-archive-logs/src/main/java/org/apache/hadoop/tools/HadoopArchiveLogsRunner.java
+new file mode 100644
+index 0000000..347e5fb
+--- /dev/null
++++ hadoop-tools/hadoop-archive-logs/src/main/java/org/apache/hadoop/tools/HadoopArchiveLogsRunner.java
+@@ -0,0 +1,180 @@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
++package org.apache.hadoop.tools;
++
++import org.apache.commons.cli.CommandLine;
++import org.apache.commons.cli.CommandLineParser;
++import org.apache.commons.cli.GnuParser;
++import org.apache.commons.cli.Option;
++import org.apache.commons.cli.Options;
++import org.apache.commons.cli.ParseException;
++import org.apache.commons.logging.Log;
++import org.apache.commons.logging.LogFactory;
++import org.apache.hadoop.conf.Configuration;
++import org.apache.hadoop.fs.FileStatus;
++import org.apache.hadoop.fs.FileSystem;
++import org.apache.hadoop.fs.Path;
++import org.apache.hadoop.fs.PathFilter;
++import org.apache.hadoop.mapred.JobConf;
++import org.apache.hadoop.util.Tool;
++import org.apache.hadoop.util.ToolRunner;
++
++import java.io.File;
++
++/**
++ * This is a child program designed to be used by the {@link HadoopArchiveLogs}
++ * tool via the Distributed Shell. It's not meant to be run directly.
++ */
++public class HadoopArchiveLogsRunner implements Tool {
++ private static final Log LOG = LogFactory.getLog(HadoopArchiveLogsRunner.class);
++
++ private static final String APP_ID_OPTION = "appId";
++ private static final String USER_OPTION = "user";
++ private static final String WORKING_DIR_OPTION = "workingDir";
++ private static final String REMOTE_ROOT_LOG_DIR = "remoteRootLogDir";
++ private static final String SUFFIX_OPTION = "suffix";
++
++ private String appId;
++ private String user;
++ private String workingDir;
++ private String remoteLogDir;
++ private String suffix;
++
++ private JobConf conf;
++
++ public HadoopArchiveLogsRunner(Configuration conf) {
++ setConf(conf);
++ }
++
++ public static void main(String[] args) {
++ JobConf job = new JobConf(HadoopArchiveLogsRunner.class);
++
++ HadoopArchiveLogsRunner halr = new HadoopArchiveLogsRunner(job);
++ int ret = 0;
++
++ try{
++ ret = ToolRunner.run(halr, args);
++ } catch(Exception e) {
++ LOG.debug("Exception", e);
++ System.err.println(e.getClass().getSimpleName());
++ final String s = e.getLocalizedMessage();
++ if (s != null) {
++ System.err.println(s);
++ } else {
++ e.printStackTrace(System.err);
++ }
++ System.exit(1);
++ }
++ System.exit(ret);
++ }
++
++ @Override
++ public int run(String[] args) throws Exception {
++ handleOpts(args);
++ String remoteAppLogDir = remoteLogDir + File.separator + user
++ + File.separator + suffix + File.separator + appId;
++
++ // Run 'hadoop archives' command in local mode
++ Configuration haConf = new Configuration(getConf());
++ haConf.set("mapreduce.framework.name", "local");
++ HadoopArchives ha = new HadoopArchives(haConf);
++ String[] haArgs = {
++ "-archiveName",
++ appId + ".har",
++ "-p",
++ remoteAppLogDir,
++ "*",
++ workingDir
++ };
++ StringBuilder sb = new StringBuilder("Executing 'hadoop archives'");
++ for (String haArg : haArgs) {
++ sb.append("\n\t").append(haArg);
++ }
++ LOG.info(sb.toString());
++ ha.run(haArgs);
++
++ FileSystem fs = null;
++ // Move har file to correct location and delete original logs
++ try {
++ fs = FileSystem.get(conf);
++ LOG.info("Moving har to original location");
++ fs.rename(new Path(workingDir, appId + ".har"),
++ new Path(remoteAppLogDir, appId + ".har"));
++ LOG.info("Deleting original logs");
++ for (FileStatus original : fs.listStatus(new Path(remoteAppLogDir),
++ new PathFilter() {
++ @Override
++ public boolean accept(Path path) {
++ return !path.getName().endsWith(".har");
++ }
++ })) {
++ fs.delete(original.getPath(), false);
++ }
++ } finally {
++ if (fs != null) {
++ fs.close();
++ }
++ }
++
++ return 0;
++ }
++
++ private void handleOpts(String[] args) throws ParseException {
++ Options opts = new Options();
++ Option appIdOpt = new Option(APP_ID_OPTION, true, "Application ID");
++ appIdOpt.setRequired(true);
++ Option userOpt = new Option(USER_OPTION, true, "User");
++ userOpt.setRequired(true);
++ Option workingDirOpt = new Option(WORKING_DIR_OPTION, true,
++ "Working Directory");
++ workingDirOpt.setRequired(true);
++ Option remoteLogDirOpt = new Option(REMOTE_ROOT_LOG_DIR, true,
++ "Remote Root Log Directory");
++ remoteLogDirOpt.setRequired(true);
++ Option suffixOpt = new Option(SUFFIX_OPTION, true, "Suffix");
++ suffixOpt.setRequired(true);
++ opts.addOption(appIdOpt);
++ opts.addOption(userOpt);
++ opts.addOption(workingDirOpt);
++ opts.addOption(remoteLogDirOpt);
++ opts.addOption(suffixOpt);
++
++ CommandLineParser parser = new GnuParser();
++ CommandLine commandLine = parser.parse(opts, args);
++ appId = commandLine.getOptionValue(APP_ID_OPTION);
++ user = commandLine.getOptionValue(USER_OPTION);
++ workingDir = commandLine.getOptionValue(WORKING_DIR_OPTION);
++ remoteLogDir = commandLine.getOptionValue(REMOTE_ROOT_LOG_DIR);
++ suffix = commandLine.getOptionValue(SUFFIX_OPTION);
++ }
++
++ @Override
++ public void setConf(Configuration conf) {
++ if (conf instanceof JobConf) {
++ this.conf = (JobConf) conf;
++ } else {
++ this.conf = new JobConf(conf, HadoopArchiveLogsRunner.class);
++ }
++ }
++
++ @Override
++ public Configuration getConf() {
++ return this.conf;
++ }
++}
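As its javadoc says, HadoopArchiveLogsRunner is launched once per application by the Distributed Shell script that HadoopArchiveLogs generates, not by hand. A rough sketch of that per-application invocation with the five required options; the application id, user, and paths are the placeholder values from the sample script shown in HadoopArchiveLogs:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.tools.HadoopArchiveLogsRunner;
import org.apache.hadoop.util.ToolRunner;

// Hypothetical wrapper; mirrors the single command the generated script runs per container.
public class RunnerInvocationSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] runnerArgs = {
        "-appId", "application_1440448768987_0001",    // substituted per YARN_SHELL_ID
        "-user", "rkanter",                            // owner of the aggregated logs
        "-workingDir", "/tmp/logs/archive-logs-work",  // scratch dir for building the .har
        "-remoteRootLogDir", "/tmp/logs",              // yarn.nodemanager.remote-app-log-dir
        "-suffix", "logs"                              // remote log dir suffix
    };
    System.exit(ToolRunner.run(new HadoopArchiveLogsRunner(conf), runnerArgs));
  }
}
```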
+diff --git hadoop-tools/hadoop-archive-logs/src/test/java/org/apache/hadoop/tools/TestHadoopArchiveLogs.java hadoop-tools/hadoop-archive-logs/src/test/java/org/apache/hadoop/tools/TestHadoopArchiveLogs.java
+new file mode 100644
+index 0000000..c8ff201
+--- /dev/null
++++ hadoop-tools/hadoop-archive-logs/src/test/java/org/apache/hadoop/tools/TestHadoopArchiveLogs.java
+@@ -0,0 +1,293 @@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
++package org.apache.hadoop.tools;
++
++import org.apache.commons.io.IOUtils;
++import org.apache.hadoop.conf.Configuration;
++import org.apache.hadoop.fs.FSDataOutputStream;
++import org.apache.hadoop.fs.FileSystem;
++import org.apache.hadoop.fs.Path;
++import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
++import org.apache.hadoop.yarn.api.records.ApplicationId;
++import org.apache.hadoop.yarn.api.records.ApplicationReport;
++import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
++import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
++import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
++import org.apache.hadoop.yarn.api.records.Priority;
++import org.apache.hadoop.yarn.api.records.Resource;
++import org.apache.hadoop.yarn.api.records.YarnApplicationState;
++import org.apache.hadoop.yarn.conf.YarnConfiguration;
++import org.apache.hadoop.yarn.server.MiniYARNCluster;
++import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
++import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
++import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
++import org.junit.Assert;
++import org.junit.Test;
++
++import java.io.File;
++import java.io.IOException;
++import java.util.Random;
++
++public class TestHadoopArchiveLogs {
++
++ private static final long CLUSTER_TIMESTAMP = System.currentTimeMillis();
++ private static final int FILE_SIZE_INCREMENT = 4096;
++ private static final byte[] DUMMY_DATA = new byte[FILE_SIZE_INCREMENT];
++ static {
++ new Random().nextBytes(DUMMY_DATA);
++ }
++
++ @Test(timeout = 10000)
++ public void testCheckFiles() throws Exception {
++ Configuration conf = new Configuration();
++ HadoopArchiveLogs hal = new HadoopArchiveLogs(conf);
++ FileSystem fs = FileSystem.getLocal(conf);
++ Path rootLogDir = new Path("target", "logs");
++ String suffix = "logs";
++ Path logDir = new Path(rootLogDir,
++ new Path(System.getProperty("user.name"), suffix));
++ fs.mkdirs(logDir);
++
++ Assert.assertEquals(0, hal.eligibleApplications.size());
++ ApplicationReport app1 = createAppReport(1); // no files found
++ ApplicationReport app2 = createAppReport(2); // too few files
++ Path app2Path = new Path(logDir, app2.getApplicationId().toString());
++ fs.mkdirs(app2Path);
++ createFile(fs, new Path(app2Path, "file1"), 1);
++ hal.minNumLogFiles = 2;
++ ApplicationReport app3 = createAppReport(3); // too large
++ Path app3Path = new Path(logDir, app3.getApplicationId().toString());
++ fs.mkdirs(app3Path);
++ createFile(fs, new Path(app3Path, "file1"), 2);
++ createFile(fs, new Path(app3Path, "file2"), 5);
++ hal.maxTotalLogsSize = FILE_SIZE_INCREMENT * 6;
++ ApplicationReport app4 = createAppReport(4); // has har already
++ Path app4Path = new Path(logDir, app4.getApplicationId().toString());
++ fs.mkdirs(app4Path);
++ createFile(fs, new Path(app4Path, app4.getApplicationId() + ".har"), 1);
++ ApplicationReport app5 = createAppReport(5); // just right
++ Path app5Path = new Path(logDir, app5.getApplicationId().toString());
++ fs.mkdirs(app5Path);
++ createFile(fs, new Path(app5Path, "file1"), 2);
++ createFile(fs, new Path(app5Path, "file2"), 3);
++ hal.eligibleApplications.add(app1);
++ hal.eligibleApplications.add(app2);
++ hal.eligibleApplications.add(app3);
++ hal.eligibleApplications.add(app4);
++ hal.eligibleApplications.add(app5);
++
++ hal.checkFiles(fs, rootLogDir, suffix);
++ Assert.assertEquals(1, hal.eligibleApplications.size());
++ Assert.assertEquals(app5, hal.eligibleApplications.iterator().next());
++ }
++
++ @Test(timeout = 10000)
++ public void testCheckMaxEligible() throws Exception {
++ Configuration conf = new Configuration();
++ HadoopArchiveLogs hal = new HadoopArchiveLogs(conf);
++ ApplicationReport app1 = createAppReport(1);
++ app1.setFinishTime(CLUSTER_TIMESTAMP - 5);
++ ApplicationReport app2 = createAppReport(2);
++ app2.setFinishTime(CLUSTER_TIMESTAMP - 10);
++ ApplicationReport app3 = createAppReport(3);
++ app3.setFinishTime(CLUSTER_TIMESTAMP + 5);
++ ApplicationReport app4 = createAppReport(4);
++ app4.setFinishTime(CLUSTER_TIMESTAMP + 10);
++ ApplicationReport app5 = createAppReport(5);
++ app5.setFinishTime(CLUSTER_TIMESTAMP);
++ Assert.assertEquals(0, hal.eligibleApplications.size());
++ hal.eligibleApplications.add(app1);
++ hal.eligibleApplications.add(app2);
++ hal.eligibleApplications.add(app3);
++ hal.eligibleApplications.add(app4);
++ hal.eligibleApplications.add(app5);
++ hal.maxEligible = -1;
++ hal.checkMaxEligible();
++ Assert.assertEquals(5, hal.eligibleApplications.size());
++
++ hal.maxEligible = 4;
++ hal.checkMaxEligible();
++ Assert.assertEquals(4, hal.eligibleApplications.size());
++ Assert.assertFalse(hal.eligibleApplications.contains(app4));
++
++ hal.maxEligible = 3;
++ hal.checkMaxEligible();
++ Assert.assertEquals(3, hal.eligibleApplications.size());
++ Assert.assertFalse(hal.eligibleApplications.contains(app3));
++
++ hal.maxEligible = 2;
++ hal.checkMaxEligible();
++ Assert.assertEquals(2, hal.eligibleApplications.size());
++ Assert.assertFalse(hal.eligibleApplications.contains(app5));
++
++ hal.maxEligible = 1;
++ hal.checkMaxEligible();
++ Assert.assertEquals(1, hal.eligibleApplications.size());
++ Assert.assertFalse(hal.eligibleApplications.contains(app1));
++ }
++
++ @Test(timeout = 10000)
++ public void testFindAggregatedApps() throws Exception {
++ MiniYARNCluster yarnCluster = null;
++ try {
++ Configuration conf = new Configuration();
++ conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true);
++ yarnCluster =
++ new MiniYARNCluster(TestHadoopArchiveLogs.class.getSimpleName(), 1,
++ 1, 1, 1);
++ yarnCluster.init(conf);
++ yarnCluster.start();
++ conf = yarnCluster.getConfig();
++
++ RMContext rmContext = yarnCluster.getResourceManager().getRMContext();
++ RMAppImpl app1 = (RMAppImpl)createRMApp(1, conf, rmContext,
++ LogAggregationStatus.DISABLED);
++ RMAppImpl app2 = (RMAppImpl)createRMApp(2, conf, rmContext,
++ LogAggregationStatus.FAILED);
++ RMAppImpl app3 = (RMAppImpl)createRMApp(3, conf, rmContext,
++ LogAggregationStatus.NOT_START);
++ RMAppImpl app4 = (RMAppImpl)createRMApp(4, conf, rmContext,
++ LogAggregationStatus.SUCCEEDED);
++ RMAppImpl app5 = (RMAppImpl)createRMApp(5, conf, rmContext,
++ LogAggregationStatus.RUNNING);
++ RMAppImpl app6 = (RMAppImpl)createRMApp(6, conf, rmContext,
++ LogAggregationStatus.RUNNING_WITH_FAILURE);
++ RMAppImpl app7 = (RMAppImpl)createRMApp(7, conf, rmContext,
++ LogAggregationStatus.TIME_OUT);
++ rmContext.getRMApps().put(app1.getApplicationId(), app1);
++ rmContext.getRMApps().put(app2.getApplicationId(), app2);
++ rmContext.getRMApps().put(app3.getApplicationId(), app3);
++ rmContext.getRMApps().put(app4.getApplicationId(), app4);
++ rmContext.getRMApps().put(app5.getApplicationId(), app5);
++ rmContext.getRMApps().put(app6.getApplicationId(), app6);
++ rmContext.getRMApps().put(app7.getApplicationId(), app7);
++
++ HadoopArchiveLogs hal = new HadoopArchiveLogs(conf);
++ Assert.assertEquals(0, hal.eligibleApplications.size());
++ hal.findAggregatedApps();
++ Assert.assertEquals(2, hal.eligibleApplications.size());
++ } finally {
++ if (yarnCluster != null) {
++ yarnCluster.stop();
++ }
++ }
++ }
++
++ @Test(timeout = 10000)
++ public void testGenerateScript() throws Exception {
++ Configuration conf = new Configuration();
++ HadoopArchiveLogs hal = new HadoopArchiveLogs(conf);
++ ApplicationReport app1 = createAppReport(1);
++ ApplicationReport app2 = createAppReport(2);
++ hal.eligibleApplications.add(app1);
++ hal.eligibleApplications.add(app2);
++
++ File localScript = new File("target", "script.sh");
++ Path workingDir = new Path("/tmp", "working");
++ Path remoteRootLogDir = new Path("/tmp", "logs");
++ String suffix = "logs";
++ localScript.delete();
++ Assert.assertFalse(localScript.exists());
++ hal.generateScript(localScript, workingDir, remoteRootLogDir, suffix);
++ Assert.assertTrue(localScript.exists());
++ String script = IOUtils.toString(localScript.toURI());
++ String[] lines = script.split(System.lineSeparator());
++ Assert.assertEquals(16, lines.length);
++ Assert.assertEquals("#!/bin/bash", lines[0]);
++ Assert.assertEquals("set -e", lines[1]);
++ Assert.assertEquals("set -x", lines[2]);
++ Assert.assertEquals("if [ \"$YARN_SHELL_ID\" == \"1\" ]; then", lines[3]);
++ if (lines[4].contains(app1.getApplicationId().toString())) {
++ Assert.assertEquals("\tappId=\"" + app1.getApplicationId().toString()
++ + "\"", lines[4]);
++ Assert.assertEquals("\tappId=\"" + app2.getApplicationId().toString()
++ + "\"", lines[7]);
++ } else {
++ Assert.assertEquals("\tappId=\"" + app2.getApplicationId().toString()
++ + "\"", lines[4]);
++ Assert.assertEquals("\tappId=\"" + app1.getApplicationId().toString()
++ + "\"", lines[7]);
++ }
++ Assert.assertEquals("\tuser=\"" + System.getProperty("user.name") + "\"",
++ lines[5]);
++ Assert.assertEquals("elif [ \"$YARN_SHELL_ID\" == \"2\" ]; then", lines[6]);
++ Assert.assertEquals("\tuser=\"" + System.getProperty("user.name") + "\"",
++ lines[8]);
++ Assert.assertEquals("else", lines[9]);
++ Assert.assertEquals("\techo \"Unknown Mapping!\"", lines[10]);
++ Assert.assertEquals("\texit 1", lines[11]);
++ Assert.assertEquals("fi", lines[12]);
++ Assert.assertEquals("export HADOOP_CLIENT_OPTS=\"-Xmx1024m\"", lines[13]);
++ Assert.assertTrue(lines[14].startsWith("export HADOOP_CLASSPATH="));
++ Assert.assertEquals("\"$HADOOP_HOME\"/bin/hadoop org.apache.hadoop.tools." +
++ "HadoopArchiveLogsRunner -appId \"$appId\" -user \"$user\" -workingDir "
++ + workingDir.toString() + " -remoteRootLogDir " +
++ remoteRootLogDir.toString() + " -suffix " + suffix, lines[15]);
++ }
++
++ private static ApplicationReport createAppReport(int id) {
++ ApplicationId appId = ApplicationId.newInstance(CLUSTER_TIMESTAMP, id);
++ return ApplicationReport.newInstance(
++ appId,
++ ApplicationAttemptId.newInstance(appId, 1),
++ System.getProperty("user.name"),
++ null, null, null, 0, null, YarnApplicationState.FINISHED, null,
++ null, 0L, 0L, FinalApplicationStatus.SUCCEEDED, null, null, 100f,
++ null, null);
++ }
++
++ private static void createFile(FileSystem fs, Path p, long sizeMultiple)
++ throws IOException {
++ FSDataOutputStream out = null;
++ try {
++ out = fs.create(p);
++ for (int i = 0 ; i < sizeMultiple; i++) {
++ out.write(DUMMY_DATA);
++ }
++ } finally {
++ if (out != null) {
++ out.close();
++ }
++ }
++ }
++
++ private static RMApp createRMApp(int id, Configuration conf, RMContext rmContext,
++ final LogAggregationStatus aggStatus) {
++ ApplicationId appId = ApplicationId.newInstance(CLUSTER_TIMESTAMP, id);
++ ApplicationSubmissionContext submissionContext =
++ ApplicationSubmissionContext.newInstance(appId, "test", "default",
++ Priority.newInstance(0), null, false, true,
++ 2, Resource.newInstance(10, 2), "test");
++ return new RMAppImpl(appId, rmContext, conf, "test",
++ System.getProperty("user.name"), "default", submissionContext,
++ rmContext.getScheduler(),
++ rmContext.getApplicationMasterService(),
++ System.currentTimeMillis(), "test",
++ null, null) {
++ @Override
++ public ApplicationReport createAndGetApplicationReport(
++ String clientUserName, boolean allowAccess) {
++ ApplicationReport report =
++ super.createAndGetApplicationReport(clientUserName, allowAccess);
++ report.setLogAggregationStatus(aggStatus);
++ return report;
++ }
++ };
++ }
++}
+diff --git hadoop-tools/hadoop-archive-logs/src/test/java/org/apache/hadoop/tools/TestHadoopArchiveLogsRunner.java hadoop-tools/hadoop-archive-logs/src/test/java/org/apache/hadoop/tools/TestHadoopArchiveLogsRunner.java
+new file mode 100644
+index 0000000..af66f14
+--- /dev/null
++++ hadoop-tools/hadoop-archive-logs/src/test/java/org/apache/hadoop/tools/TestHadoopArchiveLogsRunner.java
+@@ -0,0 +1,143 @@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
++package org.apache.hadoop.tools;
++
++import org.apache.hadoop.conf.Configuration;
++import org.apache.hadoop.fs.FSDataOutputStream;
++import org.apache.hadoop.fs.FileStatus;
++import org.apache.hadoop.fs.FileSystem;
++import org.apache.hadoop.fs.HarFs;
++import org.apache.hadoop.fs.Path;
++import org.apache.hadoop.hdfs.MiniDFSCluster;
++import org.apache.hadoop.util.ToolRunner;
++import org.apache.hadoop.yarn.api.records.ApplicationId;
++import org.apache.hadoop.yarn.conf.YarnConfiguration;
++import org.apache.hadoop.yarn.server.MiniYARNCluster;
++import org.junit.Assert;
++import org.junit.Test;
++
++import java.io.IOException;
++import java.util.Arrays;
++import java.util.Comparator;
++import java.util.Random;
++
++import static org.junit.Assert.assertEquals;
++
++public class TestHadoopArchiveLogsRunner {
++
++ private static final int FILE_SIZE_INCREMENT = 4096;
++ private static final byte[] DUMMY_DATA = new byte[FILE_SIZE_INCREMENT];
++ static {
++ new Random().nextBytes(DUMMY_DATA);
++ }
++
++ @Test(timeout = 30000)
++ public void testHadoopArchiveLogs() throws Exception {
++ MiniYARNCluster yarnCluster = null;
++ MiniDFSCluster dfsCluster = null;
++ FileSystem fs = null;
++ try {
++ Configuration conf = new YarnConfiguration();
++ conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true);
++ conf.setBoolean(YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS, true);
++ yarnCluster =
++ new MiniYARNCluster(TestHadoopArchiveLogsRunner.class.getSimpleName(),
++ 1, 2, 1, 1);
++ yarnCluster.init(conf);
++ yarnCluster.start();
++ conf = yarnCluster.getConfig();
++ dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
++
++ ApplicationId app1 =
++ ApplicationId.newInstance(System.currentTimeMillis(), 1);
++ fs = FileSystem.get(conf);
++ Path remoteRootLogDir = new Path(conf.get(
++ YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
++ YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
++ Path workingDir = new Path(remoteRootLogDir, "archive-logs-work");
++ String suffix = "logs";
++ Path logDir = new Path(remoteRootLogDir,
++ new Path(System.getProperty("user.name"), suffix));
++ fs.mkdirs(logDir);
++ Path app1Path = new Path(logDir, app1.toString());
++ fs.mkdirs(app1Path);
++ createFile(fs, new Path(app1Path, "log1"), 3);
++ createFile(fs, new Path(app1Path, "log2"), 4);
++ createFile(fs, new Path(app1Path, "log3"), 2);
++ FileStatus[] app1Files = fs.listStatus(app1Path);
++ Assert.assertEquals(3, app1Files.length);
++
++ String[] args = new String[]{
++ "-appId", app1.toString(),
++ "-user", System.getProperty("user.name"),
++ "-workingDir", workingDir.toString(),
++ "-remoteRootLogDir", remoteRootLogDir.toString(),
++ "-suffix", suffix};
++ final HadoopArchiveLogsRunner halr = new HadoopArchiveLogsRunner(conf);
++ assertEquals(0, ToolRunner.run(halr, args));
++
++ fs = FileSystem.get(conf);
++ app1Files = fs.listStatus(app1Path);
++ Assert.assertEquals(1, app1Files.length);
++ FileStatus harFile = app1Files[0];
++ Assert.assertEquals(app1.toString() + ".har", harFile.getPath().getName());
++ Path harPath = new Path("har:///" + harFile.getPath().toUri().getRawPath());
++ FileStatus[] harLogs = HarFs.get(harPath.toUri(), conf).listStatus(harPath);
++ Assert.assertEquals(3, harLogs.length);
++ Arrays.sort(harLogs, new Comparator<FileStatus>() {
++ @Override
++ public int compare(FileStatus o1, FileStatus o2) {
++ return o1.getPath().getName().compareTo(o2.getPath().getName());
++ }
++ });
++ Assert.assertEquals("log1", harLogs[0].getPath().getName());
++ Assert.assertEquals(3 * FILE_SIZE_INCREMENT, harLogs[0].getLen());
++ Assert.assertEquals("log2", harLogs[1].getPath().getName());
++ Assert.assertEquals(4 * FILE_SIZE_INCREMENT, harLogs[1].getLen());
++ Assert.assertEquals("log3", harLogs[2].getPath().getName());
++ Assert.assertEquals(2 * FILE_SIZE_INCREMENT, harLogs[2].getLen());
++ Assert.assertEquals(0, fs.listStatus(workingDir).length);
++ } finally {
++ if (yarnCluster != null) {
++ yarnCluster.stop();
++ }
++ if (fs != null) {
++ fs.close();
++ }
++ if (dfsCluster != null) {
++ dfsCluster.shutdown();
++ }
++ }
++ }
++
++ private static void createFile(FileSystem fs, Path p, long sizeMultiple)
++ throws IOException {
++ FSDataOutputStream out = null;
++ try {
++ out = fs.create(p);
++ for (int i = 0 ; i < sizeMultiple; i++) {
++ out.write(DUMMY_DATA);
++ }
++ } finally {
++ if (out != null) {
++ out.close();
++ }
++ }
++ }
++}
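The test above verifies the result the same way a consumer would read it back: it wraps the .har path in a har:/// URI and lists it through HarFs. That read-back pattern on its own, as a rough sketch (the aggregated-log directory below is a placeholder):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.HarFs;
import org.apache.hadoop.fs.Path;

// Hypothetical utility; the har:/// listing mirrors TestHadoopArchiveLogsRunner.
public class HarReadBackSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    // Placeholder app log dir; HadoopArchiveLogsRunner leaves <appId>.har here.
    Path appLogDir = new Path("/tmp/logs/rkanter/logs/application_1440448768987_0001");
    for (FileStatus status : fs.listStatus(appLogDir)) {
      if (status.getPath().getName().endsWith(".har")) {
        // Same trick as the test: raw path wrapped in a har:/// URI, then listed.
        Path harPath = new Path("har:///" + status.getPath().toUri().getRawPath());
        for (FileStatus log : HarFs.get(harPath.toUri(), conf).listStatus(harPath)) {
          System.out.println(log.getPath().getName() + " (" + log.getLen() + " bytes)");
        }
      }
    }
    fs.close();
  }
}
```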
+diff --git hadoop-tools/hadoop-tools-dist/pom.xml hadoop-tools/hadoop-tools-dist/pom.xml
+index 635d6ca..f19f313 100644
+--- hadoop-tools/hadoop-tools-dist/pom.xml
++++ hadoop-tools/hadoop-tools-dist/pom.xml
+@@ -52,6 +52,11 @@
+     </dependency>
+     <dependency>
+       <groupId>org.apache.hadoop</groupId>
++      <artifactId>hadoop-archive-logs</artifactId>
++      <scope>compile</scope>
++    </dependency>
++    <dependency>
++      <groupId>org.apache.hadoop</groupId>
+       <artifactId>hadoop-rumen</artifactId>
+       <scope>compile</scope>
+     </dependency>
+diff --git hadoop-tools/pom.xml hadoop-tools/pom.xml
+index 77b442a..f85b1d9 100644
+--- hadoop-tools/pom.xml
++++ hadoop-tools/pom.xml
+@@ -34,6 +34,7 @@
+     <module>hadoop-streaming</module>
+     <module>hadoop-distcp</module>
+     <module>hadoop-archives</module>
++    <module>hadoop-archive-logs</module>
+     <module>hadoop-rumen</module>
+     <module>hadoop-gridmix</module>
+     <module>hadoop-datajoin</module>
diff --git a/hadoop-assemblies/src/main/resources/assemblies/hadoop-tools.xml b/hadoop-assemblies/src/main/resources/assemblies/hadoop-tools.xml
index fa557034fba..3f646e69799 100644
--- a/hadoop-assemblies/src/main/resources/assemblies/hadoop-tools.xml
+++ b/hadoop-assemblies/src/main/resources/assemblies/hadoop-tools.xml
@@ -51,6 +51,13 @@
         <include>*-sources.jar</include>
       </includes>
     </fileSet>
+    <fileSet>
+      <directory>../hadoop-archive-logs/target</directory>
+      <outputDirectory>/share/hadoop/${hadoop.component}/sources</outputDirectory>
+      <includes>
+        <include>*-sources.jar</include>
+      </includes>
+    </fileSet>
     <fileSet>
       <directory>../hadoop-datajoin/target</directory>
       <outputDirectory>/share/hadoop/${hadoop.component}/sources</outputDirectory>
diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 76b54cae229..bfe06763afe 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -15,6 +15,9 @@ Release 2.8.0 - UNRELEASED
MAPREDUCE-6304. Specifying node labels when submitting MR jobs.
(Naganarasimha G R via wangda)
+ MAPREDUCE-6415. Create a tool to combine aggregated logs into HAR files.
+ (Robert Kanter via kasha)
+
IMPROVEMENTS
MAPREDUCE-6291. Correct mapred queue usage command.
diff --git a/hadoop-mapreduce-project/bin/mapred b/hadoop-mapreduce-project/bin/mapred
index 277dc96ce8b..6f3bc04baf9 100755
--- a/hadoop-mapreduce-project/bin/mapred
+++ b/hadoop-mapreduce-project/bin/mapred
@@ -39,6 +39,7 @@ function print_usage(){
echo " historyserver run job history servers as a standalone daemon"
echo " distcp copy file or directories recursively"
echo " archive -archiveName NAME -p * create a hadoop archive"
+ echo " archive-logs combine aggregated logs into hadoop archives"
echo " hsadmin job history server admin interface"
echo ""
echo "Most commands print help when invoked w/o parameters."
@@ -96,6 +97,10 @@ elif [ "$COMMAND" = "archive" ] ; then
CLASS=org.apache.hadoop.tools.HadoopArchives
CLASSPATH=${CLASSPATH}:${TOOL_PATH}
HADOOP_OPTS="$HADOOP_OPTS $HADOOP_CLIENT_OPTS"
+elif [ "$COMMAND" = "archive-logs" ] ; then
+ CLASS=org.apache.hadoop.tools.HadoopArchiveLogs
+ CLASSPATH=${CLASSPATH}:${TOOL_PATH}
+ HADOOP_OPTS="$HADOOP_OPTS $HADOOP_CLIENT_OPTS"
elif [ "$COMMAND" = "hsadmin" ] ; then
CLASS=org.apache.hadoop.mapreduce.v2.hs.client.HSAdmin
HADOOP_OPTS="$HADOOP_OPTS $HADOOP_CLIENT_OPTS"
diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml
index 1a01d726e41..f7b90ce72cb 100644
--- a/hadoop-project/pom.xml
+++ b/hadoop-project/pom.xml
@@ -322,6 +322,11 @@
         <artifactId>hadoop-archives</artifactId>
         <version>${project.version}</version>
       </dependency>
+      <dependency>
+        <groupId>org.apache.hadoop</groupId>
+        <artifactId>hadoop-archive-logs</artifactId>
+        <version>${project.version}</version>
+      </dependency>
       <dependency>
         <groupId>org.apache.hadoop</groupId>
         <artifactId>hadoop-distcp</artifactId>
diff --git a/hadoop-tools/hadoop-archive-logs/pom.xml b/hadoop-tools/hadoop-archive-logs/pom.xml
new file mode 100644
index 00000000000..4b53b7857fa
--- /dev/null
+++ b/hadoop-tools/hadoop-archive-logs/pom.xml
@@ -0,0 +1,171 @@
+
+
+
+ 4.0.0
+
+ org.apache.hadoop
+ hadoop-project
+ 2.8.0-SNAPSHOT
+ ../../hadoop-project
+
+ org.apache.hadoop
+ hadoop-archive-logs
+ 2.8.0-SNAPSHOT
+ Apache Hadoop Archive Logs
+ Apache Hadoop Archive Logs
+ jar
+
+
+ ${project.build.directory}/log
+
+
+
+
+ junit
+ junit
+ test
+
+
+ org.apache.hadoop
+ hadoop-mapreduce-client-core
+ provided
+
+
+ org.apache.hadoop
+ hadoop-yarn-applications-distributedshell
+ provided
+
+
+ org.apache.hadoop
+ hadoop-common
+ provided
+
+
+ org.apache.hadoop
+ hadoop-hdfs
+ test
+ test-jar
+
+
+ org.apache.hadoop
+ hadoop-yarn-server-tests
+ test-jar
+ test
+
+
+ org.apache.hadoop
+ hadoop-archives
+
+
+ org.apache.hadoop
+ hadoop-yarn-common
+ provided
+
+
+ org.apache.hadoop
+ hadoop-yarn-api
+ provided
+
+
+ com.google.guava
+ guava
+ provided
+
+
+ commons-io
+ commons-io
+ provided
+
+
+ commons-logging
+ commons-logging
+ provided
+
+
+ commons-cli
+ commons-cli
+ provided
+
+
+ org.apache.hadoop
+ hadoop-yarn-client
+ provided
+
+
+ org.apache.hadoop
+ hadoop-yarn-server-resourcemanager
+ provided
+
+
+
+ org.apache.hadoop
+ hadoop-hdfs
+ test
+
+
+
+ org.apache.hadoop
+ hadoop-common
+ test
+ test-jar
+
+
+
+ org.apache.hadoop
+ hadoop-mapreduce-client-jobclient
+ test
+ test-jar
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-antrun-plugin
+
+
+ create-log-dir
+ process-test-resources
+
+ run
+
+
+
+
+
+
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-jar-plugin
+
+
+
+ org.apache.hadoop.tools.HadoopArchiveLogs
+
+
+
+
+
+
+
diff --git a/hadoop-tools/hadoop-archive-logs/src/main/java/org/apache/hadoop/tools/HadoopArchiveLogs.java b/hadoop-tools/hadoop-archive-logs/src/main/java/org/apache/hadoop/tools/HadoopArchiveLogs.java
new file mode 100644
index 00000000000..4778dcbdce0
--- /dev/null
+++ b/hadoop-tools/hadoop-archive-logs/src/main/java/org/apache/hadoop/tools/HadoopArchiveLogs.java
@@ -0,0 +1,403 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
+import org.apache.hadoop.yarn.applications.distributedshell.ApplicationMaster;
+import org.apache.hadoop.yarn.applications.distributedshell.Client;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+
+/**
+ * This tool moves Aggregated Log files into HAR archives using the
+ * {@link HadoopArchives} tool and the Distributed Shell via the
+ * {@link HadoopArchiveLogsRunner}.
+ */
+public class HadoopArchiveLogs implements Tool {
+ private static final Log LOG = LogFactory.getLog(HadoopArchiveLogs.class);
+
+ private static final String HELP_OPTION = "help";
+ private static final String MAX_ELIGIBLE_APPS_OPTION = "maxEligibleApps";
+ private static final String MIN_NUM_LOG_FILES_OPTION = "minNumberLogFiles";
+ private static final String MAX_TOTAL_LOGS_SIZE_OPTION = "maxTotalLogsSize";
+ private static final String MEMORY_OPTION = "memory";
+
+ private static final int DEFAULT_MAX_ELIGIBLE = -1;
+ private static final int DEFAULT_MIN_NUM_LOG_FILES = 20;
+ private static final long DEFAULT_MAX_TOTAL_LOGS_SIZE = 1024L;
+ private static final long DEFAULT_MEMORY = 1024L;
+
+ @VisibleForTesting
+ int maxEligible = DEFAULT_MAX_ELIGIBLE;
+ @VisibleForTesting
+ int minNumLogFiles = DEFAULT_MIN_NUM_LOG_FILES;
+ @VisibleForTesting
+ long maxTotalLogsSize = DEFAULT_MAX_TOTAL_LOGS_SIZE * 1024L * 1024L;
+ @VisibleForTesting
+ long memory = DEFAULT_MEMORY;
+
+ @VisibleForTesting
+ Set<ApplicationReport> eligibleApplications;
+
+ private JobConf conf;
+
+ public HadoopArchiveLogs(Configuration conf) {
+ setConf(conf);
+ eligibleApplications = new HashSet<>();
+ }
+
+ public static void main(String[] args) {
+ JobConf job = new JobConf(HadoopArchiveLogs.class);
+
+ HadoopArchiveLogs hal = new HadoopArchiveLogs(job);
+ int ret = 0;
+
+ try{
+ ret = ToolRunner.run(hal, args);
+ } catch(Exception e) {
+ LOG.debug("Exception", e);
+ System.err.println(e.getClass().getSimpleName());
+ final String s = e.getLocalizedMessage();
+ if (s != null) {
+ System.err.println(s);
+ } else {
+ e.printStackTrace(System.err);
+ }
+ System.exit(1);
+ }
+ System.exit(ret);
+ }
+
+ @Override
+ public int run(String[] args) throws Exception {
+ handleOpts(args);
+
+ findAggregatedApps();
+
+ FileSystem fs = null;
+ Path remoteRootLogDir = new Path(conf.get(
+ YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
+ YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
+ String suffix = LogAggregationUtils.getRemoteNodeLogDirSuffix(conf);
+ Path workingDir = new Path(remoteRootLogDir, "archive-logs-work");
+ try {
+ fs = FileSystem.get(conf);
+ checkFiles(fs, remoteRootLogDir, suffix);
+
+ // Prepare working directory
+ if (fs.exists(workingDir)) {
+ fs.delete(workingDir, true);
+ }
+ fs.mkdirs(workingDir);
+ fs.setPermission(workingDir,
+ new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE));
+ } finally {
+ if (fs != null) {
+ fs.close();
+ }
+ }
+
+ checkMaxEligible();
+
+ if (eligibleApplications.isEmpty()) {
+ LOG.info("No eligible applications to process");
+ System.exit(0);
+ }
+
+ StringBuilder sb =
+ new StringBuilder("Will process the following applications:");
+ for (ApplicationReport report : eligibleApplications) {
+ sb.append("\n\t").append(report.getApplicationId());
+ }
+ LOG.info(sb.toString());
+
+ File localScript = File.createTempFile("hadoop-archive-logs-", ".sh");
+ generateScript(localScript, workingDir, remoteRootLogDir, suffix);
+
+ if (runDistributedShell(localScript)) {
+ return 0;
+ }
+ return -1;
+ }
+
+ private void handleOpts(String[] args) throws ParseException {
+ Options opts = new Options();
+ Option helpOpt = new Option(HELP_OPTION, false, "Prints this message");
+ Option maxEligibleOpt = new Option(MAX_ELIGIBLE_APPS_OPTION, true,
+ "The maximum number of eligible apps to process (default: "
+ + DEFAULT_MAX_ELIGIBLE + " (all))");
+ maxEligibleOpt.setArgName("n");
+ Option minNumLogFilesOpt = new Option(MIN_NUM_LOG_FILES_OPTION, true,
+ "The minimum number of log files required to be eligible (default: "
+ + DEFAULT_MIN_NUM_LOG_FILES + ")");
+ minNumLogFilesOpt.setArgName("n");
+ Option maxTotalLogsSizeOpt = new Option(MAX_TOTAL_LOGS_SIZE_OPTION, true,
+ "The maximum total logs size (in megabytes) required to be eligible" +
+ " (default: " + DEFAULT_MAX_TOTAL_LOGS_SIZE + ")");
+ maxTotalLogsSizeOpt.setArgName("megabytes");
+ Option memoryOpt = new Option(MEMORY_OPTION, true,
+ "The amount of memory (in megabytes) for each container (default: "
+ + DEFAULT_MEMORY + ")");
+ memoryOpt.setArgName("megabytes");
+ opts.addOption(helpOpt);
+ opts.addOption(maxEligibleOpt);
+ opts.addOption(minNumLogFilesOpt);
+ opts.addOption(maxTotalLogsSizeOpt);
+ opts.addOption(memoryOpt);
+
+ try {
+ CommandLineParser parser = new GnuParser();
+ CommandLine commandLine = parser.parse(opts, args);
+ if (commandLine.hasOption(HELP_OPTION)) {
+ HelpFormatter formatter = new HelpFormatter();
+ formatter.printHelp("yarn archive-logs", opts);
+ System.exit(0);
+ }
+ if (commandLine.hasOption(MAX_ELIGIBLE_APPS_OPTION)) {
+ maxEligible = Integer.parseInt(
+ commandLine.getOptionValue(MAX_ELIGIBLE_APPS_OPTION));
+ if (maxEligible == 0) {
+ LOG.info("Setting " + MAX_ELIGIBLE_APPS_OPTION + " to 0 accomplishes "
+ + "nothing. Please either set it to a negative value "
+ + "(default, all) or a more reasonable value.");
+ System.exit(0);
+ }
+ }
+ if (commandLine.hasOption(MIN_NUM_LOG_FILES_OPTION)) {
+ minNumLogFiles = Integer.parseInt(
+ commandLine.getOptionValue(MIN_NUM_LOG_FILES_OPTION));
+ }
+ if (commandLine.hasOption(MAX_TOTAL_LOGS_SIZE_OPTION)) {
+ maxTotalLogsSize = Long.parseLong(
+ commandLine.getOptionValue(MAX_TOTAL_LOGS_SIZE_OPTION));
+ maxTotalLogsSize *= 1024L * 1024L;
+ }
+ if (commandLine.hasOption(MEMORY_OPTION)) {
+ memory = Long.parseLong(commandLine.getOptionValue(MEMORY_OPTION));
+ }
+ } catch (ParseException pe) {
+ HelpFormatter formatter = new HelpFormatter();
+ formatter.printHelp("yarn archive-logs", opts);
+ throw pe;
+ }
+ }
+
+ @VisibleForTesting
+ void findAggregatedApps() throws IOException, YarnException {
+ YarnClient client = YarnClient.createYarnClient();
+ try {
+ client.init(getConf());
+ client.start();
+ List<ApplicationReport> reports = client.getApplications();
+ for (ApplicationReport report : reports) {
+ LogAggregationStatus aggStatus = report.getLogAggregationStatus();
+ if (aggStatus.equals(LogAggregationStatus.SUCCEEDED) ||
+ aggStatus.equals(LogAggregationStatus.FAILED)) {
+ eligibleApplications.add(report);
+ }
+ }
+ } finally {
+ if (client != null) {
+ client.stop();
+ }
+ }
+ }
+
+ @VisibleForTesting
+ void checkFiles(FileSystem fs, Path remoteRootLogDir, String suffix) {
+ for (Iterator<ApplicationReport> reportIt = eligibleApplications.iterator();
+ reportIt.hasNext(); ) {
+ ApplicationReport report = reportIt.next();
+ long totalFileSize = 0L;
+ try {
+ FileStatus[] files = fs.listStatus(
+ LogAggregationUtils.getRemoteAppLogDir(remoteRootLogDir,
+ report.getApplicationId(), report.getUser(), suffix));
+ if (files.length < minNumLogFiles) {
+ reportIt.remove();
+ } else {
+ for (FileStatus file : files) {
+ if (file.getPath().getName().equals(report.getApplicationId()
+ + ".har")) {
+ reportIt.remove();
+ break;
+ }
+ totalFileSize += file.getLen();
+ }
+ if (totalFileSize > maxTotalLogsSize) {
+ reportIt.remove();
+ }
+ }
+ } catch (IOException ioe) {
+ // If the user doesn't have permission or it doesn't exist, then skip it
+ reportIt.remove();
+ }
+ }
+ }
+
+ @VisibleForTesting
+ void checkMaxEligible() {
+ // If we have too many eligible apps, remove the newest ones first
+ if (maxEligible > 0 && eligibleApplications.size() > maxEligible) {
+ List<ApplicationReport> sortedApplications =
+ new ArrayList<ApplicationReport>(eligibleApplications);
+ Collections.sort(sortedApplications, new Comparator<ApplicationReport>() {
+ @Override
+ public int compare(ApplicationReport o1, ApplicationReport o2) {
+ return Long.compare(o1.getFinishTime(), o2.getFinishTime());
+ }
+ });
+ for (int i = maxEligible; i < sortedApplications.size(); i++) {
+ eligibleApplications.remove(sortedApplications.get(i));
+ }
+ }
+ }
+
+ /*
+ The generated script looks like this:
+ #!/bin/bash
+ set -e
+ set -x
+ if [ "$YARN_SHELL_ID" == "1" ]; then
+ appId="application_1440448768987_0001"
+ user="rkanter"
+ elif [ "$YARN_SHELL_ID" == "2" ]; then
+ appId="application_1440448768987_0002"
+ user="rkanter"
+ else
+ echo "Unknown Mapping!"
+ exit 1
+ fi
+ export HADOOP_CLIENT_OPTS="-Xmx1024m"
+ export HADOOP_CLASSPATH=/dist/share/hadoop/tools/lib/hadoop-archive-logs-2.8.0-SNAPSHOT.jar:/dist/share/hadoop/tools/lib/hadoop-archives-2.8.0-SNAPSHOT.jar
+ "$HADOOP_HOME"/bin/hadoop org.apache.hadoop.tools.HadoopArchiveLogsRunner -appId "$appId" -user "$user" -workingDir /tmp/logs/archive-logs-work -remoteRootLogDir /tmp/logs -suffix logs
+ */
+ @VisibleForTesting
+ void generateScript(File localScript, Path workingDir,
+ Path remoteRootLogDir, String suffix) throws IOException {
+ LOG.info("Generating script at: " + localScript.getAbsolutePath());
+ String halrJarPath = HadoopArchiveLogsRunner.class.getProtectionDomain()
+ .getCodeSource().getLocation().getPath();
+ String harJarPath = HadoopArchives.class.getProtectionDomain()
+ .getCodeSource().getLocation().getPath();
+ String classpath = halrJarPath + File.pathSeparator + harJarPath;
+ FileWriter fw = null;
+ try {
+ fw = new FileWriter(localScript);
+ fw.write("#!/bin/bash\nset -e\nset -x\n");
+ int containerCount = 1;
+ for (ApplicationReport report : eligibleApplications) {
+ fw.write("if [ \"$YARN_SHELL_ID\" == \"");
+ fw.write(Integer.toString(containerCount));
+ fw.write("\" ]; then\n\tappId=\"");
+ fw.write(report.getApplicationId().toString());
+ fw.write("\"\n\tuser=\"");
+ fw.write(report.getUser());
+ fw.write("\"\nel");
+ containerCount++;
+ }
+ fw.write("se\n\techo \"Unknown Mapping!\"\n\texit 1\nfi\n");
+ fw.write("export HADOOP_CLIENT_OPTS=\"-Xmx");
+ fw.write(Long.toString(memory));
+ fw.write("m\"\n");
+ fw.write("export HADOOP_CLASSPATH=");
+ fw.write(classpath);
+ fw.write("\n\"$HADOOP_HOME\"/bin/hadoop ");
+ fw.write(HadoopArchiveLogsRunner.class.getName());
+ fw.write(" -appId \"$appId\" -user \"$user\" -workingDir ");
+ fw.write(workingDir.toString());
+ fw.write(" -remoteRootLogDir ");
+ fw.write(remoteRootLogDir.toString());
+ fw.write(" -suffix ");
+ fw.write(suffix);
+ fw.write("\n");
+ } finally {
+ if (fw != null) {
+ fw.close();
+ }
+ }
+ }
+
+ private boolean runDistributedShell(File localScript) throws Exception {
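+ // Launch one container per eligible application; every container runs the same
+ // generated script and selects its app via YARN_SHELL_ID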
+ String[] dsArgs = {
+ "--appname",
+ "ArchiveLogs",
+ "--jar",
+ ApplicationMaster.class.getProtectionDomain().getCodeSource()
+ .getLocation().getPath(),
+ "--num_containers",
+ Integer.toString(eligibleApplications.size()),
+ "--container_memory",
+ Long.toString(memory),
+ "--shell_script",
+ localScript.getAbsolutePath()
+ };
+ final Client dsClient = new Client(new Configuration(conf));
+ dsClient.init(dsArgs);
+ return dsClient.run();
+ }
+
+ @Override
+ public void setConf(Configuration conf) {
+ if (conf instanceof JobConf) {
+ this.conf = (JobConf) conf;
+ } else {
+ this.conf = new JobConf(conf, HadoopArchiveLogs.class);
+ }
+ }
+
+ @Override
+ public Configuration getConf() {
+ return this.conf;
+ }
+}
diff --git a/hadoop-tools/hadoop-archive-logs/src/main/java/org/apache/hadoop/tools/HadoopArchiveLogsRunner.java b/hadoop-tools/hadoop-archive-logs/src/main/java/org/apache/hadoop/tools/HadoopArchiveLogsRunner.java
new file mode 100644
index 00000000000..347e5fb48a6
--- /dev/null
+++ b/hadoop-tools/hadoop-archive-logs/src/main/java/org/apache/hadoop/tools/HadoopArchiveLogsRunner.java
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+import java.io.File;
+
+/**
+ * This is a child program designed to be used by the {@link HadoopArchiveLogs}
+ * tool via the Distributed Shell. It's not meant to be run directly.
+ */
+public class HadoopArchiveLogsRunner implements Tool {
+ private static final Log LOG = LogFactory.getLog(HadoopArchiveLogsRunner.class);
+
+ private static final String APP_ID_OPTION = "appId";
+ private static final String USER_OPTION = "user";
+ private static final String WORKING_DIR_OPTION = "workingDir";
+ private static final String REMOTE_ROOT_LOG_DIR = "remoteRootLogDir";
+ private static final String SUFFIX_OPTION = "suffix";
+
+ private String appId;
+ private String user;
+ private String workingDir;
+ private String remoteLogDir;
+ private String suffix;
+
+ private JobConf conf;
+
+ public HadoopArchiveLogsRunner(Configuration conf) {
+ setConf(conf);
+ }
+
+ public static void main(String[] args) {
+ JobConf job = new JobConf(HadoopArchiveLogsRunner.class);
+
+ HadoopArchiveLogsRunner halr = new HadoopArchiveLogsRunner(job);
+ int ret = 0;
+
+ try{
+ ret = ToolRunner.run(halr, args);
+ } catch(Exception e) {
+ LOG.debug("Exception", e);
+ System.err.println(e.getClass().getSimpleName());
+ final String s = e.getLocalizedMessage();
+ if (s != null) {
+ System.err.println(s);
+ } else {
+ e.printStackTrace(System.err);
+ }
+ System.exit(1);
+ }
+ System.exit(ret);
+ }
+
+ @Override
+ public int run(String[] args) throws Exception {
+ handleOpts(args);
+ String remoteAppLogDir = remoteLogDir + File.separator + user
+ + File.separator + suffix + File.separator + appId;
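+ // Aggregated logs live under <remoteRootLogDir>/<user>/<suffix>/<appId>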
+
+ // Run 'hadoop archives' command in local mode
+ Configuration haConf = new Configuration(getConf());
+ haConf.set("mapreduce.framework.name", "local");
+ HadoopArchives ha = new HadoopArchives(haConf);
+ String[] haArgs = {
+ "-archiveName",
+ appId + ".har",
+ "-p",
+ remoteAppLogDir,
+ "*",
+ workingDir
+ };
+ StringBuilder sb = new StringBuilder("Executing 'hadoop archives'");
+ for (String haArg : haArgs) {
+ sb.append("\n\t").append(haArg);
+ }
+ LOG.info(sb.toString());
+ ha.run(haArgs);
+
+ FileSystem fs = null;
+ // Move har file to correct location and delete original logs
+ try {
+ fs = FileSystem.get(conf);
+ LOG.info("Moving har to original location");
+ fs.rename(new Path(workingDir, appId + ".har"),
+ new Path(remoteAppLogDir, appId + ".har"));
+ LOG.info("Deleting original logs");
+ for (FileStatus original : fs.listStatus(new Path(remoteAppLogDir),
+ new PathFilter() {
+ @Override
+ public boolean accept(Path path) {
+ return !path.getName().endsWith(".har");
+ }
+ })) {
+ fs.delete(original.getPath(), false);
+ }
+ } finally {
+ if (fs != null) {
+ fs.close();
+ }
+ }
+
+ return 0;
+ }
+
+ private void handleOpts(String[] args) throws ParseException {
+ Options opts = new Options();
+ Option appIdOpt = new Option(APP_ID_OPTION, true, "Application ID");
+ appIdOpt.setRequired(true);
+ Option userOpt = new Option(USER_OPTION, true, "User");
+ userOpt.setRequired(true);
+ Option workingDirOpt = new Option(WORKING_DIR_OPTION, true,
+ "Working Directory");
+ workingDirOpt.setRequired(true);
+ Option remoteLogDirOpt = new Option(REMOTE_ROOT_LOG_DIR, true,
+ "Remote Root Log Directory");
+ remoteLogDirOpt.setRequired(true);
+ Option suffixOpt = new Option(SUFFIX_OPTION, true, "Suffix");
+ suffixOpt.setRequired(true);
+ opts.addOption(appIdOpt);
+ opts.addOption(userOpt);
+ opts.addOption(workingDirOpt);
+ opts.addOption(remoteLogDirOpt);
+ opts.addOption(suffixOpt);
+
+ CommandLineParser parser = new GnuParser();
+ CommandLine commandLine = parser.parse(opts, args);
+ appId = commandLine.getOptionValue(APP_ID_OPTION);
+ user = commandLine.getOptionValue(USER_OPTION);
+ workingDir = commandLine.getOptionValue(WORKING_DIR_OPTION);
+ remoteLogDir = commandLine.getOptionValue(REMOTE_ROOT_LOG_DIR);
+ suffix = commandLine.getOptionValue(SUFFIX_OPTION);
+ }
+
+ @Override
+ public void setConf(Configuration conf) {
+ if (conf instanceof JobConf) {
+ this.conf = (JobConf) conf;
+ } else {
+ this.conf = new JobConf(conf, HadoopArchiveLogsRunner.class);
+ }
+ }
+
+ @Override
+ public Configuration getConf() {
+ return this.conf;
+ }
+}
diff --git a/hadoop-tools/hadoop-archive-logs/src/test/java/org/apache/hadoop/tools/TestHadoopArchiveLogs.java b/hadoop-tools/hadoop-archive-logs/src/test/java/org/apache/hadoop/tools/TestHadoopArchiveLogs.java
new file mode 100644
index 00000000000..c8ff201946d
--- /dev/null
+++ b/hadoop-tools/hadoop-archive-logs/src/test/java/org/apache/hadoop/tools/TestHadoopArchiveLogs.java
@@ -0,0 +1,293 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.MiniYARNCluster;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Random;
+
+public class TestHadoopArchiveLogs {
+
+ private static final long CLUSTER_TIMESTAMP = System.currentTimeMillis();
+ private static final int FILE_SIZE_INCREMENT = 4096;
+ private static final byte[] DUMMY_DATA = new byte[FILE_SIZE_INCREMENT];
+ static {
+ new Random().nextBytes(DUMMY_DATA);
+ }
+
+ @Test(timeout = 10000)
+ public void testCheckFiles() throws Exception {
+ Configuration conf = new Configuration();
+ HadoopArchiveLogs hal = new HadoopArchiveLogs(conf);
+ FileSystem fs = FileSystem.getLocal(conf);
+ Path rootLogDir = new Path("target", "logs");
+ String suffix = "logs";
+ Path logDir = new Path(rootLogDir,
+ new Path(System.getProperty("user.name"), suffix));
+ fs.mkdirs(logDir);
+
+ Assert.assertEquals(0, hal.eligibleApplications.size());
+ ApplicationReport app1 = createAppReport(1); // no files found
+ ApplicationReport app2 = createAppReport(2); // too few files
+ Path app2Path = new Path(logDir, app2.getApplicationId().toString());
+ fs.mkdirs(app2Path);
+ createFile(fs, new Path(app2Path, "file1"), 1);
+ hal.minNumLogFiles = 2;
+ ApplicationReport app3 = createAppReport(3); // too large
+ Path app3Path = new Path(logDir, app3.getApplicationId().toString());
+ fs.mkdirs(app3Path);
+ createFile(fs, new Path(app3Path, "file1"), 2);
+ createFile(fs, new Path(app3Path, "file2"), 5);
+ hal.maxTotalLogsSize = FILE_SIZE_INCREMENT * 6;
+ ApplicationReport app4 = createAppReport(4); // has har already
+ Path app4Path = new Path(logDir, app4.getApplicationId().toString());
+ fs.mkdirs(app4Path);
+ createFile(fs, new Path(app4Path, app4.getApplicationId() + ".har"), 1);
+ ApplicationReport app5 = createAppReport(5); // just right
+ Path app5Path = new Path(logDir, app5.getApplicationId().toString());
+ fs.mkdirs(app5Path);
+ createFile(fs, new Path(app5Path, "file1"), 2);
+ createFile(fs, new Path(app5Path, "file2"), 3);
+ hal.eligibleApplications.add(app1);
+ hal.eligibleApplications.add(app2);
+ hal.eligibleApplications.add(app3);
+ hal.eligibleApplications.add(app4);
+ hal.eligibleApplications.add(app5);
+
+ hal.checkFiles(fs, rootLogDir, suffix);
+ Assert.assertEquals(1, hal.eligibleApplications.size());
+ Assert.assertEquals(app5, hal.eligibleApplications.iterator().next());
+ }
+
+ @Test(timeout = 10000)
+ public void testCheckMaxEligible() throws Exception {
+ Configuration conf = new Configuration();
+ HadoopArchiveLogs hal = new HadoopArchiveLogs(conf);
+ ApplicationReport app1 = createAppReport(1);
+ app1.setFinishTime(CLUSTER_TIMESTAMP - 5);
+ ApplicationReport app2 = createAppReport(2);
+ app2.setFinishTime(CLUSTER_TIMESTAMP - 10);
+ ApplicationReport app3 = createAppReport(3);
+ app3.setFinishTime(CLUSTER_TIMESTAMP + 5);
+ ApplicationReport app4 = createAppReport(4);
+ app4.setFinishTime(CLUSTER_TIMESTAMP + 10);
+ ApplicationReport app5 = createAppReport(5);
+ app5.setFinishTime(CLUSTER_TIMESTAMP);
+ Assert.assertEquals(0, hal.eligibleApplications.size());
+ hal.eligibleApplications.add(app1);
+ hal.eligibleApplications.add(app2);
+ hal.eligibleApplications.add(app3);
+ hal.eligibleApplications.add(app4);
+ hal.eligibleApplications.add(app5);
+ hal.maxEligible = -1;
+ hal.checkMaxEligible();
+ Assert.assertEquals(5, hal.eligibleApplications.size());
+
+ hal.maxEligible = 4;
+ hal.checkMaxEligible();
+ Assert.assertEquals(4, hal.eligibleApplications.size());
+ Assert.assertFalse(hal.eligibleApplications.contains(app4));
+
+ hal.maxEligible = 3;
+ hal.checkMaxEligible();
+ Assert.assertEquals(3, hal.eligibleApplications.size());
+ Assert.assertFalse(hal.eligibleApplications.contains(app3));
+
+ hal.maxEligible = 2;
+ hal.checkMaxEligible();
+ Assert.assertEquals(2, hal.eligibleApplications.size());
+ Assert.assertFalse(hal.eligibleApplications.contains(app5));
+
+ hal.maxEligible = 1;
+ hal.checkMaxEligible();
+ Assert.assertEquals(1, hal.eligibleApplications.size());
+ Assert.assertFalse(hal.eligibleApplications.contains(app1));
+ }
+
+ @Test(timeout = 10000)
+ public void testFindAggregatedApps() throws Exception {
+ MiniYARNCluster yarnCluster = null;
+ try {
+ Configuration conf = new Configuration();
+ conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true);
+ yarnCluster =
+ new MiniYARNCluster(TestHadoopArchiveLogs.class.getSimpleName(), 1,
+ 1, 1, 1);
+ yarnCluster.init(conf);
+ yarnCluster.start();
+ conf = yarnCluster.getConfig();
+
+ RMContext rmContext = yarnCluster.getResourceManager().getRMContext();
+ RMAppImpl app1 = (RMAppImpl)createRMApp(1, conf, rmContext,
+ LogAggregationStatus.DISABLED);
+ RMAppImpl app2 = (RMAppImpl)createRMApp(2, conf, rmContext,
+ LogAggregationStatus.FAILED);
+ RMAppImpl app3 = (RMAppImpl)createRMApp(3, conf, rmContext,
+ LogAggregationStatus.NOT_START);
+ RMAppImpl app4 = (RMAppImpl)createRMApp(4, conf, rmContext,
+ LogAggregationStatus.SUCCEEDED);
+ RMAppImpl app5 = (RMAppImpl)createRMApp(5, conf, rmContext,
+ LogAggregationStatus.RUNNING);
+ RMAppImpl app6 = (RMAppImpl)createRMApp(6, conf, rmContext,
+ LogAggregationStatus.RUNNING_WITH_FAILURE);
+ RMAppImpl app7 = (RMAppImpl)createRMApp(7, conf, rmContext,
+ LogAggregationStatus.TIME_OUT);
+ rmContext.getRMApps().put(app1.getApplicationId(), app1);
+ rmContext.getRMApps().put(app2.getApplicationId(), app2);
+ rmContext.getRMApps().put(app3.getApplicationId(), app3);
+ rmContext.getRMApps().put(app4.getApplicationId(), app4);
+ rmContext.getRMApps().put(app5.getApplicationId(), app5);
+ rmContext.getRMApps().put(app6.getApplicationId(), app6);
+ rmContext.getRMApps().put(app7.getApplicationId(), app7);
+
+ HadoopArchiveLogs hal = new HadoopArchiveLogs(conf);
+ Assert.assertEquals(0, hal.eligibleApplications.size());
+ hal.findAggregatedApps();
+ Assert.assertEquals(2, hal.eligibleApplications.size());
+ } finally {
+ if (yarnCluster != null) {
+ yarnCluster.stop();
+ }
+ }
+ }
+
+ @Test(timeout = 10000)
+ public void testGenerateScript() throws Exception {
+ Configuration conf = new Configuration();
+ HadoopArchiveLogs hal = new HadoopArchiveLogs(conf);
+ ApplicationReport app1 = createAppReport(1);
+ ApplicationReport app2 = createAppReport(2);
+ hal.eligibleApplications.add(app1);
+ hal.eligibleApplications.add(app2);
+
+ File localScript = new File("target", "script.sh");
+ Path workingDir = new Path("/tmp", "working");
+ Path remoteRootLogDir = new Path("/tmp", "logs");
+ String suffix = "logs";
+ localScript.delete();
+ Assert.assertFalse(localScript.exists());
+ hal.generateScript(localScript, workingDir, remoteRootLogDir, suffix);
+ Assert.assertTrue(localScript.exists());
+ String script = IOUtils.toString(localScript.toURI());
+ String[] lines = script.split(System.lineSeparator());
+ Assert.assertEquals(16, lines.length);
+ Assert.assertEquals("#!/bin/bash", lines[0]);
+ Assert.assertEquals("set -e", lines[1]);
+ Assert.assertEquals("set -x", lines[2]);
+ Assert.assertEquals("if [ \"$YARN_SHELL_ID\" == \"1\" ]; then", lines[3]);
+ if (lines[4].contains(app1.getApplicationId().toString())) {
+ Assert.assertEquals("\tappId=\"" + app1.getApplicationId().toString()
+ + "\"", lines[4]);
+ Assert.assertEquals("\tappId=\"" + app2.getApplicationId().toString()
+ + "\"", lines[7]);
+ } else {
+ Assert.assertEquals("\tappId=\"" + app2.getApplicationId().toString()
+ + "\"", lines[4]);
+ Assert.assertEquals("\tappId=\"" + app1.getApplicationId().toString()
+ + "\"", lines[7]);
+ }
+ Assert.assertEquals("\tuser=\"" + System.getProperty("user.name") + "\"",
+ lines[5]);
+ Assert.assertEquals("elif [ \"$YARN_SHELL_ID\" == \"2\" ]; then", lines[6]);
+ Assert.assertEquals("\tuser=\"" + System.getProperty("user.name") + "\"",
+ lines[8]);
+ Assert.assertEquals("else", lines[9]);
+ Assert.assertEquals("\techo \"Unknown Mapping!\"", lines[10]);
+ Assert.assertEquals("\texit 1", lines[11]);
+ Assert.assertEquals("fi", lines[12]);
+ Assert.assertEquals("export HADOOP_CLIENT_OPTS=\"-Xmx1024m\"", lines[13]);
+ Assert.assertTrue(lines[14].startsWith("export HADOOP_CLASSPATH="));
+ Assert.assertEquals("\"$HADOOP_HOME\"/bin/hadoop org.apache.hadoop.tools." +
+ "HadoopArchiveLogsRunner -appId \"$appId\" -user \"$user\" -workingDir "
+ + workingDir.toString() + " -remoteRootLogDir " +
+ remoteRootLogDir.toString() + " -suffix " + suffix, lines[15]);
+ }
+
+ private static ApplicationReport createAppReport(int id) {
+ ApplicationId appId = ApplicationId.newInstance(CLUSTER_TIMESTAMP, id);
+ return ApplicationReport.newInstance(
+ appId,
+ ApplicationAttemptId.newInstance(appId, 1),
+ System.getProperty("user.name"),
+ null, null, null, 0, null, YarnApplicationState.FINISHED, null,
+ null, 0L, 0L, FinalApplicationStatus.SUCCEEDED, null, null, 100f,
+ null, null);
+ }
+
+ private static void createFile(FileSystem fs, Path p, long sizeMultiple)
+ throws IOException {
+ FSDataOutputStream out = null;
+ try {
+ out = fs.create(p);
+ for (int i = 0 ; i < sizeMultiple; i++) {
+ out.write(DUMMY_DATA);
+ }
+ } finally {
+ if (out != null) {
+ out.close();
+ }
+ }
+ }
+
+ private static RMApp createRMApp(int id, Configuration conf, RMContext rmContext,
+ final LogAggregationStatus aggStatus) {
+ ApplicationId appId = ApplicationId.newInstance(CLUSTER_TIMESTAMP, id);
+ ApplicationSubmissionContext submissionContext =
+ ApplicationSubmissionContext.newInstance(appId, "test", "default",
+ Priority.newInstance(0), null, false, true,
+ 2, Resource.newInstance(10, 2), "test");
+ return new RMAppImpl(appId, rmContext, conf, "test",
+ System.getProperty("user.name"), "default", submissionContext,
+ rmContext.getScheduler(),
+ rmContext.getApplicationMasterService(),
+ System.currentTimeMillis(), "test",
+ null, null) {
+ @Override
+ public ApplicationReport createAndGetApplicationReport(
+ String clientUserName, boolean allowAccess) {
+ ApplicationReport report =
+ super.createAndGetApplicationReport(clientUserName, allowAccess);
+ report.setLogAggregationStatus(aggStatus);
+ return report;
+ }
+ };
+ }
+}
diff --git a/hadoop-tools/hadoop-archive-logs/src/test/java/org/apache/hadoop/tools/TestHadoopArchiveLogsRunner.java b/hadoop-tools/hadoop-archive-logs/src/test/java/org/apache/hadoop/tools/TestHadoopArchiveLogsRunner.java
new file mode 100644
index 00000000000..af66f14bc81
--- /dev/null
+++ b/hadoop-tools/hadoop-archive-logs/src/test/java/org/apache/hadoop/tools/TestHadoopArchiveLogsRunner.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.HarFs;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.MiniYARNCluster;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestHadoopArchiveLogsRunner {
+
+ private static final int FILE_SIZE_INCREMENT = 4096;
+ private static final byte[] DUMMY_DATA = new byte[FILE_SIZE_INCREMENT];
+ static {
+ new Random().nextBytes(DUMMY_DATA);
+ }
+
+ @Test(timeout = 30000)
+ public void testHadoopArchiveLogs() throws Exception {
+ MiniYARNCluster yarnCluster = null;
+ MiniDFSCluster dfsCluster = null;
+ FileSystem fs = null;
+ try {
+ Configuration conf = new YarnConfiguration();
+ conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true);
+ conf.setBoolean(YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS, true);
+ yarnCluster =
+ new MiniYARNCluster(TestHadoopArchiveLogsRunner.class.getSimpleName(),
+ 1, 2, 1, 1);
+ yarnCluster.init(conf);
+ yarnCluster.start();
+ conf = yarnCluster.getConfig();
+ dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+
+ ApplicationId app1 =
+ ApplicationId.newInstance(System.currentTimeMillis(), 1);
+ fs = FileSystem.get(conf);
+ Path remoteRootLogDir = new Path(conf.get(
+ YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
+ YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
+ Path workingDir = new Path(remoteRootLogDir, "archive-logs-work");
+ String suffix = "logs";
+ Path logDir = new Path(remoteRootLogDir,
+ new Path(System.getProperty("user.name"), suffix));
+ fs.mkdirs(logDir);
+ Path app1Path = new Path(logDir, app1.toString());
+ fs.mkdirs(app1Path);
+ createFile(fs, new Path(app1Path, "log1"), 3);
+ createFile(fs, new Path(app1Path, "log2"), 4);
+ createFile(fs, new Path(app1Path, "log3"), 2);
+ FileStatus[] app1Files = fs.listStatus(app1Path);
+ Assert.assertEquals(3, app1Files.length);
+
+ String[] args = new String[]{
+ "-appId", app1.toString(),
+ "-user", System.getProperty("user.name"),
+ "-workingDir", workingDir.toString(),
+ "-remoteRootLogDir", remoteRootLogDir.toString(),
+ "-suffix", suffix};
+ final HadoopArchiveLogsRunner halr = new HadoopArchiveLogsRunner(conf);
+ assertEquals(0, ToolRunner.run(halr, args));
+
+ fs = FileSystem.get(conf);
+ app1Files = fs.listStatus(app1Path);
+ Assert.assertEquals(1, app1Files.length);
+ FileStatus harFile = app1Files[0];
+ Assert.assertEquals(app1.toString() + ".har", harFile.getPath().getName());
+ Path harPath = new Path("har:///" + harFile.getPath().toUri().getRawPath());
+ FileStatus[] harLogs = HarFs.get(harPath.toUri(), conf).listStatus(harPath);
+ Assert.assertEquals(3, harLogs.length);
+ Arrays.sort(harLogs, new Comparator<FileStatus>() {
+ @Override
+ public int compare(FileStatus o1, FileStatus o2) {
+ return o1.getPath().getName().compareTo(o2.getPath().getName());
+ }
+ });
+ Assert.assertEquals("log1", harLogs[0].getPath().getName());
+ Assert.assertEquals(3 * FILE_SIZE_INCREMENT, harLogs[0].getLen());
+ Assert.assertEquals("log2", harLogs[1].getPath().getName());
+ Assert.assertEquals(4 * FILE_SIZE_INCREMENT, harLogs[1].getLen());
+ Assert.assertEquals("log3", harLogs[2].getPath().getName());
+ Assert.assertEquals(2 * FILE_SIZE_INCREMENT, harLogs[2].getLen());
+ Assert.assertEquals(0, fs.listStatus(workingDir).length);
+ } finally {
+ if (yarnCluster != null) {
+ yarnCluster.stop();
+ }
+ if (fs != null) {
+ fs.close();
+ }
+ if (dfsCluster != null) {
+ dfsCluster.shutdown();
+ }
+ }
+ }
+
+ private static void createFile(FileSystem fs, Path p, long sizeMultiple)
+ throws IOException {
+ FSDataOutputStream out = null;
+ try {
+ out = fs.create(p);
+ for (int i = 0 ; i < sizeMultiple; i++) {
+ out.write(DUMMY_DATA);
+ }
+ } finally {
+ if (out != null) {
+ out.close();
+ }
+ }
+ }
+}
diff --git a/hadoop-tools/hadoop-tools-dist/pom.xml b/hadoop-tools/hadoop-tools-dist/pom.xml
index 635d6ca60f6..f19f313aa9c 100644
--- a/hadoop-tools/hadoop-tools-dist/pom.xml
+++ b/hadoop-tools/hadoop-tools-dist/pom.xml
@@ -50,6 +50,11 @@
 <artifactId>hadoop-archives</artifactId>
 <scope>compile</scope>
 </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-archive-logs</artifactId>
+ <scope>compile</scope>
+ </dependency>
 <dependency>
 <groupId>org.apache.hadoop</groupId>
 <artifactId>hadoop-rumen</artifactId>
diff --git a/hadoop-tools/pom.xml b/hadoop-tools/pom.xml
index 77b442adbf0..f85b1d9468d 100644
--- a/hadoop-tools/pom.xml
+++ b/hadoop-tools/pom.xml
@@ -34,6 +34,7 @@
 <module>hadoop-streaming</module>
 <module>hadoop-distcp</module>
 <module>hadoop-archives</module>
+ <module>hadoop-archive-logs</module>
 <module>hadoop-rumen</module>
 <module>hadoop-gridmix</module>
 <module>hadoop-datajoin</module>