diff --git a/hadoop-mapreduce-project/bin/mapred b/hadoop-mapreduce-project/bin/mapred
index f66f563aaef..ce9ce217aee 100755
--- a/hadoop-mapreduce-project/bin/mapred
+++ b/hadoop-mapreduce-project/bin/mapred
@@ -32,6 +32,7 @@ function hadoop_usage
   hadoop_add_subcommand "pipes" client "run a Pipes job"
   hadoop_add_subcommand "queue" client "get information regarding JobQueues"
   hadoop_add_subcommand "sampler" client "sampler"
+  hadoop_add_subcommand "frameworkuploader" admin "mapreduce framework upload"
   hadoop_add_subcommand "version" client "print the version"
   hadoop_generate_usage "${HADOOP_SHELL_EXECNAME}" true
 }
@@ -92,6 +93,9 @@ function mapredcmd_case
     sampler)
       HADOOP_CLASSNAME=org.apache.hadoop.mapred.lib.InputSampler
     ;;
+    frameworkuploader)
+      HADOOP_CLASSNAME=org.apache.hadoop.mapred.uploader.FrameworkUploader
+    ;;
     version)
       HADOOP_CLASSNAME=org.apache.hadoop.util.VersionInfo
     ;;
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-uploader/pom.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-uploader/pom.xml
new file mode 100644
index 00000000000..a7214049298
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-uploader/pom.xml
@@ -0,0 +1,67 @@
+<?xml version="1.0"?>
+<!--
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License. See accompanying LICENSE file.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
+                             http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <parent>
+    <artifactId>hadoop-mapreduce-client</artifactId>
+    <groupId>org.apache.hadoop</groupId>
+    <version>3.1.0-SNAPSHOT</version>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+  <artifactId>hadoop-mapreduce-client-uploader</artifactId>
+  <version>3.1.0-SNAPSHOT</version>
+  <name>Apache Hadoop MapReduce Uploader</name>
+
+  <dependencies>
+    <dependency>
+      <groupId>commons-cli</groupId>
+      <artifactId>commons-cli</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-compress</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdfs-client</artifactId>
+    </dependency>
+  </dependencies>
+
+  <properties>
+    <mr.basedir>${project.parent.basedir}/../</mr.basedir>
+  </properties>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+        <configuration>
+          <archive>
+            <manifest>
+              <mainClass>org.apache.hadoop.mapred.uploader.FrameworkUploader</mainClass>
+            </manifest>
+          </archive>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+</project>
\ No newline at end of file
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-uploader/src/main/java/org/apache/hadoop/mapred/uploader/DefaultJars.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-uploader/src/main/java/org/apache/hadoop/mapred/uploader/DefaultJars.java
new file mode 100644
index 00000000000..49ee64f0028
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-uploader/src/main/java/org/apache/hadoop/mapred/uploader/DefaultJars.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.uploader;
+
+/**
+ * Default white list and black list implementations.
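+ *
+ * <p>Both lists are comma-separated lists of Java regular expressions,
+ * matched against absolute jar paths after environment variable
+ * expansion. For illustration, a (hypothetical) entry such as
+ * "$HADOOP_HOME/share/hadoop/tools/lib/.*\.jar" would admit every jar
+ * under the tools lib directory.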
+ */
+final class DefaultJars {
+  static final String DEFAULT_EXCLUDED_MR_JARS =
+      ".*hadoop-yarn-server-applicationhistoryservice.*\\.jar," +
+          ".*hadoop-yarn-server-nodemanager.*\\.jar," +
+          ".*hadoop-yarn-server-resourcemanager.*\\.jar," +
+          ".*hadoop-yarn-server-router.*\\.jar," +
+          ".*hadoop-yarn-server-sharedcachemanager.*\\.jar," +
+          ".*hadoop-yarn-server-timeline-pluginstorage.*\\.jar," +
+          ".*hadoop-yarn-server-timelineservice.*\\.jar," +
+          ".*hadoop-yarn-server-timelineservice-hbase.*\\.jar,";
+
+  static final String DEFAULT_MR_JARS =
+      "$HADOOP_HOME/share/hadoop/common/.*\\.jar," +
+          "$HADOOP_HOME/share/hadoop/common/lib/.*\\.jar," +
+          "$HADOOP_HOME/share/hadoop/hdfs/.*\\.jar," +
+          "$HADOOP_HOME/share/hadoop/hdfs/lib/.*\\.jar," +
+          "$HADOOP_HOME/share/hadoop/mapreduce/.*\\.jar," +
+          "$HADOOP_HOME/share/hadoop/mapreduce/lib/.*\\.jar," +
+          "$HADOOP_HOME/share/hadoop/yarn/.*\\.jar," +
+          "$HADOOP_HOME/share/hadoop/yarn/lib/.*\\.jar,";
+
+  private DefaultJars() {}
+}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-uploader/src/main/java/org/apache/hadoop/mapred/uploader/FrameworkUploader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-uploader/src/main/java/org/apache/hadoop/mapred/uploader/FrameworkUploader.java
new file mode 100644
index 00000000000..d1cd7401a50
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-uploader/src/main/java/org/apache/hadoop/mapred/uploader/FrameworkUploader.java
@@ -0,0 +1,384 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.uploader;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.commons.compress.archivers.ArchiveEntry;
+import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.zip.GZIPOutputStream;
+
+/**
+ * Upload a MapReduce framework tarball to HDFS.
+ * Usage:
+ * sudo -u mapred mapred frameworkuploader -fs hdfs://`hostname`:8020 -target
+ * /tmp/upload.tar.gz#mr-framework
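+ *
+ * On success the tool prints the suggested value for
+ * mapreduce.application.framework.path (the uploaded tarball) and for
+ * mapreduce.application.classpath ($PWD/alias/*, where alias is the
+ * name after the '#' in the target).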
+ */
+public class FrameworkUploader implements Runnable {
+  private static final Pattern VAR_SUBBER =
+      Pattern.compile(Shell.getEnvironmentVariableRegex());
+  private static final Logger LOG =
+      LoggerFactory.getLogger(FrameworkUploader.class);
+
+  @VisibleForTesting
+  String input = null;
+  @VisibleForTesting
+  String whitelist = null;
+  @VisibleForTesting
+  String blacklist = null;
+  @VisibleForTesting
+  String target = null;
+  @VisibleForTesting
+  short replication = 10;
+
+  @VisibleForTesting
+  Set<String> filteredInputFiles = new HashSet<>();
+  @VisibleForTesting
+  List<Pattern> whitelistedFiles = new LinkedList<>();
+  @VisibleForTesting
+  List<Pattern> blacklistedFiles = new LinkedList<>();
+
+  @VisibleForTesting
+  OutputStream targetStream = null;
+  private Path targetPath = null;
+  private String alias = null;
+
+  private void printHelp(Options options) {
+    HelpFormatter formatter = new HelpFormatter();
+    formatter.printHelp("mapred frameworkuploader", options);
+  }
+
+  public void run() {
+    try {
+      collectPackages();
+      buildPackage();
+      LOG.info("Uploaded " + target);
+      System.out.println("Suggested mapreduce.application.framework.path " +
+          target);
+      LOG.info(
+          "Suggested mapreduce.application.classpath $PWD/" + alias + "/*");
+      System.out.println("Suggested classpath $PWD/" + alias + "/*");
+    } catch (UploaderException | IOException e) {
+      LOG.error("Error in execution: " + e.getMessage(), e);
+    }
+  }
+
+  @VisibleForTesting
+  void collectPackages() throws UploaderException {
+    parseLists();
+    String[] list = StringUtils.split(input, File.pathSeparatorChar);
+    for (String item : list) {
+      LOG.info("Original source " + item);
+      String expanded = expandEnvironmentVariables(item, System.getenv());
+      LOG.info("Expanded source " + expanded);
+      if (expanded.endsWith("*")) {
+        File path = new File(expanded.substring(0, expanded.length() - 1));
+        if (path.isDirectory()) {
+          File[] files = path.listFiles();
+          if (files != null) {
+            for (File jar : files) {
+              if (!jar.isDirectory()) {
+                addJar(jar);
+              } else {
+                LOG.info("Ignored " + jar + " because it is a directory");
+              }
+            }
+          } else {
+            LOG.warn("Could not list directory " + path);
+          }
+        } else {
+          LOG.warn("Ignored " + expanded + ". It is not a directory");
+        }
+      } else if (expanded.endsWith(".jar")) {
+        File jarFile = new File(expanded);
+        addJar(jarFile);
+      } else if (!expanded.isEmpty()) {
+        LOG.warn("Ignored " + expanded + "; only jars are supported");
+      }
+    }
+  }
+
+  private void beginUpload() throws IOException, UploaderException {
+    if (targetStream == null) {
+      validateTargetPath();
+      int lastIndex = target.indexOf('#');
+      targetPath =
+          new Path(
+              target.substring(
+                  0, lastIndex == -1 ? target.length() : lastIndex));
+      alias = lastIndex != -1 ?
+          target.substring(lastIndex + 1) :
+          targetPath.getName();
+      LOG.info("Target " + targetPath);
+      FileSystem fileSystem = targetPath.getFileSystem(new Configuration());
+      targetStream = fileSystem.create(targetPath, true);
+    }
+  }
+
+  @VisibleForTesting
+  void buildPackage() throws IOException, UploaderException {
+    beginUpload();
+    LOG.info("Compressing tarball");
+    try (TarArchiveOutputStream out = new TarArchiveOutputStream(
+        new GZIPOutputStream(targetStream))) {
+      for (String fullPath : filteredInputFiles) {
+        LOG.info("Adding " + fullPath);
+        File file = new File(fullPath);
+        try (FileInputStream inputStream = new FileInputStream(file)) {
+          ArchiveEntry entry = out.createArchiveEntry(file, file.getName());
+          out.putArchiveEntry(entry);
+          IOUtils.copyBytes(inputStream, out, 1024 * 1024);
+          out.closeArchiveEntry();
+        }
+      }
+    } finally {
+      if (targetStream != null) {
+        targetStream.close();
+      }
+    }
+
+    if (targetPath == null) {
+      return;
+    }
+
+    // Set file attributes
+    FileSystem fileSystem = targetPath.getFileSystem(new Configuration());
+    if (fileSystem instanceof DistributedFileSystem) {
+      LOG.info("Disabling Erasure Coding for path: " + targetPath);
+      DistributedFileSystem dfs = (DistributedFileSystem) fileSystem;
+      dfs.setErasureCodingPolicy(targetPath,
+          SystemErasureCodingPolicies.getReplicationPolicy().getName());
+    }
+
+    if (replication > 0) {
+      LOG.info("Set replication to " +
+          replication + " for path: " + targetPath);
+      fileSystem.setReplication(targetPath, replication);
+    }
+  }
+
+  private void parseLists() throws UploaderException {
+    Map<String, String> env = System.getenv();
+    for (Map.Entry<String, String> item : env.entrySet()) {
+      LOG.info("Environment " + item.getKey() + " " + item.getValue());
+    }
+    String[] whiteListItems = StringUtils.split(whitelist);
+    for (String pattern : whiteListItems) {
+      String expandedPattern =
+          expandEnvironmentVariables(pattern, env);
+      Pattern compiledPattern =
+          Pattern.compile("^" + expandedPattern + "$");
+      LOG.info("Whitelisted " + compiledPattern.toString());
+      whitelistedFiles.add(compiledPattern);
+    }
+    String[] blacklistItems = StringUtils.split(blacklist);
+    for (String pattern : blacklistItems) {
+      String expandedPattern =
+          expandEnvironmentVariables(pattern, env);
+      Pattern compiledPattern =
+          Pattern.compile("^" + expandedPattern + "$");
+      LOG.info("Blacklisted " + compiledPattern.toString());
+      blacklistedFiles.add(compiledPattern);
+    }
+  }
+
+  @VisibleForTesting
+  String expandEnvironmentVariables(String innerInput, Map<String, String> env)
+      throws UploaderException {
+    boolean found;
+    do {
+      found = false;
+      Matcher matcher = VAR_SUBBER.matcher(innerInput);
+      StringBuffer stringBuffer = new StringBuffer();
+      while (matcher.find()) {
+        found = true;
+        String var = matcher.group(1);
+        // Replace $var with its value from the supplied environment
+        String replace = env.get(var);
+        // The variable is not set anywhere, so fail fast
+        if (replace == null) {
+          throw new UploaderException("Environment variable does not exist " +
+              var);
+        }
+        matcher.appendReplacement(
+            stringBuffer, Matcher.quoteReplacement(replace));
+      }
+      matcher.appendTail(stringBuffer);
+      innerInput = stringBuffer.toString();
+    } while (found);
+    return innerInput;
+  }
+
+  private void addJar(File jar) throws UploaderException {
+    boolean found = false;
+    if (!jar.getName().endsWith(".jar")) {
+      LOG.info("Ignored non-jar " + jar.getAbsolutePath());
+      return;
+    }
+    for (Pattern pattern : whitelistedFiles) {
+      Matcher matcher = pattern.matcher(jar.getAbsolutePath());
+      if (matcher.matches()) {
+        LOG.info("Whitelisted " + jar.getAbsolutePath());
+        found = true;
+        break;
+      }
+    }
+    boolean excluded = false;
+    for (Pattern pattern : blacklistedFiles) {
+      Matcher matcher = pattern.matcher(jar.getAbsolutePath());
+      if (matcher.matches()) {
+        LOG.info("Blacklisted " + jar.getAbsolutePath());
+        excluded = true;
+        break;
+      }
+    }
+    if (found && !excluded) {
+      if (!filteredInputFiles.add(jar.getAbsolutePath())) {
+        throw new UploaderException("Duplicate jar " + jar.getAbsolutePath());
+      }
+    }
+    if (!found) {
+      LOG.info("Ignored " + jar.getAbsolutePath() + " because it is missing " +
+          "from the whitelist");
+    } else if (excluded) {
+      LOG.info("Ignored " + jar.getAbsolutePath() + " because it is on " +
+          "the blacklist");
+    }
+  }
+
+  private void validateTargetPath() throws UploaderException {
+    if (!target.startsWith("hdfs:/") &&
+        !target.startsWith("file:/")) {
+      throw new UploaderException("Target path is not hdfs or local " +
+          target);
+    }
+  }
+
+ + " Example: /usr/mr-framework.tar.gz#mr-framework") + .hasArg().create("target")); + opts.addOption(OptionBuilder + .withDescription( + "Desired replication count") + .hasArg().create("replication")); + GenericOptionsParser parser = new GenericOptionsParser(opts, args); + if (parser.getCommandLine().hasOption("help") || + parser.getCommandLine().hasOption("h")) { + printHelp(opts); + return false; + } + input = parser.getCommandLine().getOptionValue( + "input", System.getProperty("java.class.path")); + whitelist = parser.getCommandLine().getOptionValue( + "whitelist", DefaultJars.DEFAULT_MR_JARS); + blacklist = parser.getCommandLine().getOptionValue( + "blacklist", DefaultJars.DEFAULT_EXCLUDED_MR_JARS); + replication = Short.parseShort(parser.getCommandLine().getOptionValue( + "replication", "10")); + String fs = parser.getCommandLine() + .getOptionValue("fs", null); + if (fs == null) { + LOG.error("Target file system not specified"); + printHelp(opts); + return false; + } + String path = parser.getCommandLine().getOptionValue("target", + "mr-framework.tar.gz#mr-framework"); + if (path == null) { + LOG.error("Target directory not specified"); + printHelp(opts); + return false; + } + StringBuilder absolutePath = new StringBuilder(fs); + absolutePath = absolutePath.append(path.startsWith("/") ? "" : "/"); + absolutePath.append(path); + target = absolutePath.toString(); + + if (parser.getRemainingArgs().length > 0) { + LOG.warn("Unexpected parameters"); + printHelp(opts); + return false; + } + return true; + } + + /** + * Tool entry point. + * @param args arguments + * @throws IOException thrown on configuration errors + */ + public static void main(String[] args) throws IOException { + FrameworkUploader uploader = new FrameworkUploader(); + if(uploader.parseArguments(args)) { + uploader.run(); + } + } +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-uploader/src/main/java/org/apache/hadoop/mapred/uploader/UploaderException.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-uploader/src/main/java/org/apache/hadoop/mapred/uploader/UploaderException.java new file mode 100644 index 00000000000..73f6454019d --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-uploader/src/main/java/org/apache/hadoop/mapred/uploader/UploaderException.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapred.uploader; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** + * Framework uploaded exception type. 
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Stable
+class UploaderException extends Exception {
+
+  private static final long serialVersionUID = 1L;
+
+  UploaderException(String message) {
+    super(message);
+  }
+}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-uploader/src/main/java/org/apache/hadoop/mapred/uploader/package-info.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-uploader/src/main/java/org/apache/hadoop/mapred/uploader/package-info.java
new file mode 100644
index 00000000000..4475e8eca46
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-uploader/src/main/java/org/apache/hadoop/mapred/uploader/package-info.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Package org.apache.hadoop.mapred.uploader contains classes related to the
+ * MapReduce framework upload tool.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+package org.apache.hadoop.mapred.uploader;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-uploader/src/test/java/org/apache/hadoop/mapred/uploader/TestFrameworkUploader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-uploader/src/test/java/org/apache/hadoop/mapred/uploader/TestFrameworkUploader.java
new file mode 100644
index 00000000000..9d03165a4e8
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-uploader/src/test/java/org/apache/hadoop/mapred/uploader/TestFrameworkUploader.java
@@ -0,0 +1,315 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.uploader;
+
+import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
+import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
+import org.apache.commons.io.FileUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.zip.GZIPInputStream;
+
+/**
+ * Unit test class for FrameworkUploader.
+ */
+public class TestFrameworkUploader {
+  private static String testDir;
+
+  @Before
+  public void setUp() {
+    String testRootDir =
+        new File(System.getProperty("test.build.data", "/tmp"))
+            .getAbsolutePath()
+            .replace(' ', '+');
+    Random random = new Random(System.currentTimeMillis());
+    testDir = testRootDir + File.separatorChar +
+        Long.toString(random.nextLong());
+  }
+
+  /**
+   * Test requesting command line help.
+   * @throws IOException test failure
+   */
+  @Test
+  public void testHelp() throws IOException {
+    String[] args = new String[]{"-help"};
+    FrameworkUploader uploader = new FrameworkUploader();
+    boolean success = uploader.parseArguments(args);
+    Assert.assertFalse("Expected to print help", success);
+    Assert.assertNull("Expected input to remain unset", uploader.input);
+    Assert.assertNull("Expected whitelist to remain unset",
+        uploader.whitelist);
+    Assert.assertNull("Expected target to remain unset", uploader.target);
+  }
+
+  /**
+   * Test invalid argument parsing.
+   * @throws IOException test failure
+   */
+  @Test
+  public void testWrongArgument() throws IOException {
+    String[] args = new String[]{"-unexpected"};
+    FrameworkUploader uploader = new FrameworkUploader();
+    boolean success = uploader.parseArguments(args);
+    Assert.assertFalse("Expected to print help", success);
+  }
+
+  /**
+   * Test normal argument passing.
+   * @throws IOException test failure
+   */
+  @Test
+  public void testArguments() throws IOException {
+    String[] args =
+        new String[]{
+            "-input", "A",
+            "-whitelist", "B",
+            "-blacklist", "C",
+            "-fs", "hdfs://C:8020",
+            "-target", "D",
+            "-replication", "100"};
+    FrameworkUploader uploader = new FrameworkUploader();
+    boolean success = uploader.parseArguments(args);
+    Assert.assertTrue("Expected successful parse", success);
+    Assert.assertEquals("Input mismatch", "A",
+        uploader.input);
+    Assert.assertEquals("Whitelist mismatch", "B",
+        uploader.whitelist);
+    Assert.assertEquals("Blacklist mismatch", "C",
+        uploader.blacklist);
+    Assert.assertEquals("Target mismatch", "hdfs://C:8020/D",
+        uploader.target);
+    Assert.assertEquals("Replication mismatch", 100,
+        uploader.replication);
+  }
+
+  /**
+   * Test whether we can filter a class path properly.
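+   * The test builds dirA with a.jar, b.jar, c.jar and d.txt, and dirB
+   * with d.jar and e.txt; the whitelist admits a, b and d, and the
+   * blacklist then removes b, so only a.jar and d.jar are collected.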
+   * @throws IOException test failure
+   */
+  @Test
+  public void testCollectPackages() throws IOException, UploaderException {
+    File parent = new File(testDir);
+    try {
+      parent.deleteOnExit();
+      Assert.assertTrue("Directory creation failed", parent.mkdirs());
+      File dirA = new File(parent, "A");
+      Assert.assertTrue(dirA.mkdirs());
+      File dirB = new File(parent, "B");
+      Assert.assertTrue(dirB.mkdirs());
+      File jarA = new File(dirA, "a.jar");
+      Assert.assertTrue(jarA.createNewFile());
+      File jarB = new File(dirA, "b.jar");
+      Assert.assertTrue(jarB.createNewFile());
+      File jarC = new File(dirA, "c.jar");
+      Assert.assertTrue(jarC.createNewFile());
+      File txtD = new File(dirA, "d.txt");
+      Assert.assertTrue(txtD.createNewFile());
+      File jarD = new File(dirB, "d.jar");
+      Assert.assertTrue(jarD.createNewFile());
+      File txtE = new File(dirB, "e.txt");
+      Assert.assertTrue(txtE.createNewFile());
+
+      FrameworkUploader uploader = new FrameworkUploader();
+      uploader.whitelist = ".*a\\.jar,.*b\\.jar,.*d\\.jar";
+      uploader.blacklist = ".*b\\.jar";
+      uploader.input = dirA.getAbsolutePath() + File.separatorChar + "*" +
+          File.pathSeparatorChar +
+          dirB.getAbsolutePath() + File.separatorChar + "*";
+      uploader.collectPackages();
+      Assert.assertEquals("Whitelist count error", 3,
+          uploader.whitelistedFiles.size());
+      Assert.assertEquals("Blacklist count error", 1,
+          uploader.blacklistedFiles.size());
+
+      Assert.assertTrue("File not collected",
+          uploader.filteredInputFiles.contains(jarA.getAbsolutePath()));
+      Assert.assertFalse("File collected",
+          uploader.filteredInputFiles.contains(jarB.getAbsolutePath()));
+      Assert.assertTrue("File not collected",
+          uploader.filteredInputFiles.contains(jarD.getAbsolutePath()));
+      Assert.assertEquals("Collected file count error", 2,
+          uploader.filteredInputFiles.size());
+    } finally {
+      FileUtils.deleteDirectory(parent);
+    }
+  }
+
+  /**
+   * Test building a tarball from source jars.
+   */
+  @Test
+  public void testBuildTarBall() throws IOException, UploaderException {
+    File parent = new File(testDir);
+    try {
+      parent.deleteOnExit();
+      FrameworkUploader uploader = prepareTree(parent);
+
+      File gzipFile = new File("upload.tar.gz");
+      gzipFile.deleteOnExit();
+      Assert.assertTrue("Creating output", gzipFile.createNewFile());
+      uploader.targetStream = new FileOutputStream(gzipFile);
+
+      uploader.buildPackage();
+
+      TarArchiveInputStream result = null;
+      try {
+        result =
+            new TarArchiveInputStream(
+                new GZIPInputStream(new FileInputStream(gzipFile)));
+        Set<String> fileNames = new HashSet<>();
+        Set<Long> sizes = new HashSet<>();
+        TarArchiveEntry entry1 = result.getNextTarEntry();
+        fileNames.add(entry1.getName());
+        sizes.add(entry1.getSize());
+        TarArchiveEntry entry2 = result.getNextTarEntry();
+        fileNames.add(entry2.getName());
+        sizes.add(entry2.getSize());
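+        // prepareTree() writes "Hello World!" (12 characters plus a
+        // newline, 13 bytes) into a.jar and "Hello Galaxy!" (14 bytes
+        // with the newline) into b.jar, hence the expected sizes.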
+        Assert.assertTrue(
+            "File name error", fileNames.contains("a.jar"));
+        Assert.assertTrue(
+            "File size error", sizes.contains((long) 13));
+        Assert.assertTrue(
+            "File name error", fileNames.contains("b.jar"));
+        Assert.assertTrue(
+            "File size error", sizes.contains((long) 14));
+      } finally {
+        if (result != null) {
+          result.close();
+        }
+      }
+    } finally {
+      FileUtils.deleteDirectory(parent);
+    }
+  }
+
+  /**
+   * Test upload to the target file system, exercised here through the
+   * local file:// scheme.
+   */
+  @Test
+  public void testUpload() throws IOException, UploaderException {
+    final String fileName = "/upload.tar.gz";
+    File parent = new File(testDir);
+    try {
+      parent.deleteOnExit();
+
+      FrameworkUploader uploader = prepareTree(parent);
+
+      uploader.target = "file://" + parent.getAbsolutePath() + fileName;
+
+      uploader.buildPackage();
+      try (TarArchiveInputStream archiveInputStream =
+               new TarArchiveInputStream(
+                   new GZIPInputStream(
+                       new FileInputStream(
+                           parent.getAbsolutePath() + fileName)))) {
+        Set<String> fileNames = new HashSet<>();
+        Set<Long> sizes = new HashSet<>();
+        TarArchiveEntry entry1 = archiveInputStream.getNextTarEntry();
+        fileNames.add(entry1.getName());
+        sizes.add(entry1.getSize());
+        TarArchiveEntry entry2 = archiveInputStream.getNextTarEntry();
+        fileNames.add(entry2.getName());
+        sizes.add(entry2.getSize());
+        Assert.assertTrue(
+            "File name error", fileNames.contains("a.jar"));
+        Assert.assertTrue(
+            "File size error", sizes.contains((long) 13));
+        Assert.assertTrue(
+            "File name error", fileNames.contains("b.jar"));
+        Assert.assertTrue(
+            "File size error", sizes.contains((long) 14));
+      }
+    } finally {
+      FileUtils.deleteDirectory(parent);
+    }
+  }
+
+  /**
+   * Prepare a mock directory tree to compress and upload.
+   */
+  private FrameworkUploader prepareTree(File parent)
+      throws FileNotFoundException {
+    Assert.assertTrue(parent.mkdirs());
+    File dirA = new File(parent, "A");
+    Assert.assertTrue(dirA.mkdirs());
+    File jarA = new File(parent, "a.jar");
+    PrintStream printStream = new PrintStream(new FileOutputStream(jarA));
+    printStream.println("Hello World!");
+    printStream.close();
+    File jarB = new File(dirA, "b.jar");
+    printStream = new PrintStream(new FileOutputStream(jarB));
+    printStream.println("Hello Galaxy!");
+    printStream.close();
+
+    FrameworkUploader uploader = new FrameworkUploader();
+    uploader.filteredInputFiles.add(jarA.getAbsolutePath());
+    uploader.filteredInputFiles.add(jarB.getAbsolutePath());
+
+    return uploader;
+  }
+
+  /**
+   * Test regex pattern matching and environment variable replacement.
+   */
+  @Test
+  public void testEnvironmentReplacement() throws UploaderException {
+    String input = "C/$A/B,$B,D";
+    Map<String, String> map = new HashMap<>();
+    map.put("A", "X");
+    map.put("B", "Y");
+    map.put("C", "Z");
+    FrameworkUploader uploader = new FrameworkUploader();
+    String output = uploader.expandEnvironmentVariables(input, map);
+    Assert.assertEquals("Environment not expanded", "C/X/B,Y,D", output);
+  }
+
+  /**
+   * Test recursive environment variable replacement through indirection.
+   */
+  @Test
+  public void testRecursiveEnvironmentReplacement()
+      throws UploaderException {
+    String input = "C/$A/B,$B,D";
+    Map<String, String> map = new HashMap<>();
+    map.put("A", "X");
+    map.put("B", "$C");
+    map.put("C", "Y");
+    FrameworkUploader uploader = new FrameworkUploader();
+    String output = uploader.expandEnvironmentVariables(input, map);
+    Assert.assertEquals("Environment not expanded", "C/X/B,Y,D", output);
+  }
+}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/pom.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/pom.xml
index 274a821e767..a8350cbdfd1 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/pom.xml
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/pom.xml
@@ -326,5 +326,6 @@
     <module>hadoop-mapreduce-client-hs</module>
    <module>hadoop-mapreduce-client-hs-plugins</module>
     <module>hadoop-mapreduce-client-nativetask</module>
+    <module>hadoop-mapreduce-client-uploader</module>
   </modules>
 </project>