HADOOP-16202. Enhanced openFile(): mapreduce and YARN changes. (#2584/2)

These changes ensure that sequential files are opened with the
right read policy, and that the split start/end offsets are passed in
at open time.
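
For illustration, a record reader which knows its split boundaries can
declare them when opening the file, roughly as below. This is a minimal
sketch of the builder pattern used in the diffs that follow; the
SplitOpener/openSplit names are invented for the sketch, and the
FileSystem, Path and offset arguments stand for whatever the caller
already holds.

import java.io.IOException;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FutureDataInputStreamBuilder;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.functional.FutureIO;

import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_SPLIT_END;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_SPLIT_START;

/** Sketch: open one split of a file, handing the split range to the store. */
final class SplitOpener {
  static FSDataInputStream openSplit(FileSystem fs, Path file,
      long start, long end) throws IOException {
    // Options set with opt() are hints: filesystems which do not
    // understand them simply ignore them.
    final FutureDataInputStreamBuilder builder = fs.openFile(file)
        .opt(FS_OPTION_OPENFILE_SPLIT_START, start)
        .opt(FS_OPTION_OPENFILE_SPLIT_END, end);
    return FutureIO.awaitFuture(builder.build());
  }
}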

As well as giving filesystem clients the opportunity to choose
fetch/cache/seek policies, the settings ensure that text files on an
S3 bucket whose default policy is "random" are still processed
efficiently.
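
As a concrete example, a client that intends to read a whole file back to
back can say so when opening it; a read policy requested this way is
honoured by stores that understand it (such as S3A) and ignored elsewhere,
taking precedence over the bucket or filesystem default. A minimal sketch,
with the WholeFileOpener/open names invented for the example:

import java.io.IOException;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE;
import static org.apache.hadoop.util.functional.FutureIO.awaitFuture;

/** Sketch: open a file for a full sequential read, overriding the store's default. */
final class WholeFileOpener {
  static FSDataInputStream open(FileSystem fs, Path path) throws IOException {
    // "whole-file" tells the store the entire file will be read,
    // so connectors can plan sequential rather than random-access reads.
    return awaitFuture(
        fs.openFile(path)
            .opt(FS_OPTION_OPENFILE_READ_POLICY,
                FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE)
            .build());
  }
}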

This commit depends on the associated hadoop-common patch,
which must be committed first.

Contributed by Steve Loughran.

Change-Id: Ic6713fd752441cf42ebe8739d05c2293a5db9f94
Steve Loughran 2022-04-24 17:10:34 +01:00
parent 1b4dba99b5
commit 6999acf520
10 changed files with 94 additions and 28 deletions


@@ -35,6 +35,10 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE;
+import static org.apache.hadoop.util.functional.FutureIO.awaitFuture;
 /**
  * Reads in history events from the JobHistoryFile and sends them out again
  * to be recorded.
@@ -118,7 +122,11 @@ public class JobHistoryCopyService extends CompositeService implements HistoryEv
     fc.makeQualified(JobHistoryUtils.getStagingJobHistoryFile(histDirPath,
         jobId, (applicationAttemptId.getAttemptId() - 1)));
     LOG.info("History file is at " + historyFile);
-    in = fc.open(historyFile);
+    in = awaitFuture(
+        fc.openFile(historyFile)
+            .opt(FS_OPTION_OPENFILE_READ_POLICY,
+                FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE)
+            .build());
     return in;
   }


@@ -28,7 +28,6 @@ import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FutureDataInputStreamBuilder;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.Seekable;
-import org.apache.hadoop.fs.impl.FutureIOSupport;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.compress.CodecPool;
@@ -41,9 +40,13 @@ import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.lib.input.CompressedSplitLineReader;
 import org.apache.hadoop.mapreduce.lib.input.SplitLineReader;
 import org.apache.hadoop.mapreduce.lib.input.UncompressedSplitLineReader;
+import org.apache.hadoop.util.functional.FutureIO;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_SPLIT_END;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_SPLIT_START;
 /**
  * Treats keys as offset in file and value as line.
  */
@@ -109,10 +112,14 @@ public class LineRecordReader implements RecordReader<LongWritable, Text> {
     // open the file and seek to the start of the split
     final FutureDataInputStreamBuilder builder =
         file.getFileSystem(job).openFile(file);
-    FutureIOSupport.propagateOptions(builder, job,
+    // the start and end of the split may be used to build
+    // an input strategy.
+    builder.opt(FS_OPTION_OPENFILE_SPLIT_START, start)
+        .opt(FS_OPTION_OPENFILE_SPLIT_END, end);
+    FutureIO.propagateOptions(builder, job,
         MRJobConfig.INPUT_FILE_OPTION_PREFIX,
         MRJobConfig.INPUT_FILE_MANDATORY_PREFIX);
-    fileIn = FutureIOSupport.awaitFuture(builder.build());
+    fileIn = FutureIO.awaitFuture(builder.build());
     if (isCompressedInput()) {
       decompressor = CodecPool.getDecompressor(codec);
       if (codec instanceof SplittableCompressionCodec) {


@@ -28,7 +28,6 @@ import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FutureDataInputStreamBuilder;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.Seekable;
-import org.apache.hadoop.fs.impl.FutureIOSupport;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.compress.CodecPool;
@@ -40,6 +39,8 @@ import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.functional.FutureIO;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -94,10 +95,10 @@ public class FixedLengthRecordReader
     // open the file
     final FutureDataInputStreamBuilder builder =
         file.getFileSystem(job).openFile(file);
-    FutureIOSupport.propagateOptions(builder, job,
+    FutureIO.propagateOptions(builder, job,
         MRJobConfig.INPUT_FILE_OPTION_PREFIX,
         MRJobConfig.INPUT_FILE_MANDATORY_PREFIX);
-    fileIn = FutureIOSupport.awaitFuture(builder.build());
+    fileIn = FutureIO.awaitFuture(builder.build());
     CompressionCodec codec = new CompressionCodecFactory(job).getCodec(file);
     if (null != codec) {


@@ -27,7 +27,6 @@ import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FutureDataInputStreamBuilder;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.Seekable;
-import org.apache.hadoop.fs.impl.FutureIOSupport;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.compress.CodecPool;
@@ -40,9 +39,14 @@ import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.functional.FutureIO;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_SPLIT_END;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_SPLIT_START;
 /**
  * Treats keys as offset in file and value as line.
  */
@@ -86,10 +90,14 @@ public class LineRecordReader extends RecordReader<LongWritable, Text> {
     // open the file and seek to the start of the split
     final FutureDataInputStreamBuilder builder =
         file.getFileSystem(job).openFile(file);
-    FutureIOSupport.propagateOptions(builder, job,
+    // the start and end of the split may be used to build
+    // an input strategy.
+    builder.opt(FS_OPTION_OPENFILE_SPLIT_START, start);
+    builder.opt(FS_OPTION_OPENFILE_SPLIT_END, end);
+    FutureIO.propagateOptions(builder, job,
         MRJobConfig.INPUT_FILE_OPTION_PREFIX,
         MRJobConfig.INPUT_FILE_MANDATORY_PREFIX);
-    fileIn = FutureIOSupport.awaitFuture(builder.build());
+    fileIn = FutureIO.awaitFuture(builder.build());
     CompressionCodec codec = new CompressionCodecFactory(job).getCodec(file);
     if (null!=codec) {


@@ -29,7 +29,6 @@ import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FutureDataInputStreamBuilder;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.impl.FutureIOSupport;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.InputSplit;
@@ -39,6 +38,7 @@ import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.util.LineReader;
+import org.apache.hadoop.util.functional.FutureIO;
 /**
  * NLineInputFormat which splits N lines of input as one split.
@@ -99,10 +99,10 @@ public class NLineInputFormat extends FileInputFormat<LongWritable, Text> {
       try {
         final FutureDataInputStreamBuilder builder =
             fileName.getFileSystem(conf).openFile(fileName);
-        FutureIOSupport.propagateOptions(builder, conf,
+        FutureIO.propagateOptions(builder, conf,
            MRJobConfig.INPUT_FILE_OPTION_PREFIX,
            MRJobConfig.INPUT_FILE_MANDATORY_PREFIX);
-        FSDataInputStream in = FutureIOSupport.awaitFuture(builder.build());
+        FSDataInputStream in = FutureIO.awaitFuture(builder.build());
         lr = new LineReader(in, conf);
         Text line = new Text();
         int numLines = 0;


@@ -27,6 +27,7 @@ import java.util.List;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FutureDataInputStreamBuilder;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.InputSplit;
@@ -41,6 +42,12 @@ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
 import org.apache.hadoop.util.IndexedSortable;
 import org.apache.hadoop.util.QuickSort;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.functional.FutureIO;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_SPLIT_END;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_SPLIT_START;
 /**
  * An input format that reads the first 10 characters of each line as the key
@@ -224,12 +231,17 @@ public class TeraInputFormat extends FileInputFormat<Text,Text> {
       throws IOException, InterruptedException {
     Path p = ((FileSplit)split).getPath();
     FileSystem fs = p.getFileSystem(context.getConfiguration());
-    in = fs.open(p);
     long start = ((FileSplit)split).getStart();
     // find the offset to start at a record boundary
     offset = (RECORD_LENGTH - (start % RECORD_LENGTH)) % RECORD_LENGTH;
-    in.seek(start + offset);
     length = ((FileSplit)split).getLength();
+    final FutureDataInputStreamBuilder builder = fs.openFile(p)
+        .opt(FS_OPTION_OPENFILE_SPLIT_START, start)
+        .opt(FS_OPTION_OPENFILE_SPLIT_END, start + length)
+        .opt(FS_OPTION_OPENFILE_READ_POLICY,
+            FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL);
+    in = FutureIO.awaitFuture(builder.build());
+    in.seek(start + offset);
   }

   public void close() throws IOException {


@@ -52,7 +52,10 @@ import org.apache.hadoop.tools.util.ThrottledInputStream;
 import org.apache.hadoop.classification.VisibleForTesting;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL;
 import static org.apache.hadoop.tools.mapred.CopyMapper.getFileAttributeSettings;
+import static org.apache.hadoop.util.functional.FutureIO.awaitFuture;
 /**
  * This class extends RetriableCommand to implement the copy of files,
@@ -362,7 +365,11 @@ public class RetriableFileCopyCommand extends RetriableCommand {
       FileSystem fs = path.getFileSystem(conf);
       float bandwidthMB = conf.getFloat(DistCpConstants.CONF_LABEL_BANDWIDTH_MB,
               DistCpConstants.DEFAULT_BANDWIDTH_MB);
-      FSDataInputStream in = fs.open(path);
+      // open with sequential read, but not whole-file
+      FSDataInputStream in = awaitFuture(fs.openFile(path)
+          .opt(FS_OPTION_OPENFILE_READ_POLICY,
+              FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL)
+          .build());
       return new ThrottledInputStream(in, bandwidthMB * 1024 * 1024);
     }
     catch (IOException e) {


@@ -26,7 +26,6 @@ import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FutureDataInputStreamBuilder;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.impl.FutureIOSupport;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.MRJobConfig;
@@ -35,6 +34,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
 import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
 import org.apache.hadoop.streaming.StreamUtil;
+import org.apache.hadoop.util.functional.FutureIO;
 /**
  * An input format that selects a RecordReader based on a JobConf property. This
@@ -66,10 +66,10 @@ public class StreamInputFormat extends KeyValueTextInputFormat {
     FileSystem fs = path.getFileSystem(conf);
     // open the file
     final FutureDataInputStreamBuilder builder = fs.openFile(path);
-    FutureIOSupport.propagateOptions(builder, conf,
+    FutureIO.propagateOptions(builder, conf,
         MRJobConfig.INPUT_FILE_OPTION_PREFIX,
        MRJobConfig.INPUT_FILE_MANDATORY_PREFIX);
-    FSDataInputStream in = FutureIOSupport.awaitFuture(builder.build());
+    FSDataInputStream in = FutureIO.awaitFuture(builder.build());
     // Factory dispatch based on available params..
     Class readerClass;


@@ -56,6 +56,7 @@ import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Options;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
@@ -77,6 +78,11 @@ import org.slf4j.LoggerFactory;
 import org.apache.hadoop.classification.VisibleForTesting;
 import org.apache.hadoop.thirdparty.com.google.common.collect.Iterables;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_LENGTH;
+import static org.apache.hadoop.util.functional.FutureIO.awaitFuture;
 @Public
 @Evolving
 public class AggregatedLogFormat {
@@ -576,9 +582,16 @@ public class AggregatedLogFormat {
       try {
         FileContext fileContext =
             FileContext.getFileContext(remoteAppLogFile.toUri(), conf);
-        this.fsDataIStream = fileContext.open(remoteAppLogFile);
+        FileStatus status = fileContext.getFileStatus(remoteAppLogFile);
+        this.fsDataIStream = awaitFuture(
+            fileContext.openFile(remoteAppLogFile)
+                .opt(FS_OPTION_OPENFILE_READ_POLICY,
+                    FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL)
+                .opt(FS_OPTION_OPENFILE_LENGTH,
+                    status.getLen())   // file length hint for object stores
+                .build());
         reader = new TFile.Reader(this.fsDataIStream,
-            fileContext.getFileStatus(remoteAppLogFile).getLen(), conf);
+            status.getLen(), conf);
         this.scanner = reader.createScanner();
       } catch (IOException ioe) {
         close();


@@ -60,7 +60,11 @@ import org.apache.hadoop.thirdparty.com.google.common.cache.LoadingCache;
 import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.Futures;
 import org.apache.hadoop.yarn.exceptions.YarnException;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE;
+import static org.apache.hadoop.util.functional.FutureIO.awaitFuture;
 /**
  * Download a single URL to the local disk.
  *
  */
@@ -285,23 +289,25 @@ public class FSDownload implements Callable<Path> {
       }
     }
-    downloadAndUnpack(sCopy, destination);
+    downloadAndUnpack(sCopy, sStat, destination);
   }

   /**
    * Copy source path to destination with localization rules.
-   * @param source source path to copy. Typically HDFS
+   * @param source source path to copy. Typically HDFS or an object store.
+   * @param sourceStatus status of source
    * @param destination destination path. Typically local filesystem
    * @exception YarnException Any error has occurred
    */
-  private void downloadAndUnpack(Path source, Path destination)
+  private void downloadAndUnpack(Path source,
+      FileStatus sourceStatus, Path destination)
       throws YarnException {
     try {
       FileSystem sourceFileSystem = source.getFileSystem(conf);
       FileSystem destinationFileSystem = destination.getFileSystem(conf);
-      if (sourceFileSystem.getFileStatus(source).isDirectory()) {
+      if (sourceStatus.isDirectory()) {
         FileUtil.copy(
-            sourceFileSystem, source,
+            sourceFileSystem, sourceStatus,
             destinationFileSystem, destination, false,
             true, conf);
       } else {
@@ -329,7 +335,11 @@ public class FSDownload implements Callable<Path> {
       FileSystem sourceFileSystem,
       FileSystem destinationFileSystem)
       throws IOException, InterruptedException, ExecutionException {
-    try (InputStream inputStream = sourceFileSystem.open(source)) {
+    try (InputStream inputStream = awaitFuture(
+        sourceFileSystem.openFile(source)
+            .opt(FS_OPTION_OPENFILE_READ_POLICY,
+                FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE)
+            .build())) {
       File dst = new File(destination.toUri());
       String lowerDst = StringUtils.toLowerCase(dst.getName());
       switch (resource.getType()) {