Revert "YARN-2185. Use pipes when localizing archives. Contributed by Miklos Szegedi"
This reverts commit 1b0f265db1
.
This commit is contained in:
parent
6463e10c72
commit
901d15a30b
|
@ -20,35 +20,27 @@ package org.apache.hadoop.fs;
|
||||||
|
|
||||||
import java.io.BufferedInputStream;
|
import java.io.BufferedInputStream;
|
||||||
import java.io.BufferedOutputStream;
|
import java.io.BufferedOutputStream;
|
||||||
import java.io.BufferedReader;
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.FileInputStream;
|
import java.io.FileInputStream;
|
||||||
import java.io.FileNotFoundException;
|
import java.io.FileNotFoundException;
|
||||||
import java.io.FileOutputStream;
|
import java.io.FileOutputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
import java.io.InputStreamReader;
|
|
||||||
import java.io.OutputStream;
|
import java.io.OutputStream;
|
||||||
import java.net.InetAddress;
|
import java.net.InetAddress;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
import java.net.UnknownHostException;
|
import java.net.UnknownHostException;
|
||||||
import java.nio.charset.Charset;
|
|
||||||
import java.nio.file.AccessDeniedException;
|
import java.nio.file.AccessDeniedException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Enumeration;
|
import java.util.Enumeration;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.ExecutionException;
|
|
||||||
import java.util.concurrent.ExecutorService;
|
|
||||||
import java.util.concurrent.Executors;
|
|
||||||
import java.util.concurrent.Future;
|
|
||||||
import java.util.jar.Attributes;
|
import java.util.jar.Attributes;
|
||||||
import java.util.jar.JarOutputStream;
|
import java.util.jar.JarOutputStream;
|
||||||
import java.util.jar.Manifest;
|
import java.util.jar.Manifest;
|
||||||
import java.util.zip.GZIPInputStream;
|
import java.util.zip.GZIPInputStream;
|
||||||
import java.util.zip.ZipEntry;
|
import java.util.zip.ZipEntry;
|
||||||
import java.util.zip.ZipFile;
|
import java.util.zip.ZipFile;
|
||||||
import java.util.zip.ZipInputStream;
|
|
||||||
|
|
||||||
import org.apache.commons.collections.map.CaseInsensitiveMap;
|
import org.apache.commons.collections.map.CaseInsensitiveMap;
|
||||||
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
|
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
|
||||||
|
@ -82,11 +74,6 @@ public class FileUtil {
|
||||||
* */
|
* */
|
||||||
public static final int SYMLINK_NO_PRIVILEGE = 2;
|
public static final int SYMLINK_NO_PRIVILEGE = 2;
|
||||||
|
|
||||||
/**
|
|
||||||
* Buffer size for copy the content of compressed file to new file.
|
|
||||||
*/
|
|
||||||
private static final int BUFFER_SIZE = 8_192;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* convert an array of FileStatus to an array of Path
|
* convert an array of FileStatus to an array of Path
|
||||||
*
|
*
|
||||||
|
@ -538,22 +525,6 @@ public class FileUtil {
|
||||||
return makeShellPath(file, false);
|
return makeShellPath(file, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Convert a os-native filename to a path that works for the shell
|
|
||||||
* and avoids script injection attacks.
|
|
||||||
* @param file The filename to convert
|
|
||||||
* @return The unix pathname
|
|
||||||
* @throws IOException on windows, there can be problems with the subprocess
|
|
||||||
*/
|
|
||||||
public static String makeSecureShellPath(File file) throws IOException {
|
|
||||||
if (Shell.WINDOWS) {
|
|
||||||
// Currently it is never called, but it might be helpful in the future.
|
|
||||||
throw new UnsupportedOperationException("Not implemented for Windows");
|
|
||||||
} else {
|
|
||||||
return makeShellPath(file, false).replace("'", "'\\''");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Convert a os-native filename to a path that works for the shell.
|
* Convert a os-native filename to a path that works for the shell.
|
||||||
* @param file The filename to convert
|
* @param file The filename to convert
|
||||||
|
@ -605,48 +576,11 @@ public class FileUtil {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Given a stream input it will unzip the it in the unzip directory.
|
* Given a File input it will unzip the file in a the unzip directory
|
||||||
* passed as the second parameter
|
|
||||||
* @param inputStream The zip file as input
|
|
||||||
* @param toDir The unzip directory where to unzip the zip file.
|
|
||||||
* @throws IOException an exception occurred
|
|
||||||
*/
|
|
||||||
public static void unZip(InputStream inputStream, File toDir)
|
|
||||||
throws IOException {
|
|
||||||
try (ZipInputStream zip = new ZipInputStream(inputStream)) {
|
|
||||||
int numOfFailedLastModifiedSet = 0;
|
|
||||||
for(ZipEntry entry = zip.getNextEntry();
|
|
||||||
entry != null;
|
|
||||||
entry = zip.getNextEntry()) {
|
|
||||||
if (!entry.isDirectory()) {
|
|
||||||
File file = new File(toDir, entry.getName());
|
|
||||||
File parent = file.getParentFile();
|
|
||||||
if (!parent.mkdirs() &&
|
|
||||||
!parent.isDirectory()) {
|
|
||||||
throw new IOException("Mkdirs failed to create " +
|
|
||||||
parent.getAbsolutePath());
|
|
||||||
}
|
|
||||||
try (OutputStream out = new FileOutputStream(file)) {
|
|
||||||
IOUtils.copyBytes(zip, out, BUFFER_SIZE);
|
|
||||||
}
|
|
||||||
if (!file.setLastModified(entry.getTime())) {
|
|
||||||
numOfFailedLastModifiedSet++;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (numOfFailedLastModifiedSet > 0) {
|
|
||||||
LOG.warn("Could not set last modfied time for {} file(s)",
|
|
||||||
numOfFailedLastModifiedSet);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Given a File input it will unzip it in the unzip directory.
|
|
||||||
* passed as the second parameter
|
* passed as the second parameter
|
||||||
* @param inFile The zip file as input
|
* @param inFile The zip file as input
|
||||||
* @param unzipDir The unzip directory where to unzip the zip file.
|
* @param unzipDir The unzip directory where to unzip the zip file.
|
||||||
* @throws IOException An I/O exception has occurred
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
public static void unZip(File inFile, File unzipDir) throws IOException {
|
public static void unZip(File inFile, File unzipDir) throws IOException {
|
||||||
Enumeration<? extends ZipEntry> entries;
|
Enumeration<? extends ZipEntry> entries;
|
||||||
|
@ -686,138 +620,6 @@ public class FileUtil {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Run a command and send the contents of an input stream to it.
|
|
||||||
* @param inputStream Input stream to forward to the shell command
|
|
||||||
* @param command shell command to run
|
|
||||||
* @throws IOException read or write failed
|
|
||||||
* @throws InterruptedException command interrupted
|
|
||||||
* @throws ExecutionException task submit failed
|
|
||||||
*/
|
|
||||||
private static void runCommandOnStream(
|
|
||||||
InputStream inputStream, String command)
|
|
||||||
throws IOException, InterruptedException, ExecutionException {
|
|
||||||
ExecutorService executor = null;
|
|
||||||
ProcessBuilder builder = new ProcessBuilder();
|
|
||||||
builder.command(
|
|
||||||
Shell.WINDOWS ? "cmd" : "bash",
|
|
||||||
Shell.WINDOWS ? "/c" : "-c",
|
|
||||||
command);
|
|
||||||
Process process = builder.start();
|
|
||||||
int exitCode;
|
|
||||||
try {
|
|
||||||
// Consume stdout and stderr, to avoid blocking the command
|
|
||||||
executor = Executors.newFixedThreadPool(2);
|
|
||||||
Future output = executor.submit(() -> {
|
|
||||||
try {
|
|
||||||
// Read until the output stream receives an EOF and closed.
|
|
||||||
if (LOG.isDebugEnabled()) {
|
|
||||||
// Log directly to avoid out of memory errors
|
|
||||||
try (BufferedReader reader =
|
|
||||||
new BufferedReader(
|
|
||||||
new InputStreamReader(process.getInputStream(),
|
|
||||||
Charset.forName("UTF-8")))) {
|
|
||||||
String line;
|
|
||||||
while((line = reader.readLine()) != null) {
|
|
||||||
LOG.debug(line);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
org.apache.commons.io.IOUtils.copy(
|
|
||||||
process.getInputStream(),
|
|
||||||
new IOUtils.NullOutputStream());
|
|
||||||
}
|
|
||||||
} catch (IOException e) {
|
|
||||||
LOG.debug(e.getMessage());
|
|
||||||
}
|
|
||||||
});
|
|
||||||
Future error = executor.submit(() -> {
|
|
||||||
try {
|
|
||||||
// Read until the error stream receives an EOF and closed.
|
|
||||||
if (LOG.isDebugEnabled()) {
|
|
||||||
// Log directly to avoid out of memory errors
|
|
||||||
try (BufferedReader reader =
|
|
||||||
new BufferedReader(
|
|
||||||
new InputStreamReader(process.getErrorStream(),
|
|
||||||
Charset.forName("UTF-8")))) {
|
|
||||||
String line;
|
|
||||||
while((line = reader.readLine()) != null) {
|
|
||||||
LOG.debug(line);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
org.apache.commons.io.IOUtils.copy(
|
|
||||||
process.getErrorStream(),
|
|
||||||
new IOUtils.NullOutputStream());
|
|
||||||
}
|
|
||||||
} catch (IOException e) {
|
|
||||||
LOG.debug(e.getMessage());
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
// Pass the input stream to the command to process
|
|
||||||
try {
|
|
||||||
org.apache.commons.io.IOUtils.copy(
|
|
||||||
inputStream, process.getOutputStream());
|
|
||||||
} finally {
|
|
||||||
process.getOutputStream().close();
|
|
||||||
}
|
|
||||||
|
|
||||||
// Wait for both stdout and stderr futures to finish
|
|
||||||
error.get();
|
|
||||||
output.get();
|
|
||||||
} finally {
|
|
||||||
// Clean up the threads
|
|
||||||
if (executor != null) {
|
|
||||||
executor.shutdown();
|
|
||||||
}
|
|
||||||
// Wait to avoid leaking the child process
|
|
||||||
exitCode = process.waitFor();
|
|
||||||
}
|
|
||||||
|
|
||||||
if (exitCode != 0) {
|
|
||||||
throw new IOException(
|
|
||||||
String.format(
|
|
||||||
"Error executing command. %s " +
|
|
||||||
"Process exited with exit code %d.",
|
|
||||||
command, exitCode));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Given a Tar File as input it will untar the file in a the untar directory
|
|
||||||
* passed as the second parameter
|
|
||||||
*
|
|
||||||
* This utility will untar ".tar" files and ".tar.gz","tgz" files.
|
|
||||||
*
|
|
||||||
* @param inputStream The tar file as input.
|
|
||||||
* @param untarDir The untar directory where to untar the tar file.
|
|
||||||
* @param gzipped The input stream is gzipped
|
|
||||||
* TODO Use magic number and PusbackInputStream to identify
|
|
||||||
* @throws IOException an exception occurred
|
|
||||||
* @throws InterruptedException command interrupted
|
|
||||||
* @throws ExecutionException task submit failed
|
|
||||||
*/
|
|
||||||
public static void unTar(InputStream inputStream, File untarDir,
|
|
||||||
boolean gzipped)
|
|
||||||
throws IOException, InterruptedException, ExecutionException {
|
|
||||||
if (!untarDir.mkdirs()) {
|
|
||||||
if (!untarDir.isDirectory()) {
|
|
||||||
throw new IOException("Mkdirs failed to create " + untarDir);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if(Shell.WINDOWS) {
|
|
||||||
// Tar is not native to Windows. Use simple Java based implementation for
|
|
||||||
// tests and simple tar archives
|
|
||||||
unTarUsingJava(inputStream, untarDir, gzipped);
|
|
||||||
} else {
|
|
||||||
// spawn tar utility to untar archive for full fledged unix behavior such
|
|
||||||
// as resolving symlinks in tar archives
|
|
||||||
unTarUsingTar(inputStream, untarDir, gzipped);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Given a Tar File as input it will untar the file in a the untar directory
|
* Given a Tar File as input it will untar the file in a the untar directory
|
||||||
* passed as the second parameter
|
* passed as the second parameter
|
||||||
|
@ -848,41 +650,23 @@ public class FileUtil {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void unTarUsingTar(InputStream inputStream, File untarDir,
|
|
||||||
boolean gzipped)
|
|
||||||
throws IOException, InterruptedException, ExecutionException {
|
|
||||||
StringBuilder untarCommand = new StringBuilder();
|
|
||||||
if (gzipped) {
|
|
||||||
untarCommand.append("gzip -dc | (");
|
|
||||||
}
|
|
||||||
untarCommand.append("cd '");
|
|
||||||
untarCommand.append(FileUtil.makeSecureShellPath(untarDir));
|
|
||||||
untarCommand.append("' && ");
|
|
||||||
untarCommand.append("tar -x ");
|
|
||||||
|
|
||||||
if (gzipped) {
|
|
||||||
untarCommand.append(")");
|
|
||||||
}
|
|
||||||
runCommandOnStream(inputStream, untarCommand.toString());
|
|
||||||
}
|
|
||||||
|
|
||||||
private static void unTarUsingTar(File inFile, File untarDir,
|
private static void unTarUsingTar(File inFile, File untarDir,
|
||||||
boolean gzipped) throws IOException {
|
boolean gzipped) throws IOException {
|
||||||
StringBuffer untarCommand = new StringBuffer();
|
StringBuffer untarCommand = new StringBuffer();
|
||||||
if (gzipped) {
|
if (gzipped) {
|
||||||
untarCommand.append(" gzip -dc '");
|
untarCommand.append(" gzip -dc '");
|
||||||
untarCommand.append(FileUtil.makeSecureShellPath(inFile));
|
untarCommand.append(FileUtil.makeShellPath(inFile));
|
||||||
untarCommand.append("' | (");
|
untarCommand.append("' | (");
|
||||||
}
|
}
|
||||||
untarCommand.append("cd '");
|
untarCommand.append("cd '");
|
||||||
untarCommand.append(FileUtil.makeSecureShellPath(untarDir));
|
untarCommand.append(FileUtil.makeShellPath(untarDir));
|
||||||
untarCommand.append("' && ");
|
untarCommand.append("' ; ");
|
||||||
untarCommand.append("tar -xf ");
|
untarCommand.append("tar -xf ");
|
||||||
|
|
||||||
if (gzipped) {
|
if (gzipped) {
|
||||||
untarCommand.append(" -)");
|
untarCommand.append(" -)");
|
||||||
} else {
|
} else {
|
||||||
untarCommand.append(FileUtil.makeSecureShellPath(inFile));
|
untarCommand.append(FileUtil.makeShellPath(inFile));
|
||||||
}
|
}
|
||||||
String[] shellCmd = { "bash", "-c", untarCommand.toString() };
|
String[] shellCmd = { "bash", "-c", untarCommand.toString() };
|
||||||
ShellCommandExecutor shexec = new ShellCommandExecutor(shellCmd);
|
ShellCommandExecutor shexec = new ShellCommandExecutor(shellCmd);
|
||||||
|
@ -917,29 +701,6 @@ public class FileUtil {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void unTarUsingJava(InputStream inputStream, File untarDir,
|
|
||||||
boolean gzipped) throws IOException {
|
|
||||||
TarArchiveInputStream tis = null;
|
|
||||||
try {
|
|
||||||
if (gzipped) {
|
|
||||||
inputStream = new BufferedInputStream(new GZIPInputStream(
|
|
||||||
inputStream));
|
|
||||||
} else {
|
|
||||||
inputStream =
|
|
||||||
new BufferedInputStream(inputStream);
|
|
||||||
}
|
|
||||||
|
|
||||||
tis = new TarArchiveInputStream(inputStream);
|
|
||||||
|
|
||||||
for (TarArchiveEntry entry = tis.getNextTarEntry(); entry != null;) {
|
|
||||||
unpackEntries(tis, entry, untarDir);
|
|
||||||
entry = tis.getNextTarEntry();
|
|
||||||
}
|
|
||||||
} finally {
|
|
||||||
IOUtils.cleanupWithLogger(LOG, tis, inputStream);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private static void unpackEntries(TarArchiveInputStream tis,
|
private static void unpackEntries(TarArchiveInputStream tis,
|
||||||
TarArchiveEntry entry, File outputDir) throws IOException {
|
TarArchiveEntry entry, File outputDir) throws IOException {
|
||||||
if (entry.isDirectory()) {
|
if (entry.isDirectory()) {
|
||||||
|
|
|
@ -34,11 +34,9 @@ import java.util.Enumeration;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.jar.JarEntry;
|
import java.util.jar.JarEntry;
|
||||||
import java.util.jar.JarFile;
|
import java.util.jar.JarFile;
|
||||||
import java.util.jar.JarInputStream;
|
|
||||||
import java.util.jar.Manifest;
|
import java.util.jar.Manifest;
|
||||||
import java.util.regex.Pattern;
|
import java.util.regex.Pattern;
|
||||||
|
|
||||||
import org.apache.commons.io.input.TeeInputStream;
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.classification.InterfaceStability;
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
import org.apache.hadoop.fs.FileUtil;
|
import org.apache.hadoop.fs.FileUtil;
|
||||||
|
@ -96,69 +94,6 @@ public class RunJar {
|
||||||
unJar(jarFile, toDir, MATCH_ANY);
|
unJar(jarFile, toDir, MATCH_ANY);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Unpack matching files from a jar. Entries inside the jar that do
|
|
||||||
* not match the given pattern will be skipped.
|
|
||||||
*
|
|
||||||
* @param inputStream the jar stream to unpack
|
|
||||||
* @param toDir the destination directory into which to unpack the jar
|
|
||||||
* @param unpackRegex the pattern to match jar entries against
|
|
||||||
*
|
|
||||||
* @throws IOException if an I/O error has occurred or toDir
|
|
||||||
* cannot be created and does not already exist
|
|
||||||
*/
|
|
||||||
public static void unJar(InputStream inputStream, File toDir,
|
|
||||||
Pattern unpackRegex)
|
|
||||||
throws IOException {
|
|
||||||
try (JarInputStream jar = new JarInputStream(inputStream)) {
|
|
||||||
int numOfFailedLastModifiedSet = 0;
|
|
||||||
for (JarEntry entry = jar.getNextJarEntry();
|
|
||||||
entry != null;
|
|
||||||
entry = jar.getNextJarEntry()) {
|
|
||||||
if (!entry.isDirectory() &&
|
|
||||||
unpackRegex.matcher(entry.getName()).matches()) {
|
|
||||||
File file = new File(toDir, entry.getName());
|
|
||||||
ensureDirectory(file.getParentFile());
|
|
||||||
try (OutputStream out = new FileOutputStream(file)) {
|
|
||||||
IOUtils.copyBytes(jar, out, BUFFER_SIZE);
|
|
||||||
}
|
|
||||||
if (!file.setLastModified(entry.getTime())) {
|
|
||||||
numOfFailedLastModifiedSet++;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (numOfFailedLastModifiedSet > 0) {
|
|
||||||
LOG.warn("Could not set last modfied time for {} file(s)",
|
|
||||||
numOfFailedLastModifiedSet);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Unpack matching files from a jar. Entries inside the jar that do
|
|
||||||
* not match the given pattern will be skipped. Keep also a copy
|
|
||||||
* of the entire jar in the same directory for backward compatibility.
|
|
||||||
* TODO remove this feature in a new release and do only unJar
|
|
||||||
*
|
|
||||||
* @param inputStream the jar stream to unpack
|
|
||||||
* @param toDir the destination directory into which to unpack the jar
|
|
||||||
* @param unpackRegex the pattern to match jar entries against
|
|
||||||
*
|
|
||||||
* @throws IOException if an I/O error has occurred or toDir
|
|
||||||
* cannot be created and does not already exist
|
|
||||||
*/
|
|
||||||
@Deprecated
|
|
||||||
public static void unJarAndSave(InputStream inputStream, File toDir,
|
|
||||||
String name, Pattern unpackRegex)
|
|
||||||
throws IOException{
|
|
||||||
File file = new File(toDir, name);
|
|
||||||
ensureDirectory(toDir);
|
|
||||||
try (OutputStream jar = new FileOutputStream(file);
|
|
||||||
TeeInputStream teeInputStream = new TeeInputStream(inputStream, jar)) {
|
|
||||||
unJar(teeInputStream, toDir, unpackRegex);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Unpack matching files from a jar. Entries inside the jar that do
|
* Unpack matching files from a jar. Entries inside the jar that do
|
||||||
* not match the given pattern will be skipped.
|
* not match the given pattern will be skipped.
|
||||||
|
|
|
@ -21,8 +21,6 @@ package org.apache.hadoop.yarn.util;
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.FileNotFoundException;
|
import java.io.FileNotFoundException;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStream;
|
|
||||||
import java.io.OutputStream;
|
|
||||||
import java.net.URISyntaxException;
|
import java.net.URISyntaxException;
|
||||||
import java.security.PrivilegedExceptionAction;
|
import java.security.PrivilegedExceptionAction;
|
||||||
import java.util.concurrent.Callable;
|
import java.util.concurrent.Callable;
|
||||||
|
@ -31,7 +29,6 @@ import java.util.concurrent.Future;
|
||||||
import java.util.regex.Pattern;
|
import java.util.regex.Pattern;
|
||||||
|
|
||||||
import org.apache.commons.io.FileUtils;
|
import org.apache.commons.io.FileUtils;
|
||||||
import org.apache.commons.io.IOUtils;
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
|
import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
|
||||||
|
@ -57,7 +54,6 @@ import com.google.common.annotations.VisibleForTesting;
|
||||||
import com.google.common.cache.CacheLoader;
|
import com.google.common.cache.CacheLoader;
|
||||||
import com.google.common.cache.LoadingCache;
|
import com.google.common.cache.LoadingCache;
|
||||||
import com.google.common.util.concurrent.Futures;
|
import com.google.common.util.concurrent.Futures;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Download a single URL to the local disk.
|
* Download a single URL to the local disk.
|
||||||
|
@ -251,21 +247,9 @@ public class FSDownload implements Callable<Path> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
private Path copy(Path sCopy, Path dstdir) throws IOException {
|
||||||
* Localize files.
|
|
||||||
* @param destination destination directory
|
|
||||||
* @throws IOException cannot read or write file
|
|
||||||
* @throws YarnException subcommand returned an error
|
|
||||||
*/
|
|
||||||
private void verifyAndCopy(Path destination)
|
|
||||||
throws IOException, YarnException {
|
|
||||||
final Path sCopy;
|
|
||||||
try {
|
|
||||||
sCopy = resource.getResource().toPath();
|
|
||||||
} catch (URISyntaxException e) {
|
|
||||||
throw new IOException("Invalid resource", e);
|
|
||||||
}
|
|
||||||
FileSystem sourceFs = sCopy.getFileSystem(conf);
|
FileSystem sourceFs = sCopy.getFileSystem(conf);
|
||||||
|
Path dCopy = new Path(dstdir, "tmp_"+sCopy.getName());
|
||||||
FileStatus sStat = sourceFs.getFileStatus(sCopy);
|
FileStatus sStat = sourceFs.getFileStatus(sCopy);
|
||||||
if (sStat.getModificationTime() != resource.getTimestamp()) {
|
if (sStat.getModificationTime() != resource.getTimestamp()) {
|
||||||
throw new IOException("Resource " + sCopy +
|
throw new IOException("Resource " + sCopy +
|
||||||
|
@ -280,109 +264,83 @@ public class FSDownload implements Callable<Path> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
downloadAndUnpack(sCopy, destination);
|
FileUtil.copy(sourceFs, sStat, FileSystem.getLocal(conf), dCopy, false,
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Copy source path to destination with localization rules.
|
|
||||||
* @param source source path to copy. Typically HDFS
|
|
||||||
* @param destination destination path. Typically local filesystem
|
|
||||||
* @exception YarnException Any error has occurred
|
|
||||||
*/
|
|
||||||
private void downloadAndUnpack(Path source, Path destination)
|
|
||||||
throws YarnException {
|
|
||||||
try {
|
|
||||||
FileSystem sourceFileSystem = source.getFileSystem(conf);
|
|
||||||
FileSystem destinationFileSystem = destination.getFileSystem(conf);
|
|
||||||
if (sourceFileSystem.getFileStatus(source).isDirectory()) {
|
|
||||||
FileUtil.copy(
|
|
||||||
sourceFileSystem, source,
|
|
||||||
destinationFileSystem, destination, false,
|
|
||||||
true, conf);
|
true, conf);
|
||||||
} else {
|
return dCopy;
|
||||||
unpack(source, destination, sourceFileSystem, destinationFileSystem);
|
|
||||||
}
|
|
||||||
} catch (Exception e) {
|
|
||||||
throw new YarnException("Download and unpack failed", e);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
private long unpack(File localrsrc, File dst) throws IOException {
|
||||||
* Do the localization action on the input stream.
|
|
||||||
* We use the deprecated method RunJar.unJarAndSave for compatibility reasons.
|
|
||||||
* We should use the more efficient RunJar.unJar in the future.
|
|
||||||
* @param source Source path
|
|
||||||
* @param destination Destination pth
|
|
||||||
* @param sourceFileSystem Source filesystem
|
|
||||||
* @param destinationFileSystem Destination filesystem
|
|
||||||
* @throws IOException Could not read or write stream
|
|
||||||
* @throws InterruptedException Operation interrupted by caller
|
|
||||||
* @throws ExecutionException Could not create thread pool execution
|
|
||||||
*/
|
|
||||||
@SuppressWarnings("deprecation")
|
|
||||||
private void unpack(Path source, Path destination,
|
|
||||||
FileSystem sourceFileSystem,
|
|
||||||
FileSystem destinationFileSystem)
|
|
||||||
throws IOException, InterruptedException, ExecutionException {
|
|
||||||
try (InputStream inputStream = sourceFileSystem.open(source)) {
|
|
||||||
File dst = new File(destination.toUri());
|
|
||||||
String lowerDst = StringUtils.toLowerCase(dst.getName());
|
|
||||||
switch (resource.getType()) {
|
switch (resource.getType()) {
|
||||||
case ARCHIVE:
|
case ARCHIVE: {
|
||||||
|
String lowerDst = StringUtils.toLowerCase(dst.getName());
|
||||||
if (lowerDst.endsWith(".jar")) {
|
if (lowerDst.endsWith(".jar")) {
|
||||||
RunJar.unJar(inputStream, dst, RunJar.MATCH_ANY);
|
RunJar.unJar(localrsrc, dst);
|
||||||
} else if (lowerDst.endsWith(".zip")) {
|
} else if (lowerDst.endsWith(".zip")) {
|
||||||
FileUtil.unZip(inputStream, dst);
|
FileUtil.unZip(localrsrc, dst);
|
||||||
} else if (lowerDst.endsWith(".tar.gz") ||
|
} else if (lowerDst.endsWith(".tar.gz") ||
|
||||||
lowerDst.endsWith(".tgz") ||
|
lowerDst.endsWith(".tgz") ||
|
||||||
lowerDst.endsWith(".tar")) {
|
lowerDst.endsWith(".tar")) {
|
||||||
FileUtil.unTar(inputStream, dst, lowerDst.endsWith("gz"));
|
FileUtil.unTar(localrsrc, dst);
|
||||||
} else {
|
} else {
|
||||||
LOG.warn("Cannot unpack " + source);
|
LOG.warn("Cannot unpack " + localrsrc);
|
||||||
try (OutputStream outputStream =
|
if (!localrsrc.renameTo(dst)) {
|
||||||
destinationFileSystem.create(destination, true)) {
|
throw new IOException("Unable to rename file: [" + localrsrc
|
||||||
IOUtils.copy(inputStream, outputStream);
|
+ "] to [" + dst + "]");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
case PATTERN:
|
case PATTERN: {
|
||||||
|
String lowerDst = StringUtils.toLowerCase(dst.getName());
|
||||||
if (lowerDst.endsWith(".jar")) {
|
if (lowerDst.endsWith(".jar")) {
|
||||||
String p = resource.getPattern();
|
String p = resource.getPattern();
|
||||||
|
RunJar.unJar(localrsrc, dst,
|
||||||
|
p == null ? RunJar.MATCH_ANY : Pattern.compile(p));
|
||||||
|
File newDst = new File(dst, dst.getName());
|
||||||
if (!dst.exists() && !dst.mkdir()) {
|
if (!dst.exists() && !dst.mkdir()) {
|
||||||
throw new IOException("Unable to create directory: [" + dst + "]");
|
throw new IOException("Unable to create directory: [" + dst + "]");
|
||||||
}
|
}
|
||||||
RunJar.unJarAndSave(inputStream, dst, source.getName(),
|
if (!localrsrc.renameTo(newDst)) {
|
||||||
p == null ? RunJar.MATCH_ANY : Pattern.compile(p));
|
throw new IOException("Unable to rename file: [" + localrsrc
|
||||||
|
+ "] to [" + newDst + "]");
|
||||||
|
}
|
||||||
} else if (lowerDst.endsWith(".zip")) {
|
} else if (lowerDst.endsWith(".zip")) {
|
||||||
LOG.warn("Treating [" + source + "] as an archive even though it " +
|
LOG.warn("Treating [" + localrsrc + "] as an archive even though it " +
|
||||||
"was specified as PATTERN");
|
"was specified as PATTERN");
|
||||||
FileUtil.unZip(inputStream, dst);
|
FileUtil.unZip(localrsrc, dst);
|
||||||
} else if (lowerDst.endsWith(".tar.gz") ||
|
} else if (lowerDst.endsWith(".tar.gz") ||
|
||||||
lowerDst.endsWith(".tgz") ||
|
lowerDst.endsWith(".tgz") ||
|
||||||
lowerDst.endsWith(".tar")) {
|
lowerDst.endsWith(".tar")) {
|
||||||
LOG.warn("Treating [" + source + "] as an archive even though it " +
|
LOG.warn("Treating [" + localrsrc + "] as an archive even though it " +
|
||||||
"was specified as PATTERN");
|
"was specified as PATTERN");
|
||||||
FileUtil.unTar(inputStream, dst, lowerDst.endsWith("gz"));
|
FileUtil.unTar(localrsrc, dst);
|
||||||
} else {
|
} else {
|
||||||
LOG.warn("Cannot unpack " + source);
|
LOG.warn("Cannot unpack " + localrsrc);
|
||||||
try (OutputStream outputStream =
|
if (!localrsrc.renameTo(dst)) {
|
||||||
destinationFileSystem.create(destination, true)) {
|
throw new IOException("Unable to rename file: [" + localrsrc
|
||||||
IOUtils.copy(inputStream, outputStream);
|
+ "] to [" + dst + "]");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
case FILE:
|
case FILE:
|
||||||
default:
|
default:
|
||||||
try (OutputStream outputStream =
|
if (!localrsrc.renameTo(dst)) {
|
||||||
destinationFileSystem.create(destination, true)) {
|
throw new IOException("Unable to rename file: [" + localrsrc
|
||||||
IOUtils.copy(inputStream, outputStream);
|
+ "] to [" + dst + "]");
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
if(localrsrc.isFile()){
|
||||||
|
try {
|
||||||
|
files.delete(new Path(localrsrc.toString()), false);
|
||||||
|
} catch (IOException ignore) {
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
// TODO Should calculate here before returning
|
// TODO Should calculate here before returning
|
||||||
//return FileUtil.getDU(destDir);
|
//return FileUtil.getDU(destDir);
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Path call() throws Exception {
|
public Path call() throws Exception {
|
||||||
|
@ -394,34 +352,27 @@ public class FSDownload implements Callable<Path> {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug(String.format("Starting to download %s %s %s",
|
LOG.debug("Starting to download " + sCopy);
|
||||||
sCopy,
|
|
||||||
resource.getType(),
|
|
||||||
resource.getPattern()));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
final Path destinationTmp = new Path(destDirPath + "_tmp");
|
createDir(destDirPath, cachePerms);
|
||||||
createDir(destinationTmp, PRIVATE_DIR_PERMS);
|
final Path dst_work = new Path(destDirPath + "_tmp");
|
||||||
Path dFinal =
|
createDir(dst_work, cachePerms);
|
||||||
files.makeQualified(new Path(destinationTmp, sCopy.getName()));
|
Path dFinal = files.makeQualified(new Path(dst_work, sCopy.getName()));
|
||||||
try {
|
try {
|
||||||
if (userUgi == null) {
|
Path dTmp = null == userUgi ? files.makeQualified(copy(sCopy, dst_work))
|
||||||
verifyAndCopy(dFinal);
|
: userUgi.doAs(new PrivilegedExceptionAction<Path>() {
|
||||||
} else {
|
public Path run() throws Exception {
|
||||||
userUgi.doAs(new PrivilegedExceptionAction<Void>() {
|
return files.makeQualified(copy(sCopy, dst_work));
|
||||||
@Override
|
};
|
||||||
public Void run() throws Exception {
|
|
||||||
verifyAndCopy(dFinal);
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
});
|
});
|
||||||
}
|
unpack(new File(dTmp.toUri()), new File(dFinal.toUri()));
|
||||||
changePermissions(dFinal.getFileSystem(conf), dFinal);
|
changePermissions(dFinal.getFileSystem(conf), dFinal);
|
||||||
files.rename(destinationTmp, destDirPath, Rename.OVERWRITE);
|
files.rename(dst_work, destDirPath, Rename.OVERWRITE);
|
||||||
|
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug(String.format("File has been downloaded to %s from %s",
|
LOG.debug("File has been downloaded to " +
|
||||||
new Path(destDirPath, sCopy.getName()), sCopy));
|
new Path(destDirPath, sCopy.getName()));
|
||||||
}
|
}
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
try {
|
try {
|
||||||
|
@ -431,7 +382,7 @@ public class FSDownload implements Callable<Path> {
|
||||||
throw e;
|
throw e;
|
||||||
} finally {
|
} finally {
|
||||||
try {
|
try {
|
||||||
files.delete(destinationTmp, true);
|
files.delete(dst_work, true);
|
||||||
} catch (FileNotFoundException ignore) {
|
} catch (FileNotFoundException ignore) {
|
||||||
}
|
}
|
||||||
conf = null;
|
conf = null;
|
||||||
|
|
|
@ -82,9 +82,6 @@ import com.google.common.cache.CacheBuilder;
|
||||||
import com.google.common.cache.CacheLoader;
|
import com.google.common.cache.CacheLoader;
|
||||||
import com.google.common.cache.LoadingCache;
|
import com.google.common.cache.LoadingCache;
|
||||||
|
|
||||||
/**
|
|
||||||
* Unit test for the FSDownload class.
|
|
||||||
*/
|
|
||||||
public class TestFSDownload {
|
public class TestFSDownload {
|
||||||
|
|
||||||
private static final Log LOG = LogFactory.getLog(TestFSDownload.class);
|
private static final Log LOG = LogFactory.getLog(TestFSDownload.class);
|
||||||
|
@ -93,7 +90,6 @@ public class TestFSDownload {
|
||||||
private enum TEST_FILE_TYPE {
|
private enum TEST_FILE_TYPE {
|
||||||
TAR, JAR, ZIP, TGZ
|
TAR, JAR, ZIP, TGZ
|
||||||
};
|
};
|
||||||
private Configuration conf = new Configuration();
|
|
||||||
|
|
||||||
@AfterClass
|
@AfterClass
|
||||||
public static void deleteTestDir() throws IOException {
|
public static void deleteTestDir() throws IOException {
|
||||||
|
@ -136,18 +132,6 @@ public class TestFSDownload {
|
||||||
FileOutputStream stream = new FileOutputStream(jarFile);
|
FileOutputStream stream = new FileOutputStream(jarFile);
|
||||||
LOG.info("Create jar out stream ");
|
LOG.info("Create jar out stream ");
|
||||||
JarOutputStream out = new JarOutputStream(stream, new Manifest());
|
JarOutputStream out = new JarOutputStream(stream, new Manifest());
|
||||||
ZipEntry entry = new ZipEntry("classes/1.class");
|
|
||||||
out.putNextEntry(entry);
|
|
||||||
out.write(1);
|
|
||||||
out.write(2);
|
|
||||||
out.write(3);
|
|
||||||
out.closeEntry();
|
|
||||||
ZipEntry entry2 = new ZipEntry("classes/2.class");
|
|
||||||
out.putNextEntry(entry2);
|
|
||||||
out.write(1);
|
|
||||||
out.write(2);
|
|
||||||
out.write(3);
|
|
||||||
out.closeEntry();
|
|
||||||
LOG.info("Done writing jar stream ");
|
LOG.info("Done writing jar stream ");
|
||||||
out.close();
|
out.close();
|
||||||
LocalResource ret = recordFactory.newRecordInstance(LocalResource.class);
|
LocalResource ret = recordFactory.newRecordInstance(LocalResource.class);
|
||||||
|
@ -272,6 +256,7 @@ public class TestFSDownload {
|
||||||
@Test (timeout=10000)
|
@Test (timeout=10000)
|
||||||
public void testDownloadBadPublic() throws IOException, URISyntaxException,
|
public void testDownloadBadPublic() throws IOException, URISyntaxException,
|
||||||
InterruptedException {
|
InterruptedException {
|
||||||
|
Configuration conf = new Configuration();
|
||||||
conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "077");
|
conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "077");
|
||||||
FileContext files = FileContext.getLocalFSFileContext(conf);
|
FileContext files = FileContext.getLocalFSFileContext(conf);
|
||||||
final Path basedir = files.makeQualified(new Path("target",
|
final Path basedir = files.makeQualified(new Path("target",
|
||||||
|
@ -322,6 +307,7 @@ public class TestFSDownload {
|
||||||
@Test (timeout=60000)
|
@Test (timeout=60000)
|
||||||
public void testDownloadPublicWithStatCache() throws IOException,
|
public void testDownloadPublicWithStatCache() throws IOException,
|
||||||
URISyntaxException, InterruptedException, ExecutionException {
|
URISyntaxException, InterruptedException, ExecutionException {
|
||||||
|
final Configuration conf = new Configuration();
|
||||||
FileContext files = FileContext.getLocalFSFileContext(conf);
|
FileContext files = FileContext.getLocalFSFileContext(conf);
|
||||||
Path basedir = files.makeQualified(new Path("target",
|
Path basedir = files.makeQualified(new Path("target",
|
||||||
TestFSDownload.class.getSimpleName()));
|
TestFSDownload.class.getSimpleName()));
|
||||||
|
@ -396,6 +382,7 @@ public class TestFSDownload {
|
||||||
@Test (timeout=10000)
|
@Test (timeout=10000)
|
||||||
public void testDownload() throws IOException, URISyntaxException,
|
public void testDownload() throws IOException, URISyntaxException,
|
||||||
InterruptedException {
|
InterruptedException {
|
||||||
|
Configuration conf = new Configuration();
|
||||||
conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "077");
|
conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "077");
|
||||||
FileContext files = FileContext.getLocalFSFileContext(conf);
|
FileContext files = FileContext.getLocalFSFileContext(conf);
|
||||||
final Path basedir = files.makeQualified(new Path("target",
|
final Path basedir = files.makeQualified(new Path("target",
|
||||||
|
@ -451,7 +438,7 @@ public class TestFSDownload {
|
||||||
FileStatus status = files.getFileStatus(localized.getParent());
|
FileStatus status = files.getFileStatus(localized.getParent());
|
||||||
FsPermission perm = status.getPermission();
|
FsPermission perm = status.getPermission();
|
||||||
assertEquals("Cache directory permissions are incorrect",
|
assertEquals("Cache directory permissions are incorrect",
|
||||||
new FsPermission((short)0700), perm);
|
new FsPermission((short)0755), perm);
|
||||||
|
|
||||||
status = files.getFileStatus(localized);
|
status = files.getFileStatus(localized);
|
||||||
perm = status.getPermission();
|
perm = status.getPermission();
|
||||||
|
@ -468,6 +455,7 @@ public class TestFSDownload {
|
||||||
|
|
||||||
private void downloadWithFileType(TEST_FILE_TYPE fileType) throws IOException,
|
private void downloadWithFileType(TEST_FILE_TYPE fileType) throws IOException,
|
||||||
URISyntaxException, InterruptedException{
|
URISyntaxException, InterruptedException{
|
||||||
|
Configuration conf = new Configuration();
|
||||||
conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "077");
|
conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "077");
|
||||||
FileContext files = FileContext.getLocalFSFileContext(conf);
|
FileContext files = FileContext.getLocalFSFileContext(conf);
|
||||||
final Path basedir = files.makeQualified(new Path("target",
|
final Path basedir = files.makeQualified(new Path("target",
|
||||||
|
@ -615,6 +603,7 @@ public class TestFSDownload {
|
||||||
|
|
||||||
@Test (timeout=10000)
|
@Test (timeout=10000)
|
||||||
public void testDirDownload() throws IOException, InterruptedException {
|
public void testDirDownload() throws IOException, InterruptedException {
|
||||||
|
Configuration conf = new Configuration();
|
||||||
FileContext files = FileContext.getLocalFSFileContext(conf);
|
FileContext files = FileContext.getLocalFSFileContext(conf);
|
||||||
final Path basedir = files.makeQualified(new Path("target",
|
final Path basedir = files.makeQualified(new Path("target",
|
||||||
TestFSDownload.class.getSimpleName()));
|
TestFSDownload.class.getSimpleName()));
|
||||||
|
@ -679,6 +668,7 @@ public class TestFSDownload {
|
||||||
|
|
||||||
@Test (timeout=10000)
|
@Test (timeout=10000)
|
||||||
public void testUniqueDestinationPath() throws Exception {
|
public void testUniqueDestinationPath() throws Exception {
|
||||||
|
Configuration conf = new Configuration();
|
||||||
FileContext files = FileContext.getLocalFSFileContext(conf);
|
FileContext files = FileContext.getLocalFSFileContext(conf);
|
||||||
final Path basedir = files.makeQualified(new Path("target",
|
final Path basedir = files.makeQualified(new Path("target",
|
||||||
TestFSDownload.class.getSimpleName()));
|
TestFSDownload.class.getSimpleName()));
|
||||||
|
|
Loading…
Reference in New Issue