MAPREDUCE-6994. Uploader tool for Distributed Cache Deploy code changes (miklos.szegedi@cloudera.com via rkanter)

This commit is contained in:
Robert Kanter 2017-12-01 12:11:43 -08:00
parent 21d3627355
commit 3b78607a02
8 changed files with 881 additions and 0 deletions

View File

@ -32,6 +32,7 @@ function hadoop_usage
hadoop_add_subcommand "pipes" client "run a Pipes job" hadoop_add_subcommand "pipes" client "run a Pipes job"
hadoop_add_subcommand "queue" client "get information regarding JobQueues" hadoop_add_subcommand "queue" client "get information regarding JobQueues"
hadoop_add_subcommand "sampler" client "sampler" hadoop_add_subcommand "sampler" client "sampler"
hadoop_add_subcommand "frameworkuploader" admin "mapreduce framework upload"
hadoop_add_subcommand "version" client "print the version" hadoop_add_subcommand "version" client "print the version"
hadoop_generate_usage "${HADOOP_SHELL_EXECNAME}" true hadoop_generate_usage "${HADOOP_SHELL_EXECNAME}" true
} }
@ -92,6 +93,9 @@ function mapredcmd_case
sampler) sampler)
HADOOP_CLASSNAME=org.apache.hadoop.mapred.lib.InputSampler HADOOP_CLASSNAME=org.apache.hadoop.mapred.lib.InputSampler
;; ;;
frameworkuploader)
HADOOP_CLASSNAME=org.apache.hadoop.mapred.uploader.FrameworkUploader
;;
version) version)
HADOOP_CLASSNAME=org.apache.hadoop.util.VersionInfo HADOOP_CLASSNAME=org.apache.hadoop.util.VersionInfo
;; ;;

View File

@ -0,0 +1,67 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. See accompanying LICENSE file.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>hadoop-mapreduce-client</artifactId>
<groupId>org.apache.hadoop</groupId>
<version>3.1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>hadoop-mapreduce-client-uploader</artifactId>
<version>3.1.0-SNAPSHOT</version>
<name>Apache Hadoop MapReduce Uploader</name>
<dependencies>
<dependency>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs-client</artifactId>
</dependency>
</dependencies>
<properties>
<!-- Needed for generating FindBugs warnings using parent pom -->
<mr.basedir>${project.parent.basedir}/../</mr.basedir>
</properties>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<configuration>
<archive>
<manifest>
<mainClass>org.apache.hadoop.mapred.uploader.FrameworkUploader</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
</plugins>
</build>
</project>

View File

@ -0,0 +1,46 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred.uploader;
/**
* Default white list and black list implementations.
*/
final class DefaultJars {
static final String DEFAULT_EXCLUDED_MR_JARS =
".*hadoop-yarn-server-applicationhistoryservice.*\\.jar," +
".*hadoop-yarn-server-nodemanager.*\\.jar," +
".*hadoop-yarn-server-resourcemanager.*\\.jar," +
".*hadoop-yarn-server-router.*\\.jar," +
".*hadoop-yarn-server-sharedcachemanager.*\\.jar," +
".*hadoop-yarn-server-timeline-pluginstorage.*\\.jar," +
".*hadoop-yarn-server-timelineservice.*\\.jar," +
".*hadoop-yarn-server-timelineservice-hbase.*\\.jar,";
static final String DEFAULT_MR_JARS =
"$HADOOP_HOME/share/hadoop/common/.*\\.jar," +
"$HADOOP_HOME/share/hadoop/common/lib/.*\\.jar," +
"$HADOOP_HOME/share/hadoop/hdfs/.*\\.jar," +
"$HADOOP_HOME/share/hadoop/hdfs/lib/.*\\.jar," +
"$HADOOP_HOME/share/hadoop/mapreduce/.*\\.jar," +
"$HADOOP_HOME/share/hadoop/mapreduce/lib/.*\\.jar," +
"$HADOOP_HOME/share/hadoop/yarn/.*\\.jar," +
"$HADOOP_HOME/share/hadoop/yarn/lib/.*\\.jar,";
private DefaultJars() {}
}

View File

@ -0,0 +1,384 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred.uploader;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
import org.apache.commons.compress.archivers.ArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.zip.GZIPOutputStream;
/**
* Upload a MapReduce framework tarball to HDFS.
* Usage:
* sudo -u mapred mapred frameworkuploader -fs hdfs://`hostname`:8020 -target
* /tmp/upload.tar.gz#mr-framework
*/
public class FrameworkUploader implements Runnable {
private static final Pattern VAR_SUBBER =
Pattern.compile(Shell.getEnvironmentVariableRegex());
private static final Logger LOG =
LoggerFactory.getLogger(FrameworkUploader.class);
@VisibleForTesting
String input = null;
@VisibleForTesting
String whitelist = null;
@VisibleForTesting
String blacklist = null;
@VisibleForTesting
String target = null;
@VisibleForTesting
short replication = 10;
@VisibleForTesting
Set<String> filteredInputFiles = new HashSet<>();
@VisibleForTesting
List<Pattern> whitelistedFiles = new LinkedList<>();
@VisibleForTesting
List<Pattern> blacklistedFiles = new LinkedList<>();
@VisibleForTesting
OutputStream targetStream = null;
private Path targetPath = null;
private String alias = null;
private void printHelp(Options options) {
HelpFormatter formatter = new HelpFormatter();
formatter.printHelp("mapred frameworkuploader", options);
}
public void run() {
try {
collectPackages();
buildPackage();
LOG.info("Uploaded " + target);
System.out.println("Suggested mapreduce.application.framework.path " +
target);
LOG.info(
"Suggested mapreduce.application.classpath $PWD/" + alias + "/*");
System.out.println("Suggested classpath $PWD/" + alias + "/*");
} catch (UploaderException|IOException e) {
LOG.error("Error in execution " + e.getMessage());
e.printStackTrace();
}
}
@VisibleForTesting
void collectPackages() throws UploaderException {
parseLists();
String[] list = StringUtils.split(input, File.pathSeparatorChar);
for (String item : list) {
LOG.info("Original source " + item);
String expanded = expandEnvironmentVariables(item, System.getenv());
LOG.info("Expanded source " + expanded);
if (expanded.endsWith("*")) {
File path = new File(expanded.substring(0, expanded.length() - 1));
if (path.isDirectory()) {
File[] files = path.listFiles();
if (files != null) {
for (File jar : files) {
if (!jar.isDirectory()) {
addJar(jar);
} else {
LOG.info("Ignored " + jar + " because it is a directory");
}
}
} else {
LOG.warn("Could not list directory " + path);
}
} else {
LOG.warn("Ignored " + expanded + ". It is not a directory");
}
} else if (expanded.endsWith(".jar")) {
File jarFile = new File(expanded);
addJar(jarFile);
} else if (!expanded.isEmpty()) {
LOG.warn("Ignored " + expanded + " only jars are supported");
}
}
}
private void beginUpload() throws IOException, UploaderException {
if (targetStream == null) {
validateTargetPath();
int lastIndex = target.indexOf('#');
targetPath =
new Path(
target.substring(
0, lastIndex == -1 ? target.length() : lastIndex));
alias = lastIndex != -1 ?
target.substring(lastIndex + 1) :
targetPath.getName();
LOG.info("Target " + targetPath);
FileSystem fileSystem = targetPath.getFileSystem(new Configuration());
targetStream = fileSystem.create(targetPath, true);
}
}
@VisibleForTesting
void buildPackage() throws IOException, UploaderException {
beginUpload();
LOG.info("Compressing tarball");
try (TarArchiveOutputStream out = new TarArchiveOutputStream(
new GZIPOutputStream(targetStream))) {
for (String fullPath : filteredInputFiles) {
LOG.info("Adding " + fullPath);
File file = new File(fullPath);
try (FileInputStream inputStream = new FileInputStream(file)) {
ArchiveEntry entry = out.createArchiveEntry(file, file.getName());
out.putArchiveEntry(entry);
IOUtils.copyBytes(inputStream, out, 1024 * 1024);
out.closeArchiveEntry();
}
}
} finally {
if (targetStream != null) {
targetStream.close();
}
}
if (targetPath == null) {
return;
}
// Set file attributes
FileSystem fileSystem = targetPath.getFileSystem(new Configuration());
if (fileSystem instanceof DistributedFileSystem) {
LOG.info("Disabling Erasure Coding for path: " + targetPath);
DistributedFileSystem dfs = (DistributedFileSystem) fileSystem;
dfs.setErasureCodingPolicy(targetPath,
SystemErasureCodingPolicies.getReplicationPolicy().getName());
}
if (replication > 0) {
LOG.info("Set replication to " +
replication + " for path: " + targetPath);
fileSystem.setReplication(targetPath, replication);
}
}
private void parseLists() throws UploaderException {
Map<String, String> env = System.getenv();
for(Map.Entry<String, String> item : env.entrySet()) {
LOG.info("Environment " + item.getKey() + " " + item.getValue());
}
String[] whiteListItems = StringUtils.split(whitelist);
for (String pattern : whiteListItems) {
String expandedPattern =
expandEnvironmentVariables(pattern, env);
Pattern compiledPattern =
Pattern.compile("^" + expandedPattern + "$");
LOG.info("Whitelisted " + compiledPattern.toString());
whitelistedFiles.add(compiledPattern);
}
String[] blacklistItems = StringUtils.split(blacklist);
for (String pattern : blacklistItems) {
String expandedPattern =
expandEnvironmentVariables(pattern, env);
Pattern compiledPattern =
Pattern.compile("^" + expandedPattern + "$");
LOG.info("Blacklisted " + compiledPattern.toString());
blacklistedFiles.add(compiledPattern);
}
}
@VisibleForTesting
String expandEnvironmentVariables(String innerInput, Map<String, String> env)
throws UploaderException {
boolean found;
do {
found = false;
Matcher matcher = VAR_SUBBER.matcher(innerInput);
StringBuffer stringBuffer = new StringBuffer();
while (matcher.find()) {
found = true;
String var = matcher.group(1);
// replace $env with the child's env constructed by tt's
String replace = env.get(var);
// the env key is not present anywhere .. simply set it
if (replace == null) {
throw new UploaderException("Environment variable does not exist " +
var);
}
matcher.appendReplacement(
stringBuffer, Matcher.quoteReplacement(replace));
}
matcher.appendTail(stringBuffer);
innerInput = stringBuffer.toString();
} while (found);
return innerInput;
}
private void addJar(File jar) throws UploaderException{
boolean found = false;
if (!jar.getName().endsWith(".jar")) {
LOG.info("Ignored non-jar " + jar.getAbsolutePath());
}
for (Pattern pattern : whitelistedFiles) {
Matcher matcher = pattern.matcher(jar.getAbsolutePath());
if (matcher.matches()) {
LOG.info("Whitelisted " + jar.getAbsolutePath());
found = true;
break;
}
}
boolean excluded = false;
for (Pattern pattern : blacklistedFiles) {
Matcher matcher = pattern.matcher(jar.getAbsolutePath());
if (matcher.matches()) {
LOG.info("Blacklisted " + jar.getAbsolutePath());
excluded = true;
break;
}
}
if (found && !excluded) {
LOG.info("Whitelisted " + jar.getAbsolutePath());
if (!filteredInputFiles.add(jar.getAbsolutePath())) {
throw new UploaderException("Duplicate jar" + jar.getAbsolutePath());
}
}
if (!found) {
LOG.info("Ignored " + jar.getAbsolutePath() + " because it is missing " +
"from the whitelist");
} else if (excluded) {
LOG.info("Ignored " + jar.getAbsolutePath() + " because it is on " +
"the the blacklist");
}
}
private void validateTargetPath() throws UploaderException {
if (!target.startsWith("hdfs:/") &&
!target.startsWith("file:/")) {
throw new UploaderException("Target path is not hdfs or local " + target);
}
}
@VisibleForTesting
boolean parseArguments(String[] args) throws IOException {
Options opts = new Options();
opts.addOption(OptionBuilder.create("h"));
opts.addOption(OptionBuilder.create("help"));
opts.addOption(OptionBuilder
.withDescription("Input class path")
.hasArg().create("input"));
opts.addOption(OptionBuilder
.withDescription(
"Regex specifying the full path of jars to include in the" +
" framework tarball. Default is a hardcoded set of jars" +
" considered necessary to include")
.hasArg().create("whitelist"));
opts.addOption(OptionBuilder
.withDescription(
"Regex specifying the full path of jars to exclude in the" +
" framework tarball. Default is a hardcoded set of jars" +
" considered unnecessary to include")
.hasArg().create("blacklist"));
opts.addOption(OptionBuilder
.withDescription(
"Target file system to upload to." +
" Example: hdfs://foo.com:8020")
.hasArg().create("fs"));
opts.addOption(OptionBuilder
.withDescription(
"Target file to upload to with a reference name." +
" Example: /usr/mr-framework.tar.gz#mr-framework")
.hasArg().create("target"));
opts.addOption(OptionBuilder
.withDescription(
"Desired replication count")
.hasArg().create("replication"));
GenericOptionsParser parser = new GenericOptionsParser(opts, args);
if (parser.getCommandLine().hasOption("help") ||
parser.getCommandLine().hasOption("h")) {
printHelp(opts);
return false;
}
input = parser.getCommandLine().getOptionValue(
"input", System.getProperty("java.class.path"));
whitelist = parser.getCommandLine().getOptionValue(
"whitelist", DefaultJars.DEFAULT_MR_JARS);
blacklist = parser.getCommandLine().getOptionValue(
"blacklist", DefaultJars.DEFAULT_EXCLUDED_MR_JARS);
replication = Short.parseShort(parser.getCommandLine().getOptionValue(
"replication", "10"));
String fs = parser.getCommandLine()
.getOptionValue("fs", null);
if (fs == null) {
LOG.error("Target file system not specified");
printHelp(opts);
return false;
}
String path = parser.getCommandLine().getOptionValue("target",
"mr-framework.tar.gz#mr-framework");
if (path == null) {
LOG.error("Target directory not specified");
printHelp(opts);
return false;
}
StringBuilder absolutePath = new StringBuilder(fs);
absolutePath = absolutePath.append(path.startsWith("/") ? "" : "/");
absolutePath.append(path);
target = absolutePath.toString();
if (parser.getRemainingArgs().length > 0) {
LOG.warn("Unexpected parameters");
printHelp(opts);
return false;
}
return true;
}
/**
* Tool entry point.
* @param args arguments
* @throws IOException thrown on configuration errors
*/
public static void main(String[] args) throws IOException {
FrameworkUploader uploader = new FrameworkUploader();
if(uploader.parseArguments(args)) {
uploader.run();
}
}
}

View File

@ -0,0 +1,36 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred.uploader;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/**
* Framework uploaded exception type.
*/
@InterfaceAudience.Private
@InterfaceStability.Stable
class UploaderException extends Exception {
private static final long serialVersionUID = 1L;
UploaderException(String message) {
super(message);
}
}

View File

@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Package org.apache.hadoop.mapred.uploader contains classes related to the
* MapReduce framework upload tool.
*/
@InterfaceAudience.Public
@InterfaceStability.Unstable
package org.apache.hadoop.mapred.uploader;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

View File

@ -0,0 +1,315 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred.uploader;
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
import org.apache.commons.io.FileUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.zip.GZIPInputStream;
/**
* Unit test class for FrameworkUploader.
*/
public class TestFrameworkUploader {
private static String testDir;
@Before
public void setUp() {
String testRootDir =
new File(System.getProperty("test.build.data", "/tmp"))
.getAbsolutePath()
.replace(' ', '+');
Random random = new Random(System.currentTimeMillis());
testDir = testRootDir + File.separatorChar +
Long.toString(random.nextLong());
}
/**
* Test requesting command line help.
* @throws IOException test failure
*/
@Test
public void testHelp() throws IOException {
String[] args = new String[]{"-help"};
FrameworkUploader uploader = new FrameworkUploader();
boolean success = uploader.parseArguments(args);
Assert.assertFalse("Expected to print help", success);
Assert.assertEquals("Expected ignore run", null,
uploader.input);
Assert.assertEquals("Expected ignore run", null,
uploader.whitelist);
Assert.assertEquals("Expected ignore run", null,
uploader.target);
}
/**
* Test invalid argument parsing.
* @throws IOException test failure
*/
@Test
public void testWrongArgument() throws IOException {
String[] args = new String[]{"-unexpected"};
FrameworkUploader uploader = new FrameworkUploader();
boolean success = uploader.parseArguments(args);
Assert.assertFalse("Expected to print help", success);
}
/**
* Test normal argument passing.
* @throws IOException test failure
*/
@Test
public void testArguments() throws IOException {
String[] args =
new String[]{
"-input", "A",
"-whitelist", "B",
"-blacklist", "C",
"-fs", "hdfs://C:8020",
"-target", "D",
"-replication", "100"};
FrameworkUploader uploader = new FrameworkUploader();
boolean success = uploader.parseArguments(args);
Assert.assertTrue("Expected to print help", success);
Assert.assertEquals("Input mismatch", "A",
uploader.input);
Assert.assertEquals("Whitelist mismatch", "B",
uploader.whitelist);
Assert.assertEquals("Blacklist mismatch", "C",
uploader.blacklist);
Assert.assertEquals("Target mismatch", "hdfs://C:8020/D",
uploader.target);
Assert.assertEquals("Replication mismatch", 100,
uploader.replication);
}
/**
* Test whether we can filter a class path properly.
* @throws IOException test failure
*/
@Test
public void testCollectPackages() throws IOException, UploaderException {
File parent = new File(testDir);
try {
parent.deleteOnExit();
Assert.assertTrue("Directory creation failed", parent.mkdirs());
File dirA = new File(parent, "A");
Assert.assertTrue(dirA.mkdirs());
File dirB = new File(parent, "B");
Assert.assertTrue(dirB.mkdirs());
File jarA = new File(dirA, "a.jar");
Assert.assertTrue(jarA.createNewFile());
File jarB = new File(dirA, "b.jar");
Assert.assertTrue(jarB.createNewFile());
File jarC = new File(dirA, "c.jar");
Assert.assertTrue(jarC.createNewFile());
File txtD = new File(dirA, "d.txt");
Assert.assertTrue(txtD.createNewFile());
File jarD = new File(dirB, "d.jar");
Assert.assertTrue(jarD.createNewFile());
File txtE = new File(dirB, "e.txt");
Assert.assertTrue(txtE.createNewFile());
FrameworkUploader uploader = new FrameworkUploader();
uploader.whitelist = ".*a\\.jar,.*b\\.jar,.*d\\.jar";
uploader.blacklist = ".*b\\.jar";
uploader.input = dirA.getAbsolutePath() + File.separatorChar + "*" +
File.pathSeparatorChar +
dirB.getAbsolutePath() + File.separatorChar + "*";
uploader.collectPackages();
Assert.assertEquals("Whitelist count error", 3,
uploader.whitelistedFiles.size());
Assert.assertEquals("Blacklist count error", 1,
uploader.blacklistedFiles.size());
Assert.assertTrue("File not collected",
uploader.filteredInputFiles.contains(jarA.getAbsolutePath()));
Assert.assertFalse("File collected",
uploader.filteredInputFiles.contains(jarB.getAbsolutePath()));
Assert.assertTrue("File not collected",
uploader.filteredInputFiles.contains(jarD.getAbsolutePath()));
Assert.assertEquals("Too many whitelists", 2,
uploader.filteredInputFiles.size());
} finally {
FileUtils.deleteDirectory(parent);
}
}
/**
* Test building a tarball from source jars.
*/
@Test
public void testBuildTarBall() throws IOException, UploaderException {
File parent = new File(testDir);
try {
parent.deleteOnExit();
FrameworkUploader uploader = prepareTree(parent);
File gzipFile = new File("upload.tar.gz");
gzipFile.deleteOnExit();
Assert.assertTrue("Creating output", gzipFile.createNewFile());
uploader.targetStream = new FileOutputStream(gzipFile);
uploader.buildPackage();
TarArchiveInputStream result = null;
try {
result =
new TarArchiveInputStream(
new GZIPInputStream(new FileInputStream(gzipFile)));
Set<String> fileNames = new HashSet<>();
Set<Long> sizes = new HashSet<>();
TarArchiveEntry entry1 = result.getNextTarEntry();
fileNames.add(entry1.getName());
sizes.add(entry1.getSize());
TarArchiveEntry entry2 = result.getNextTarEntry();
fileNames.add(entry2.getName());
sizes.add(entry2.getSize());
Assert.assertTrue(
"File name error", fileNames.contains("a.jar"));
Assert.assertTrue(
"File size error", sizes.contains((long) 13));
Assert.assertTrue(
"File name error", fileNames.contains("b.jar"));
Assert.assertTrue(
"File size error", sizes.contains((long) 14));
} finally {
if (result != null) {
result.close();
}
}
} finally {
FileUtils.deleteDirectory(parent);
}
}
/**
* Test upload to HDFS.
*/
@Test
public void testUpload() throws IOException, UploaderException {
final String fileName = "/upload.tar.gz";
File parent = new File(testDir);
try {
parent.deleteOnExit();
FrameworkUploader uploader = prepareTree(parent);
uploader.target = "file://" + parent.getAbsolutePath() + fileName;
uploader.buildPackage();
try (TarArchiveInputStream archiveInputStream = new TarArchiveInputStream(
new GZIPInputStream(
new FileInputStream(
parent.getAbsolutePath() + fileName)))) {
Set<String> fileNames = new HashSet<>();
Set<Long> sizes = new HashSet<>();
TarArchiveEntry entry1 = archiveInputStream.getNextTarEntry();
fileNames.add(entry1.getName());
sizes.add(entry1.getSize());
TarArchiveEntry entry2 = archiveInputStream.getNextTarEntry();
fileNames.add(entry2.getName());
sizes.add(entry2.getSize());
Assert.assertTrue(
"File name error", fileNames.contains("a.jar"));
Assert.assertTrue(
"File size error", sizes.contains((long) 13));
Assert.assertTrue(
"File name error", fileNames.contains("b.jar"));
Assert.assertTrue(
"File size error", sizes.contains((long) 14));
}
} finally {
FileUtils.deleteDirectory(parent);
}
}
/**
* Prepare a mock directory tree to compress and upload.
*/
private FrameworkUploader prepareTree(File parent)
throws FileNotFoundException {
Assert.assertTrue(parent.mkdirs());
File dirA = new File(parent, "A");
Assert.assertTrue(dirA.mkdirs());
File jarA = new File(parent, "a.jar");
PrintStream printStream = new PrintStream(new FileOutputStream(jarA));
printStream.println("Hello World!");
printStream.close();
File jarB = new File(dirA, "b.jar");
printStream = new PrintStream(new FileOutputStream(jarB));
printStream.println("Hello Galaxy!");
printStream.close();
FrameworkUploader uploader = new FrameworkUploader();
uploader.filteredInputFiles.add(jarA.getAbsolutePath());
uploader.filteredInputFiles.add(jarB.getAbsolutePath());
return uploader;
}
/**
* Test regex pattern matching and environment variable replacement.
*/
@Test
public void testEnvironmentReplacement() throws UploaderException {
String input = "C/$A/B,$B,D";
Map<String, String> map = new HashMap<>();
map.put("A", "X");
map.put("B", "Y");
map.put("C", "Z");
FrameworkUploader uploader = new FrameworkUploader();
String output = uploader.expandEnvironmentVariables(input, map);
Assert.assertEquals("Environment not expanded", "C/X/B,Y,D", output);
}
/**
* Test regex pattern matching and environment variable replacement.
*/
@Test
public void testRecursiveEnvironmentReplacement()
throws UploaderException {
String input = "C/$A/B,$B,D";
Map<String, String> map = new HashMap<>();
map.put("A", "X");
map.put("B", "$C");
map.put("C", "Y");
FrameworkUploader uploader = new FrameworkUploader();
String output = uploader.expandEnvironmentVariables(input, map);
Assert.assertEquals("Environment not expanded", "C/X/B,Y,D", output);
}
}

View File

@ -326,5 +326,6 @@
<module>hadoop-mapreduce-client-hs</module> <module>hadoop-mapreduce-client-hs</module>
<module>hadoop-mapreduce-client-hs-plugins</module> <module>hadoop-mapreduce-client-hs-plugins</module>
<module>hadoop-mapreduce-client-nativetask</module> <module>hadoop-mapreduce-client-nativetask</module>
<module>hadoop-mapreduce-client-uploader</module>
</modules> </modules>
</project> </project>