MAPREDUCE-2349. Modified FileInputFormat to be able to issue file and block location calls in parallel. Contributed by Siddharth Seth.

svn merge --ignore-ancestry -c 1579515 ../../trunk/


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1579516 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2014-03-20 02:48:48 +00:00
parent af6b5b8e9a
commit 31a3a99f05
7 changed files with 877 additions and 33 deletions

View File

@ -57,6 +57,9 @@ Release 2.4.0 - UNRELEASED
MAPREDUCE-4052. Improved MapReduce clients to use NodeManagers' ability to
handle cross platform application submissions. (Jian He via vinodkv)
MAPREDUCE-2349. Modified FileInputFormat to be able to issue file and block
location calls in parallel. (Siddharth Seth via vinodkv)
OPTIMIZATIONS
BUG FIXES

View File

@ -47,6 +47,9 @@ import org.apache.hadoop.net.NodeBase;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Iterables;
/**
* A base class for file-based {@link InputFormat}.
*
@ -203,10 +206,7 @@ public abstract class FileInputFormat<K, V> implements InputFormat<K, V> {
// Whether we need to recursive look into the directory structure
boolean recursive = job.getBoolean(INPUT_DIR_RECURSIVE, false);
List<FileStatus> result = new ArrayList<FileStatus>();
List<IOException> errors = new ArrayList<IOException>();
// creates a MultiPathFilter with the hiddenFileFilter and the
// user provided one (if any).
List<PathFilter> filters = new ArrayList<PathFilter>();
@ -217,6 +217,41 @@ public abstract class FileInputFormat<K, V> implements InputFormat<K, V> {
}
PathFilter inputFilter = new MultiPathFilter(filters);
FileStatus[] result;
int numThreads = job
.getInt(
org.apache.hadoop.mapreduce.lib.input.FileInputFormat.LIST_STATUS_NUM_THREADS,
org.apache.hadoop.mapreduce.lib.input.FileInputFormat.DEFAULT_LIST_STATUS_NUM_THREADS);
Stopwatch sw = new Stopwatch().start();
if (numThreads == 1) {
List<FileStatus> locatedFiles = singleThreadedListStatus(job, dirs, inputFilter, recursive);
result = locatedFiles.toArray(new FileStatus[locatedFiles.size()]);
} else {
Iterable<FileStatus> locatedFiles = null;
try {
LocatedFileStatusFetcher locatedFileStatusFetcher = new LocatedFileStatusFetcher(
job, dirs, recursive, inputFilter, false);
locatedFiles = locatedFileStatusFetcher.getFileStatuses();
} catch (InterruptedException e) {
throw new IOException("Interrupted while getting file statuses");
}
result = Iterables.toArray(locatedFiles, FileStatus.class);
}
sw.stop();
if (LOG.isDebugEnabled()) {
LOG.debug("Time taken to get FileStatuses: " + sw.elapsedMillis());
}
LOG.info("Total input paths to process : " + result.length);
return result;
}
private List<FileStatus> singleThreadedListStatus(JobConf job, Path[] dirs,
PathFilter inputFilter, boolean recursive) throws IOException {
List<FileStatus> result = new ArrayList<FileStatus>();
List<IOException> errors = new ArrayList<IOException>();
for (Path p: dirs) {
FileSystem fs = p.getFileSystem(job);
FileStatus[] matches = fs.globStatus(p, inputFilter);
@ -246,12 +281,10 @@ public abstract class FileInputFormat<K, V> implements InputFormat<K, V> {
}
}
}
if (!errors.isEmpty()) {
throw new InvalidInputException(errors);
}
LOG.info("Total input paths to process : " + result.size());
return result.toArray(new FileStatus[result.size()]);
return result;
}
/**
@ -267,6 +300,7 @@ public abstract class FileInputFormat<K, V> implements InputFormat<K, V> {
* they're too big.*/
public InputSplit[] getSplits(JobConf job, int numSplits)
throws IOException {
Stopwatch sw = new Stopwatch().start();
FileStatus[] files = listStatus(job);
// Save the number of input files for metrics/loadgen
@ -325,7 +359,11 @@ public abstract class FileInputFormat<K, V> implements InputFormat<K, V> {
splits.add(makeSplit(path, 0, length, new String[0]));
}
}
LOG.debug("Total # of splits: " + splits.size());
sw.stop();
if (LOG.isDebugEnabled()) {
LOG.debug("Total # of splits generated by getSplits: " + splits.size()
+ ", TimeTaken: " + sw.elapsedMillis());
}
return splits.toArray(new FileSplit[splits.size()]);
}

View File

@ -0,0 +1,371 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
* Utility class to fetch block locations for specified Input paths using a
* configured number of threads.
*/
@Private
public class LocatedFileStatusFetcher {
private final Path[] inputDirs;
private final PathFilter inputFilter;
private final Configuration conf;
private final boolean recursive;
private final boolean newApi;
private final ExecutorService rawExec;
private final ListeningExecutorService exec;
private final BlockingQueue<List<FileStatus>> resultQueue;
private final List<IOException> invalidInputErrors = new LinkedList<IOException>();
private final ProcessInitialInputPathCallback processInitialInputPathCallback =
new ProcessInitialInputPathCallback();
private final ProcessInputDirCallback processInputDirCallback =
new ProcessInputDirCallback();
private final AtomicInteger runningTasks = new AtomicInteger(0);
private final ReentrantLock lock = new ReentrantLock();
private final Condition condition = lock.newCondition();
private volatile Throwable unknownError;
/**
* @param conf configuration for the job
* @param dirs the initial list of paths
* @param recursive whether to traverse the patchs recursively
* @param inputFilter inputFilter to apply to the resulting paths
* @param newApi whether using the mapred or mapreduce API
* @throws InterruptedException
* @throws IOException
*/
public LocatedFileStatusFetcher(Configuration conf, Path[] dirs,
boolean recursive, PathFilter inputFilter, boolean newApi) throws InterruptedException,
IOException {
int numThreads = conf.getInt(FileInputFormat.LIST_STATUS_NUM_THREADS,
FileInputFormat.DEFAULT_LIST_STATUS_NUM_THREADS);
rawExec = Executors.newFixedThreadPool(
numThreads,
new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat("GetFileInfo #%d").build());
exec = MoreExecutors.listeningDecorator(rawExec);
resultQueue = new LinkedBlockingQueue<List<FileStatus>>();
this.conf = conf;
this.inputDirs = dirs;
this.recursive = recursive;
this.inputFilter = inputFilter;
this.newApi = newApi;
}
/**
* Start executing and return FileStatuses based on the parameters specified
* @return fetched file statuses
* @throws InterruptedException
* @throws IOException
*/
public Iterable<FileStatus> getFileStatuses() throws InterruptedException,
IOException {
// Increment to make sure a race between the first thread completing and the
// rest being scheduled does not lead to a termination.
runningTasks.incrementAndGet();
for (Path p : inputDirs) {
runningTasks.incrementAndGet();
ListenableFuture<ProcessInitialInputPathCallable.Result> future = exec
.submit(new ProcessInitialInputPathCallable(p, conf, inputFilter));
Futures.addCallback(future, processInitialInputPathCallback);
}
runningTasks.decrementAndGet();
lock.lock();
try {
while (runningTasks.get() != 0 && unknownError == null) {
condition.await();
}
} finally {
lock.unlock();
}
this.exec.shutdownNow();
if (this.unknownError != null) {
if (this.unknownError instanceof Error) {
throw (Error) this.unknownError;
} else if (this.unknownError instanceof RuntimeException) {
throw (RuntimeException) this.unknownError;
} else if (this.unknownError instanceof IOException) {
throw (IOException) this.unknownError;
} else if (this.unknownError instanceof InterruptedException) {
throw (InterruptedException) this.unknownError;
} else {
throw new IOException(this.unknownError);
}
}
if (this.invalidInputErrors.size() != 0) {
if (this.newApi) {
throw new org.apache.hadoop.mapreduce.lib.input.InvalidInputException(
invalidInputErrors);
} else {
throw new InvalidInputException(invalidInputErrors);
}
}
return Iterables.concat(resultQueue);
}
/**
* Collect misconfigured Input errors. Errors while actually reading file info
* are reported immediately
*/
private void registerInvalidInputError(List<IOException> errors) {
synchronized (this) {
this.invalidInputErrors.addAll(errors);
}
}
/**
* Register fatal errors - example an IOException while accessing a file or a
* full exection queue
*/
private void registerError(Throwable t) {
lock.lock();
try {
if (unknownError != null) {
unknownError = t;
condition.signal();
}
} finally {
lock.unlock();
}
}
private void decrementRunningAndCheckCompletion() {
lock.lock();
try {
if (runningTasks.decrementAndGet() == 0) {
condition.signal();
}
} finally {
lock.unlock();
}
}
/**
* Retrieves block locations for the given @link {@link FileStatus}, and adds
* additional paths to the process queue if required.
*/
private static class ProcessInputDirCallable implements
Callable<ProcessInputDirCallable.Result> {
private final FileSystem fs;
private final FileStatus fileStatus;
private final boolean recursive;
private final PathFilter inputFilter;
ProcessInputDirCallable(FileSystem fs, FileStatus fileStatus,
boolean recursive, PathFilter inputFilter) {
this.fs = fs;
this.fileStatus = fileStatus;
this.recursive = recursive;
this.inputFilter = inputFilter;
}
@Override
public Result call() throws Exception {
Result result = new Result();
result.fs = fs;
if (fileStatus.isDirectory()) {
RemoteIterator<LocatedFileStatus> iter = fs
.listLocatedStatus(fileStatus.getPath());
while (iter.hasNext()) {
LocatedFileStatus stat = iter.next();
if (inputFilter.accept(stat.getPath())) {
if (recursive && stat.isDirectory()) {
result.dirsNeedingRecursiveCalls.add(stat);
} else {
result.locatedFileStatuses.add(stat);
}
}
}
} else {
result.locatedFileStatuses.add(fileStatus);
}
return result;
}
private static class Result {
private List<FileStatus> locatedFileStatuses = new LinkedList<FileStatus>();
private List<FileStatus> dirsNeedingRecursiveCalls = new LinkedList<FileStatus>();
private FileSystem fs;
}
}
/**
* The callback handler to handle results generated by
* {@link ProcessInputDirCallable}. This populates the final result set.
*
*/
private class ProcessInputDirCallback implements
FutureCallback<ProcessInputDirCallable.Result> {
@Override
public void onSuccess(ProcessInputDirCallable.Result result) {
try {
if (result.locatedFileStatuses.size() != 0) {
resultQueue.add(result.locatedFileStatuses);
}
if (result.dirsNeedingRecursiveCalls.size() != 0) {
for (FileStatus fileStatus : result.dirsNeedingRecursiveCalls) {
runningTasks.incrementAndGet();
ListenableFuture<ProcessInputDirCallable.Result> future = exec
.submit(new ProcessInputDirCallable(result.fs, fileStatus,
recursive, inputFilter));
Futures.addCallback(future, processInputDirCallback);
}
}
decrementRunningAndCheckCompletion();
} catch (Throwable t) { // Error within the callback itself.
registerError(t);
}
}
@Override
public void onFailure(Throwable t) {
// Any generated exceptions. Leads to immediate termination.
registerError(t);
}
}
/**
* Processes an initial Input Path pattern through the globber and PathFilter
* to generate a list of files which need further processing.
*/
private static class ProcessInitialInputPathCallable implements
Callable<ProcessInitialInputPathCallable.Result> {
private final Path path;
private final Configuration conf;
private final PathFilter inputFilter;
public ProcessInitialInputPathCallable(Path path, Configuration conf,
PathFilter pathFilter) {
this.path = path;
this.conf = conf;
this.inputFilter = pathFilter;
}
@Override
public Result call() throws Exception {
Result result = new Result();
FileSystem fs = path.getFileSystem(conf);
result.fs = fs;
FileStatus[] matches = fs.globStatus(path, inputFilter);
if (matches == null) {
result.addError(new IOException("Input path does not exist: " + path));
} else if (matches.length == 0) {
result.addError(new IOException("Input Pattern " + path
+ " matches 0 files"));
} else {
result.matchedFileStatuses = matches;
}
return result;
}
private static class Result {
private List<IOException> errors;
private FileStatus[] matchedFileStatuses;
private FileSystem fs;
void addError(IOException ioe) {
if (errors == null) {
errors = new LinkedList<IOException>();
}
errors.add(ioe);
}
}
}
/**
* The callback handler to handle results generated by
* {@link ProcessInitialInputPathCallable}
*
*/
private class ProcessInitialInputPathCallback implements
FutureCallback<ProcessInitialInputPathCallable.Result> {
@Override
public void onSuccess(ProcessInitialInputPathCallable.Result result) {
try {
if (result.errors != null) {
registerInvalidInputError(result.errors);
}
if (result.matchedFileStatuses != null) {
for (FileStatus matched : result.matchedFileStatuses) {
runningTasks.incrementAndGet();
ListenableFuture<ProcessInputDirCallable.Result> future = exec
.submit(new ProcessInputDirCallable(result.fs, matched,
recursive, inputFilter));
Futures.addCallback(future, processInputDirCallback);
}
}
decrementRunningAndCheckCompletion();
} catch (Throwable t) { // Exception within the callback
registerError(t);
}
}
@Override
public void onFailure(Throwable t) {
// Any generated exceptions. Leads to immediate termination.
registerError(t);
}
}
}

View File

@ -34,6 +34,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.mapred.LocatedFileStatusFetcher;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
@ -43,6 +44,9 @@ import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists;
/**
* A base class for file-based {@link InputFormat}s.
*
@ -68,6 +72,9 @@ public abstract class FileInputFormat<K, V> extends InputFormat<K, V> {
"mapreduce.input.fileinputformat.numinputfiles";
public static final String INPUT_DIR_RECURSIVE =
"mapreduce.input.fileinputformat.input.dir.recursive";
public static final String LIST_STATUS_NUM_THREADS =
"mapreduce.input.fileinputformat.list-status.num-threads";
public static final int DEFAULT_LIST_STATUS_NUM_THREADS = 1;
private static final Log LOG = LogFactory.getLog(FileInputFormat.class);
@ -225,7 +232,6 @@ public abstract class FileInputFormat<K, V> extends InputFormat<K, V> {
*/
protected List<FileStatus> listStatus(JobContext job
) throws IOException {
List<FileStatus> result = new ArrayList<FileStatus>();
Path[] dirs = getInputPaths(job);
if (dirs.length == 0) {
throw new IOException("No input paths specified in job");
@ -237,9 +243,7 @@ public abstract class FileInputFormat<K, V> extends InputFormat<K, V> {
// Whether we need to recursive look into the directory structure
boolean recursive = getInputDirRecursive(job);
List<IOException> errors = new ArrayList<IOException>();
// creates a MultiPathFilter with the hiddenFileFilter and the
// user provided one (if any).
List<PathFilter> filters = new ArrayList<PathFilter>();
@ -250,6 +254,37 @@ public abstract class FileInputFormat<K, V> extends InputFormat<K, V> {
}
PathFilter inputFilter = new MultiPathFilter(filters);
List<FileStatus> result = null;
int numThreads = job.getConfiguration().getInt(LIST_STATUS_NUM_THREADS,
DEFAULT_LIST_STATUS_NUM_THREADS);
Stopwatch sw = new Stopwatch().start();
if (numThreads == 1) {
result = singleThreadedListStatus(job, dirs, inputFilter, recursive);
} else {
Iterable<FileStatus> locatedFiles = null;
try {
LocatedFileStatusFetcher locatedFileStatusFetcher = new LocatedFileStatusFetcher(
job.getConfiguration(), dirs, recursive, inputFilter, true);
locatedFiles = locatedFileStatusFetcher.getFileStatuses();
} catch (InterruptedException e) {
throw new IOException("Interrupted while getting file statuses");
}
result = Lists.newArrayList(locatedFiles);
}
sw.stop();
if (LOG.isDebugEnabled()) {
LOG.debug("Time taken to get FileStatuses: " + sw.elapsedMillis());
}
LOG.info("Total input paths to process : " + result.size());
return result;
}
private List<FileStatus> singleThreadedListStatus(JobContext job, Path[] dirs,
PathFilter inputFilter, boolean recursive) throws IOException {
List<FileStatus> result = new ArrayList<FileStatus>();
List<IOException> errors = new ArrayList<IOException>();
for (int i=0; i < dirs.length; ++i) {
Path p = dirs[i];
FileSystem fs = p.getFileSystem(job.getConfiguration());
@ -284,7 +319,6 @@ public abstract class FileInputFormat<K, V> extends InputFormat<K, V> {
if (!errors.isEmpty()) {
throw new InvalidInputException(errors);
}
LOG.info("Total input paths to process : " + result.size());
return result;
}
@ -332,6 +366,7 @@ public abstract class FileInputFormat<K, V> extends InputFormat<K, V> {
* @throws IOException
*/
public List<InputSplit> getSplits(JobContext job) throws IOException {
Stopwatch sw = new Stopwatch().start();
long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
long maxSize = getMaxSplitSize(job);
@ -376,7 +411,11 @@ public abstract class FileInputFormat<K, V> extends InputFormat<K, V> {
}
// Save the number of input files for metrics/loadgen
job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
LOG.debug("Total # of splits: " + splits.size());
sw.stop();
if (LOG.isDebugEnabled()) {
LOG.debug("Total # of splits generated by getSplits: " + splits.size()
+ ", TimeTaken: " + sw.elapsedMillis());
}
return splits;
}

View File

@ -694,6 +694,15 @@
take priority over this setting.</description>
</property>
<property>
<name>mapreduce.input.fileinputformat.list-status.num-threads</name>
<value>1</value>
<description>The number of threads to use to list and fetch block locations
for the specified input paths. Note: multiple threads should not be used
if a custom non thread-safe path filter is used.
</description>
</property>
<property>
<name>mapreduce.jobtracker.maxtasks.perjob</name>
<value>-1</value>

View File

@ -19,7 +19,12 @@ package org.apache.hadoop.mapred;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
@ -29,15 +34,58 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.RawLocalFileSystem;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import com.google.common.collect.Lists;
@RunWith(value = Parameterized.class)
public class TestFileInputFormat {
private static final Log LOG = LogFactory.getLog(TestFileInputFormat.class);
private static String testTmpDir = System.getProperty("test.build.data", "/tmp");
private static final Path TEST_ROOT_DIR = new Path(testTmpDir, "TestFIF");
private static FileSystem localFs;
private int numThreads;
public TestFileInputFormat(int numThreads) {
this.numThreads = numThreads;
LOG.info("Running with numThreads: " + numThreads);
}
@Parameters
public static Collection<Object[]> data() {
Object[][] data = new Object[][] { { 1 }, { 5 }};
return Arrays.asList(data);
}
@Before
public void setup() throws IOException {
LOG.info("Using Test Dir: " + TEST_ROOT_DIR);
localFs = FileSystem.getLocal(new Configuration());
localFs.delete(TEST_ROOT_DIR, true);
localFs.mkdirs(TEST_ROOT_DIR);
}
@After
public void cleanup() throws IOException {
localFs.delete(TEST_ROOT_DIR, true);
}
@Test
public void testListLocatedStatus() throws Exception {
Configuration conf = getConfiguration();
conf.setBoolean("fs.test.impl.disable.cache", false);
conf.setInt(FileInputFormat.LIST_STATUS_NUM_THREADS, numThreads);
conf.set(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR,
"test:///a1/a2");
MockFileSystem mockFs =
@ -51,6 +99,82 @@ public class TestFileInputFormat {
Assert.assertEquals("Input splits are not correct", 2, splits.length);
Assert.assertEquals("listLocatedStatuss calls",
1, mockFs.numListLocatedStatusCalls);
FileSystem.closeAll();
}
@Test
public void testListStatusSimple() throws IOException {
Configuration conf = new Configuration();
conf.setInt(FileInputFormat.LIST_STATUS_NUM_THREADS, numThreads);
List<Path> expectedPaths = org.apache.hadoop.mapreduce.lib.input.TestFileInputFormat
.configureTestSimple(conf, localFs);
JobConf jobConf = new JobConf(conf);
TextInputFormat fif = new TextInputFormat();
fif.configure(jobConf);
FileStatus[] statuses = fif.listStatus(jobConf);
org.apache.hadoop.mapreduce.lib.input.TestFileInputFormat
.verifyFileStatuses(expectedPaths, Lists.newArrayList(statuses),
localFs);
}
@Test
public void testListStatusNestedRecursive() throws IOException {
Configuration conf = new Configuration();
conf.setInt(FileInputFormat.LIST_STATUS_NUM_THREADS, numThreads);
List<Path> expectedPaths = org.apache.hadoop.mapreduce.lib.input.TestFileInputFormat
.configureTestNestedRecursive(conf, localFs);
JobConf jobConf = new JobConf(conf);
TextInputFormat fif = new TextInputFormat();
fif.configure(jobConf);
FileStatus[] statuses = fif.listStatus(jobConf);
org.apache.hadoop.mapreduce.lib.input.TestFileInputFormat
.verifyFileStatuses(expectedPaths, Lists.newArrayList(statuses),
localFs);
}
@Test
public void testListStatusNestedNonRecursive() throws IOException {
Configuration conf = new Configuration();
conf.setInt(FileInputFormat.LIST_STATUS_NUM_THREADS, numThreads);
List<Path> expectedPaths = org.apache.hadoop.mapreduce.lib.input.TestFileInputFormat
.configureTestNestedNonRecursive(conf, localFs);
JobConf jobConf = new JobConf(conf);
TextInputFormat fif = new TextInputFormat();
fif.configure(jobConf);
FileStatus[] statuses = fif.listStatus(jobConf);
org.apache.hadoop.mapreduce.lib.input.TestFileInputFormat
.verifyFileStatuses(expectedPaths, Lists.newArrayList(statuses),
localFs);
}
@Test
public void testListStatusErrorOnNonExistantDir() throws IOException {
Configuration conf = new Configuration();
conf.setInt(FileInputFormat.LIST_STATUS_NUM_THREADS, numThreads);
org.apache.hadoop.mapreduce.lib.input.TestFileInputFormat
.configureTestErrorOnNonExistantDir(conf, localFs);
JobConf jobConf = new JobConf(conf);
TextInputFormat fif = new TextInputFormat();
fif.configure(jobConf);
try {
fif.listStatus(jobConf);
Assert.fail("Expecting an IOException for a missing Input path");
} catch (IOException e) {
Path expectedExceptionPath = new Path(TEST_ROOT_DIR, "input2");
expectedExceptionPath = localFs.makeQualified(expectedExceptionPath);
Assert.assertTrue(e instanceof InvalidInputException);
Assert.assertEquals(
"Input path does not exist: " + expectedExceptionPath.toString(),
e.getMessage());
}
}
private Configuration getConfiguration() {

View File

@ -19,10 +19,17 @@ package org.apache.hadoop.mapreduce.lib.input;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import javax.annotation.Nullable;
import junit.framework.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
@ -34,55 +41,90 @@ import org.apache.hadoop.fs.RawLocalFileSystem;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
@RunWith(value = Parameterized.class)
public class TestFileInputFormat {
private static final Log LOG = LogFactory.getLog(TestFileInputFormat.class);
private static String testTmpDir = System.getProperty("test.build.data", "/tmp");
private static final Path TEST_ROOT_DIR = new Path(testTmpDir, "TestFIF");
private static FileSystem localFs;
private int numThreads;
public TestFileInputFormat(int numThreads) {
this.numThreads = numThreads;
LOG.info("Running with numThreads: " + numThreads);
}
@Parameters
public static Collection<Object[]> data() {
Object[][] data = new Object[][] { { 1 }, { 5 }};
return Arrays.asList(data);
}
@Before
public void setup() throws IOException {
LOG.info("Using Test Dir: " + TEST_ROOT_DIR);
localFs = FileSystem.getLocal(new Configuration());
localFs.delete(TEST_ROOT_DIR, true);
localFs.mkdirs(TEST_ROOT_DIR);
}
@After
public void cleanup() throws IOException {
localFs.delete(TEST_ROOT_DIR, true);
}
@Test
public void testNumInputFilesRecursively() throws Exception {
Configuration conf = getConfiguration();
conf.set(FileInputFormat.INPUT_DIR_RECURSIVE, "true");
conf.setInt(FileInputFormat.LIST_STATUS_NUM_THREADS, numThreads);
Job job = Job.getInstance(conf);
FileInputFormat<?, ?> fileInputFormat = new TextInputFormat();
List<InputSplit> splits = fileInputFormat.getSplits(job);
Assert.assertEquals("Input splits are not correct", 3, splits.size());
Assert.assertEquals("test:/a1/a2/file2", ((FileSplit) splits.get(0))
.getPath().toString());
Assert.assertEquals("test:/a1/a2/file3", ((FileSplit) splits.get(1))
.getPath().toString());
Assert.assertEquals("test:/a1/file1", ((FileSplit) splits.get(2)).getPath()
.toString());
verifySplits(Lists.newArrayList("test:/a1/a2/file2", "test:/a1/a2/file3",
"test:/a1/file1"), splits);
// Using the deprecated configuration
conf = getConfiguration();
conf.set("mapred.input.dir.recursive", "true");
job = Job.getInstance(conf);
splits = fileInputFormat.getSplits(job);
Assert.assertEquals("Input splits are not correct", 3, splits.size());
Assert.assertEquals("test:/a1/a2/file2", ((FileSplit) splits.get(0))
.getPath().toString());
Assert.assertEquals("test:/a1/a2/file3", ((FileSplit) splits.get(1))
.getPath().toString());
Assert.assertEquals("test:/a1/file1", ((FileSplit) splits.get(2)).getPath()
.toString());
verifySplits(Lists.newArrayList("test:/a1/a2/file2", "test:/a1/a2/file3",
"test:/a1/file1"), splits);
}
@Test
public void testNumInputFilesWithoutRecursively() throws Exception {
Configuration conf = getConfiguration();
conf.setInt(FileInputFormat.LIST_STATUS_NUM_THREADS, numThreads);
Job job = Job.getInstance(conf);
FileInputFormat<?, ?> fileInputFormat = new TextInputFormat();
List<InputSplit> splits = fileInputFormat.getSplits(job);
Assert.assertEquals("Input splits are not correct", 2, splits.size());
Assert.assertEquals("test:/a1/a2", ((FileSplit) splits.get(0)).getPath()
.toString());
Assert.assertEquals("test:/a1/file1", ((FileSplit) splits.get(1)).getPath()
.toString());
verifySplits(Lists.newArrayList("test:/a1/a2", "test:/a1/file1"), splits);
}
@Test
public void testListLocatedStatus() throws Exception {
Configuration conf = getConfiguration();
conf.setInt(FileInputFormat.LIST_STATUS_NUM_THREADS, numThreads);
conf.setBoolean("fs.test.impl.disable.cache", false);
conf.set(FileInputFormat.INPUT_DIR, "test:///a1/a2");
MockFileSystem mockFs =
@ -95,8 +137,226 @@ public class TestFileInputFormat {
Assert.assertEquals("Input splits are not correct", 2, splits.size());
Assert.assertEquals("listLocatedStatuss calls",
1, mockFs.numListLocatedStatusCalls);
FileSystem.closeAll();
}
@Test
public void testListStatusSimple() throws IOException {
Configuration conf = new Configuration();
conf.setInt(FileInputFormat.LIST_STATUS_NUM_THREADS, numThreads);
List<Path> expectedPaths = configureTestSimple(conf, localFs);
Job job = Job.getInstance(conf);
FileInputFormat<?, ?> fif = new TextInputFormat();
List<FileStatus> statuses = fif.listStatus(job);
verifyFileStatuses(expectedPaths, statuses, localFs);
}
@Test
public void testListStatusNestedRecursive() throws IOException {
Configuration conf = new Configuration();
conf.setInt(FileInputFormat.LIST_STATUS_NUM_THREADS, numThreads);
List<Path> expectedPaths = configureTestNestedRecursive(conf, localFs);
Job job = Job.getInstance(conf);
FileInputFormat<?, ?> fif = new TextInputFormat();
List<FileStatus> statuses = fif.listStatus(job);
verifyFileStatuses(expectedPaths, statuses, localFs);
}
@Test
public void testListStatusNestedNonRecursive() throws IOException {
Configuration conf = new Configuration();
conf.setInt(FileInputFormat.LIST_STATUS_NUM_THREADS, numThreads);
List<Path> expectedPaths = configureTestNestedNonRecursive(conf, localFs);
Job job = Job.getInstance(conf);
FileInputFormat<?, ?> fif = new TextInputFormat();
List<FileStatus> statuses = fif.listStatus(job);
verifyFileStatuses(expectedPaths, statuses, localFs);
}
@Test
public void testListStatusErrorOnNonExistantDir() throws IOException {
Configuration conf = new Configuration();
conf.setInt(FileInputFormat.LIST_STATUS_NUM_THREADS, numThreads);
configureTestErrorOnNonExistantDir(conf, localFs);
Job job = Job.getInstance(conf);
FileInputFormat<?, ?> fif = new TextInputFormat();
try {
fif.listStatus(job);
Assert.fail("Expecting an IOException for a missing Input path");
} catch (IOException e) {
Path expectedExceptionPath = new Path(TEST_ROOT_DIR, "input2");
expectedExceptionPath = localFs.makeQualified(expectedExceptionPath);
Assert.assertTrue(e instanceof InvalidInputException);
Assert.assertEquals(
"Input path does not exist: " + expectedExceptionPath.toString(),
e.getMessage());
}
}
public static List<Path> configureTestSimple(Configuration conf, FileSystem localFs)
throws IOException {
Path base1 = new Path(TEST_ROOT_DIR, "input1");
Path base2 = new Path(TEST_ROOT_DIR, "input2");
conf.set(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR,
localFs.makeQualified(base1) + "," + localFs.makeQualified(base2));
localFs.mkdirs(base1);
localFs.mkdirs(base2);
Path in1File1 = new Path(base1, "file1");
Path in1File2 = new Path(base1, "file2");
localFs.createNewFile(in1File1);
localFs.createNewFile(in1File2);
Path in2File1 = new Path(base2, "file1");
Path in2File2 = new Path(base2, "file2");
localFs.createNewFile(in2File1);
localFs.createNewFile(in2File2);
List<Path> expectedPaths = Lists.newArrayList(in1File1, in1File2, in2File1,
in2File2);
return expectedPaths;
}
public static List<Path> configureTestNestedRecursive(Configuration conf,
FileSystem localFs) throws IOException {
Path base1 = new Path(TEST_ROOT_DIR, "input1");
conf.set(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR,
localFs.makeQualified(base1).toString());
conf.setBoolean(
org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR_RECURSIVE,
true);
localFs.mkdirs(base1);
Path inDir1 = new Path(base1, "dir1");
Path inDir2 = new Path(base1, "dir2");
Path inFile1 = new Path(base1, "file1");
Path dir1File1 = new Path(inDir1, "file1");
Path dir1File2 = new Path(inDir1, "file2");
Path dir2File1 = new Path(inDir2, "file1");
Path dir2File2 = new Path(inDir2, "file2");
localFs.mkdirs(inDir1);
localFs.mkdirs(inDir2);
localFs.createNewFile(inFile1);
localFs.createNewFile(dir1File1);
localFs.createNewFile(dir1File2);
localFs.createNewFile(dir2File1);
localFs.createNewFile(dir2File2);
List<Path> expectedPaths = Lists.newArrayList(inFile1, dir1File1,
dir1File2, dir2File1, dir2File2);
return expectedPaths;
}
public static List<Path> configureTestNestedNonRecursive(Configuration conf,
FileSystem localFs) throws IOException {
Path base1 = new Path(TEST_ROOT_DIR, "input1");
conf.set(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR,
localFs.makeQualified(base1).toString());
conf.setBoolean(
org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR_RECURSIVE,
false);
localFs.mkdirs(base1);
Path inDir1 = new Path(base1, "dir1");
Path inDir2 = new Path(base1, "dir2");
Path inFile1 = new Path(base1, "file1");
Path dir1File1 = new Path(inDir1, "file1");
Path dir1File2 = new Path(inDir1, "file2");
Path dir2File1 = new Path(inDir2, "file1");
Path dir2File2 = new Path(inDir2, "file2");
localFs.mkdirs(inDir1);
localFs.mkdirs(inDir2);
localFs.createNewFile(inFile1);
localFs.createNewFile(dir1File1);
localFs.createNewFile(dir1File2);
localFs.createNewFile(dir2File1);
localFs.createNewFile(dir2File2);
List<Path> expectedPaths = Lists.newArrayList(inFile1, inDir1, inDir2);
return expectedPaths;
}
public static List<Path> configureTestErrorOnNonExistantDir(Configuration conf,
FileSystem localFs) throws IOException {
Path base1 = new Path(TEST_ROOT_DIR, "input1");
Path base2 = new Path(TEST_ROOT_DIR, "input2");
conf.set(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR,
localFs.makeQualified(base1) + "," + localFs.makeQualified(base2));
conf.setBoolean(
org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR_RECURSIVE,
true);
localFs.mkdirs(base1);
Path inFile1 = new Path(base1, "file1");
Path inFile2 = new Path(base1, "file2");
localFs.createNewFile(inFile1);
localFs.createNewFile(inFile2);
List<Path> expectedPaths = Lists.newArrayList();
return expectedPaths;
}
public static void verifyFileStatuses(List<Path> expectedPaths,
List<FileStatus> fetchedStatuses, final FileSystem localFs) {
Assert.assertEquals(expectedPaths.size(), fetchedStatuses.size());
Iterable<Path> fqExpectedPaths = Iterables.transform(expectedPaths,
new Function<Path, Path>() {
@Override
public Path apply(Path input) {
return localFs.makeQualified(input);
}
});
Set<Path> expectedPathSet = Sets.newHashSet(fqExpectedPaths);
for (FileStatus fileStatus : fetchedStatuses) {
if (!expectedPathSet.remove(localFs.makeQualified(fileStatus.getPath()))) {
Assert.fail("Found extra fetched status: " + fileStatus.getPath());
}
}
Assert.assertEquals(
"Not all expectedPaths matched: " + expectedPathSet.toString(), 0,
expectedPathSet.size());
}
private void verifySplits(List<String> expected, List<InputSplit> splits) {
Iterable<String> pathsFromSplits = Iterables.transform(splits,
new Function<InputSplit, String>() {
@Override
public String apply(@Nullable InputSplit input) {
return ((FileSplit) input).getPath().toString();
}
});
Set<String> expectedSet = Sets.newHashSet(expected);
for (String splitPathString : pathsFromSplits) {
if (!expectedSet.remove(splitPathString)) {
Assert.fail("Found extra split: " + splitPathString);
}
}
Assert.assertEquals(
"Not all expectedPaths matched: " + expectedSet.toString(), 0,
expectedSet.size());
}
private Configuration getConfiguration() {
Configuration conf = new Configuration();
conf.set("fs.test.impl.disable.cache", "true");