HADOOP-16458. LocatedFileStatusFetcher.getFileStatuses failing intermittently with S3
Contributed by Steve Loughran. Includes -S3A glob scans don't bother trying to resolve symlinks -stack traces don't get lost in getFileStatuses() when exceptions are wrapped -debug level logging of what is up in Globber -Contains HADOOP-13373. Add S3A implementation of FSMainOperationsBaseTest. -ITestRestrictedReadAccess tests incomplete read access to files. This adds a builder API for constructing globbers which other stores can use so that they too can skip symlink resolution when not needed. Change-Id: I23bcdb2783d6bd77cf168fdc165b1b4b334d91c7
This commit is contained in:
parent
918b470deb
commit
1921e94292
|
@ -2064,7 +2064,12 @@ public abstract class FileSystem extends Configured
|
||||||
* @throws IOException IO failure
|
* @throws IOException IO failure
|
||||||
*/
|
*/
|
||||||
public FileStatus[] globStatus(Path pathPattern) throws IOException {
|
public FileStatus[] globStatus(Path pathPattern) throws IOException {
|
||||||
return new Globber(this, pathPattern, DEFAULT_FILTER).glob();
|
return Globber.createGlobber(this)
|
||||||
|
.withPathPattern(pathPattern)
|
||||||
|
.withPathFiltern(DEFAULT_FILTER)
|
||||||
|
.withResolveSymlinks(true)
|
||||||
|
.build()
|
||||||
|
.glob();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -25,15 +25,24 @@ import java.util.List;
|
||||||
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.classification.InterfaceStability;
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
|
import org.apache.hadoop.util.DurationInfo;
|
||||||
|
|
||||||
import org.apache.htrace.core.TraceScope;
|
import org.apache.htrace.core.TraceScope;
|
||||||
import org.apache.htrace.core.Tracer;
|
import org.apache.htrace.core.Tracer;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import static com.google.common.base.Preconditions.checkNotNull;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Implementation of {@link FileSystem#globStatus(Path, PathFilter)}.
|
||||||
|
* This has historically been package-private; it has been opened
|
||||||
|
* up for object stores within the {@code hadoop-*} codebase ONLY.
|
||||||
|
* It could be expanded for external store implementations in future.
|
||||||
|
*/
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
@InterfaceStability.Unstable
|
@InterfaceStability.Unstable
|
||||||
class Globber {
|
public class Globber {
|
||||||
public static final Logger LOG =
|
public static final Logger LOG =
|
||||||
LoggerFactory.getLogger(Globber.class.getName());
|
LoggerFactory.getLogger(Globber.class.getName());
|
||||||
|
|
||||||
|
@ -42,21 +51,62 @@ class Globber {
|
||||||
private final Path pathPattern;
|
private final Path pathPattern;
|
||||||
private final PathFilter filter;
|
private final PathFilter filter;
|
||||||
private final Tracer tracer;
|
private final Tracer tracer;
|
||||||
|
private final boolean resolveSymlinks;
|
||||||
|
|
||||||
public Globber(FileSystem fs, Path pathPattern, PathFilter filter) {
|
Globber(FileSystem fs, Path pathPattern, PathFilter filter) {
|
||||||
this.fs = fs;
|
this.fs = fs;
|
||||||
this.fc = null;
|
this.fc = null;
|
||||||
this.pathPattern = pathPattern;
|
this.pathPattern = pathPattern;
|
||||||
this.filter = filter;
|
this.filter = filter;
|
||||||
this.tracer = FsTracer.get(fs.getConf());
|
this.tracer = FsTracer.get(fs.getConf());
|
||||||
|
this.resolveSymlinks = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Globber(FileContext fc, Path pathPattern, PathFilter filter) {
|
Globber(FileContext fc, Path pathPattern, PathFilter filter) {
|
||||||
this.fs = null;
|
this.fs = null;
|
||||||
this.fc = fc;
|
this.fc = fc;
|
||||||
this.pathPattern = pathPattern;
|
this.pathPattern = pathPattern;
|
||||||
this.filter = filter;
|
this.filter = filter;
|
||||||
this.tracer = fc.getTracer();
|
this.tracer = fc.getTracer();
|
||||||
|
this.resolveSymlinks = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Filesystem constructor for use by {@link GlobBuilder}.
|
||||||
|
* @param fs filesystem
|
||||||
|
* @param pathPattern path pattern
|
||||||
|
* @param filter optional filter
|
||||||
|
* @param resolveSymlinks should symlinks be resolved.
|
||||||
|
*/
|
||||||
|
private Globber(FileSystem fs, Path pathPattern, PathFilter filter,
|
||||||
|
boolean resolveSymlinks) {
|
||||||
|
this.fs = fs;
|
||||||
|
this.fc = null;
|
||||||
|
this.pathPattern = pathPattern;
|
||||||
|
this.filter = filter;
|
||||||
|
this.resolveSymlinks = resolveSymlinks;
|
||||||
|
this.tracer = FsTracer.get(fs.getConf());
|
||||||
|
LOG.debug("Created Globber for path={}, symlinks={}",
|
||||||
|
pathPattern, resolveSymlinks);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* File Context constructor for use by {@link GlobBuilder}.
|
||||||
|
* @param fc file context
|
||||||
|
* @param pathPattern path pattern
|
||||||
|
* @param filter optional filter
|
||||||
|
* @param resolveSymlinks should symlinks be resolved.
|
||||||
|
*/
|
||||||
|
private Globber(FileContext fc, Path pathPattern, PathFilter filter,
|
||||||
|
boolean resolveSymlinks) {
|
||||||
|
this.fs = null;
|
||||||
|
this.fc = fc;
|
||||||
|
this.pathPattern = pathPattern;
|
||||||
|
this.filter = filter;
|
||||||
|
this.resolveSymlinks = resolveSymlinks;
|
||||||
|
this.tracer = fc.getTracer();
|
||||||
|
LOG.debug("Created Globber path={}, symlinks={}",
|
||||||
|
pathPattern, resolveSymlinks);
|
||||||
}
|
}
|
||||||
|
|
||||||
private FileStatus getFileStatus(Path path) throws IOException {
|
private FileStatus getFileStatus(Path path) throws IOException {
|
||||||
|
@ -67,6 +117,7 @@ class Globber {
|
||||||
return fc.getFileStatus(path);
|
return fc.getFileStatus(path);
|
||||||
}
|
}
|
||||||
} catch (FileNotFoundException e) {
|
} catch (FileNotFoundException e) {
|
||||||
|
LOG.debug("getFileStatus({}) failed; returning null", path, e);
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -79,6 +130,7 @@ class Globber {
|
||||||
return fc.util().listStatus(path);
|
return fc.util().listStatus(path);
|
||||||
}
|
}
|
||||||
} catch (FileNotFoundException e) {
|
} catch (FileNotFoundException e) {
|
||||||
|
LOG.debug("listStatus({}) failed; returning empty array", path, e);
|
||||||
return new FileStatus[0];
|
return new FileStatus[0];
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -107,7 +159,7 @@ class Globber {
|
||||||
*/
|
*/
|
||||||
private static List<String> getPathComponents(String path)
|
private static List<String> getPathComponents(String path)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
ArrayList<String> ret = new ArrayList<String>();
|
ArrayList<String> ret = new ArrayList<>();
|
||||||
for (String component : path.split(Path.SEPARATOR)) {
|
for (String component : path.split(Path.SEPARATOR)) {
|
||||||
if (!component.isEmpty()) {
|
if (!component.isEmpty()) {
|
||||||
ret.add(component);
|
ret.add(component);
|
||||||
|
@ -145,7 +197,8 @@ class Globber {
|
||||||
public FileStatus[] glob() throws IOException {
|
public FileStatus[] glob() throws IOException {
|
||||||
TraceScope scope = tracer.newScope("Globber#glob");
|
TraceScope scope = tracer.newScope("Globber#glob");
|
||||||
scope.addKVAnnotation("pattern", pathPattern.toUri().getPath());
|
scope.addKVAnnotation("pattern", pathPattern.toUri().getPath());
|
||||||
try {
|
try (DurationInfo ignored = new DurationInfo(LOG, false,
|
||||||
|
"glob %s", pathPattern)) {
|
||||||
return doGlob();
|
return doGlob();
|
||||||
} finally {
|
} finally {
|
||||||
scope.close();
|
scope.close();
|
||||||
|
@ -164,10 +217,11 @@ class Globber {
|
||||||
String pathPatternString = pathPattern.toUri().getPath();
|
String pathPatternString = pathPattern.toUri().getPath();
|
||||||
List<String> flattenedPatterns = GlobExpander.expand(pathPatternString);
|
List<String> flattenedPatterns = GlobExpander.expand(pathPatternString);
|
||||||
|
|
||||||
|
LOG.debug("Filesystem glob {}", pathPatternString);
|
||||||
// Now loop over all flattened patterns. In every case, we'll be trying to
|
// Now loop over all flattened patterns. In every case, we'll be trying to
|
||||||
// match them to entries in the filesystem.
|
// match them to entries in the filesystem.
|
||||||
ArrayList<FileStatus> results =
|
ArrayList<FileStatus> results =
|
||||||
new ArrayList<FileStatus>(flattenedPatterns.size());
|
new ArrayList<>(flattenedPatterns.size());
|
||||||
boolean sawWildcard = false;
|
boolean sawWildcard = false;
|
||||||
for (String flatPattern : flattenedPatterns) {
|
for (String flatPattern : flattenedPatterns) {
|
||||||
// Get the absolute path for this flattened pattern. We couldn't do
|
// Get the absolute path for this flattened pattern. We couldn't do
|
||||||
|
@ -175,13 +229,14 @@ class Globber {
|
||||||
// path you go down influences how the path must be made absolute.
|
// path you go down influences how the path must be made absolute.
|
||||||
Path absPattern = fixRelativePart(new Path(
|
Path absPattern = fixRelativePart(new Path(
|
||||||
flatPattern.isEmpty() ? Path.CUR_DIR : flatPattern));
|
flatPattern.isEmpty() ? Path.CUR_DIR : flatPattern));
|
||||||
|
LOG.debug("Pattern: {}", absPattern);
|
||||||
// Now we break the flattened, absolute pattern into path components.
|
// Now we break the flattened, absolute pattern into path components.
|
||||||
// For example, /a/*/c would be broken into the list [a, *, c]
|
// For example, /a/*/c would be broken into the list [a, *, c]
|
||||||
List<String> components =
|
List<String> components =
|
||||||
getPathComponents(absPattern.toUri().getPath());
|
getPathComponents(absPattern.toUri().getPath());
|
||||||
// Starting out at the root of the filesystem, we try to match
|
// Starting out at the root of the filesystem, we try to match
|
||||||
// filesystem entries against pattern components.
|
// filesystem entries against pattern components.
|
||||||
ArrayList<FileStatus> candidates = new ArrayList<FileStatus>(1);
|
ArrayList<FileStatus> candidates = new ArrayList<>(1);
|
||||||
// To get the "real" FileStatus of root, we'd have to do an expensive
|
// To get the "real" FileStatus of root, we'd have to do an expensive
|
||||||
// RPC to the NameNode. So we create a placeholder FileStatus which has
|
// RPC to the NameNode. So we create a placeholder FileStatus which has
|
||||||
// the correct path, but defaults for the rest of the information.
|
// the correct path, but defaults for the rest of the information.
|
||||||
|
@ -206,12 +261,13 @@ class Globber {
|
||||||
for (int componentIdx = 0; componentIdx < components.size();
|
for (int componentIdx = 0; componentIdx < components.size();
|
||||||
componentIdx++) {
|
componentIdx++) {
|
||||||
ArrayList<FileStatus> newCandidates =
|
ArrayList<FileStatus> newCandidates =
|
||||||
new ArrayList<FileStatus>(candidates.size());
|
new ArrayList<>(candidates.size());
|
||||||
GlobFilter globFilter = new GlobFilter(components.get(componentIdx));
|
GlobFilter globFilter = new GlobFilter(components.get(componentIdx));
|
||||||
String component = unescapePathComponent(components.get(componentIdx));
|
String component = unescapePathComponent(components.get(componentIdx));
|
||||||
if (globFilter.hasPattern()) {
|
if (globFilter.hasPattern()) {
|
||||||
sawWildcard = true;
|
sawWildcard = true;
|
||||||
}
|
}
|
||||||
|
LOG.debug("Component {}, patterned={}", component, sawWildcard);
|
||||||
if (candidates.isEmpty() && sawWildcard) {
|
if (candidates.isEmpty() && sawWildcard) {
|
||||||
// Optimization: if there are no more candidates left, stop examining
|
// Optimization: if there are no more candidates left, stop examining
|
||||||
// the path components. We can only do this if we've already seen
|
// the path components. We can only do this if we've already seen
|
||||||
|
@ -245,6 +301,9 @@ class Globber {
|
||||||
// incorrectly conclude that /a/b was a file and should not match
|
// incorrectly conclude that /a/b was a file and should not match
|
||||||
// /a/*/*. So we use getFileStatus of the path we just listed to
|
// /a/*/*. So we use getFileStatus of the path we just listed to
|
||||||
// disambiguate.
|
// disambiguate.
|
||||||
|
if (resolveSymlinks) {
|
||||||
|
LOG.debug("listStatus found one entry; disambiguating {}",
|
||||||
|
children[0]);
|
||||||
Path path = candidate.getPath();
|
Path path = candidate.getPath();
|
||||||
FileStatus status = getFileStatus(path);
|
FileStatus status = getFileStatus(path);
|
||||||
if (status == null) {
|
if (status == null) {
|
||||||
|
@ -257,8 +316,17 @@ class Globber {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
if (!status.isDirectory()) {
|
if (!status.isDirectory()) {
|
||||||
|
LOG.debug("Resolved entry is a file; skipping: {}", status);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
// there's no symlinks in this store, so no need to issue
|
||||||
|
// another call, just see if the result is a directory or a file
|
||||||
|
if (children[0].getPath().equals(candidate.getPath())) {
|
||||||
|
// the listing status is of a file
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
for (FileStatus child : children) {
|
for (FileStatus child : children) {
|
||||||
if (componentIdx < components.size() - 1) {
|
if (componentIdx < components.size() - 1) {
|
||||||
|
@ -312,6 +380,8 @@ class Globber {
|
||||||
*/
|
*/
|
||||||
if ((!sawWildcard) && results.isEmpty() &&
|
if ((!sawWildcard) && results.isEmpty() &&
|
||||||
(flattenedPatterns.size() <= 1)) {
|
(flattenedPatterns.size() <= 1)) {
|
||||||
|
LOG.debug("No matches found and there was no wildcard in the path {}",
|
||||||
|
pathPattern);
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
/*
|
/*
|
||||||
|
@ -324,4 +394,98 @@ class Globber {
|
||||||
Arrays.sort(ret);
|
Arrays.sort(ret);
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a builder for a Globber, bonded to the specific filesystem.
|
||||||
|
* @param filesystem filesystem
|
||||||
|
* @return the builder to finish configuring.
|
||||||
|
*/
|
||||||
|
public static GlobBuilder createGlobber(FileSystem filesystem) {
|
||||||
|
return new GlobBuilder(filesystem);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a builder for a Globber, bonded to the specific file
|
||||||
|
* context.
|
||||||
|
* @param fileContext file context.
|
||||||
|
* @return the builder to finish configuring.
|
||||||
|
*/
|
||||||
|
public static GlobBuilder createGlobber(FileContext fileContext) {
|
||||||
|
return new GlobBuilder(fileContext);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Builder for Globber instances.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
public static class GlobBuilder {
|
||||||
|
|
||||||
|
private final FileSystem fs;
|
||||||
|
|
||||||
|
private final FileContext fc;
|
||||||
|
|
||||||
|
private Path pathPattern;
|
||||||
|
|
||||||
|
private PathFilter filter;
|
||||||
|
|
||||||
|
private boolean resolveSymlinks = true;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Construct bonded to a file context.
|
||||||
|
* @param fc file context.
|
||||||
|
*/
|
||||||
|
public GlobBuilder(final FileContext fc) {
|
||||||
|
this.fs = null;
|
||||||
|
this.fc = checkNotNull(fc);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Construct bonded to a filesystem.
|
||||||
|
* @param fs file system.
|
||||||
|
*/
|
||||||
|
public GlobBuilder(final FileSystem fs) {
|
||||||
|
this.fs = checkNotNull(fs);
|
||||||
|
this.fc = null;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set the path pattern.
|
||||||
|
* @param pattern pattern to use.
|
||||||
|
* @return the builder
|
||||||
|
*/
|
||||||
|
public GlobBuilder withPathPattern(Path pattern) {
|
||||||
|
pathPattern = pattern;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set the path filter.
|
||||||
|
* @param pathFilter filter
|
||||||
|
* @return the builder
|
||||||
|
*/
|
||||||
|
public GlobBuilder withPathFiltern(PathFilter pathFilter) {
|
||||||
|
filter = pathFilter;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set the symlink resolution policy.
|
||||||
|
* @param resolve resolution flag.
|
||||||
|
* @return the builder
|
||||||
|
*/
|
||||||
|
public GlobBuilder withResolveSymlinks(boolean resolve) {
|
||||||
|
resolveSymlinks = resolve;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Build the Globber.
|
||||||
|
* @return a new instance.
|
||||||
|
*/
|
||||||
|
public Globber build() {
|
||||||
|
return fs != null
|
||||||
|
? new Globber(fs, pathPattern, filter, resolveSymlinks)
|
||||||
|
: new Globber(fc, pathPattern, filter, resolveSymlinks);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -575,6 +575,9 @@ public final class LambdaTestUtils {
|
||||||
if (o == null) {
|
if (o == null) {
|
||||||
return NULL_RESULT;
|
return NULL_RESULT;
|
||||||
} else {
|
} else {
|
||||||
|
if (o instanceof String) {
|
||||||
|
return '"' + (String)o + '"';
|
||||||
|
}
|
||||||
try {
|
try {
|
||||||
return o.toString();
|
return o.toString();
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
|
|
|
@ -19,6 +19,7 @@
|
||||||
package org.apache.hadoop.mapred;
|
package org.apache.hadoop.mapred;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.io.InterruptedIOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.Comparator;
|
import java.util.Comparator;
|
||||||
|
@ -250,7 +251,9 @@ public abstract class FileInputFormat<K, V> implements InputFormat<K, V> {
|
||||||
job, dirs, recursive, inputFilter, false);
|
job, dirs, recursive, inputFilter, false);
|
||||||
locatedFiles = locatedFileStatusFetcher.getFileStatuses();
|
locatedFiles = locatedFileStatusFetcher.getFileStatuses();
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
throw new IOException("Interrupted while getting file statuses");
|
throw (IOException)
|
||||||
|
new InterruptedIOException("Interrupted while getting file statuses")
|
||||||
|
.initCause(e);
|
||||||
}
|
}
|
||||||
result = Iterables.toArray(locatedFiles, FileStatus.class);
|
result = Iterables.toArray(locatedFiles, FileStatus.class);
|
||||||
}
|
}
|
||||||
|
|
|
@ -38,10 +38,14 @@ public class InvalidInputException extends IOException {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create the exception with the given list.
|
* Create the exception with the given list.
|
||||||
|
* The first element of the list is used as the init cause value.
|
||||||
* @param probs the list of problems to report. this list is not copied.
|
* @param probs the list of problems to report. this list is not copied.
|
||||||
*/
|
*/
|
||||||
public InvalidInputException(List<IOException> probs) {
|
public InvalidInputException(List<IOException> probs) {
|
||||||
problems = probs;
|
problems = probs;
|
||||||
|
if (!probs.isEmpty()) {
|
||||||
|
initCause(probs.get(0));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -46,15 +46,23 @@ import com.google.common.util.concurrent.ListenableFuture;
|
||||||
import com.google.common.util.concurrent.ListeningExecutorService;
|
import com.google.common.util.concurrent.ListeningExecutorService;
|
||||||
import com.google.common.util.concurrent.MoreExecutors;
|
import com.google.common.util.concurrent.MoreExecutors;
|
||||||
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import org.apache.hadoop.util.concurrent.HadoopExecutors;
|
import org.apache.hadoop.util.concurrent.HadoopExecutors;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Utility class to fetch block locations for specified Input paths using a
|
* Utility class to fetch block locations for specified Input paths using a
|
||||||
* configured number of threads.
|
* configured number of threads.
|
||||||
|
* The thread count is determined from the value of
|
||||||
|
* "mapreduce.input.fileinputformat.list-status.num-threads" in the
|
||||||
|
* configuration.
|
||||||
*/
|
*/
|
||||||
@Private
|
@Private
|
||||||
public class LocatedFileStatusFetcher {
|
public class LocatedFileStatusFetcher {
|
||||||
|
|
||||||
|
public static final Logger LOG =
|
||||||
|
LoggerFactory.getLogger(LocatedFileStatusFetcher.class.getName());
|
||||||
private final Path[] inputDirs;
|
private final Path[] inputDirs;
|
||||||
private final PathFilter inputFilter;
|
private final PathFilter inputFilter;
|
||||||
private final Configuration conf;
|
private final Configuration conf;
|
||||||
|
@ -64,7 +72,7 @@ public class LocatedFileStatusFetcher {
|
||||||
private final ExecutorService rawExec;
|
private final ExecutorService rawExec;
|
||||||
private final ListeningExecutorService exec;
|
private final ListeningExecutorService exec;
|
||||||
private final BlockingQueue<List<FileStatus>> resultQueue;
|
private final BlockingQueue<List<FileStatus>> resultQueue;
|
||||||
private final List<IOException> invalidInputErrors = new LinkedList<IOException>();
|
private final List<IOException> invalidInputErrors = new LinkedList<>();
|
||||||
|
|
||||||
private final ProcessInitialInputPathCallback processInitialInputPathCallback =
|
private final ProcessInitialInputPathCallback processInitialInputPathCallback =
|
||||||
new ProcessInitialInputPathCallback();
|
new ProcessInitialInputPathCallback();
|
||||||
|
@ -79,25 +87,30 @@ public class LocatedFileStatusFetcher {
|
||||||
private volatile Throwable unknownError;
|
private volatile Throwable unknownError;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
* Instantiate.
|
||||||
|
* The newApi switch is only used to configure what exception is raised
|
||||||
|
* on failure of {@link #getFileStatuses()}, it does not change the algorithm.
|
||||||
* @param conf configuration for the job
|
* @param conf configuration for the job
|
||||||
* @param dirs the initial list of paths
|
* @param dirs the initial list of paths
|
||||||
* @param recursive whether to traverse the patchs recursively
|
* @param recursive whether to traverse the paths recursively
|
||||||
* @param inputFilter inputFilter to apply to the resulting paths
|
* @param inputFilter inputFilter to apply to the resulting paths
|
||||||
* @param newApi whether using the mapred or mapreduce API
|
* @param newApi whether using the mapred or mapreduce API
|
||||||
* @throws InterruptedException
|
* @throws InterruptedException
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
public LocatedFileStatusFetcher(Configuration conf, Path[] dirs,
|
public LocatedFileStatusFetcher(Configuration conf, Path[] dirs,
|
||||||
boolean recursive, PathFilter inputFilter, boolean newApi) throws InterruptedException,
|
boolean recursive, PathFilter inputFilter, boolean newApi)
|
||||||
IOException {
|
throws InterruptedException, IOException {
|
||||||
int numThreads = conf.getInt(FileInputFormat.LIST_STATUS_NUM_THREADS,
|
int numThreads = conf.getInt(FileInputFormat.LIST_STATUS_NUM_THREADS,
|
||||||
FileInputFormat.DEFAULT_LIST_STATUS_NUM_THREADS);
|
FileInputFormat.DEFAULT_LIST_STATUS_NUM_THREADS);
|
||||||
|
LOG.debug("Instantiated LocatedFileStatusFetcher with {} threads",
|
||||||
|
numThreads);
|
||||||
rawExec = HadoopExecutors.newFixedThreadPool(
|
rawExec = HadoopExecutors.newFixedThreadPool(
|
||||||
numThreads,
|
numThreads,
|
||||||
new ThreadFactoryBuilder().setDaemon(true)
|
new ThreadFactoryBuilder().setDaemon(true)
|
||||||
.setNameFormat("GetFileInfo #%d").build());
|
.setNameFormat("GetFileInfo #%d").build());
|
||||||
exec = MoreExecutors.listeningDecorator(rawExec);
|
exec = MoreExecutors.listeningDecorator(rawExec);
|
||||||
resultQueue = new LinkedBlockingQueue<List<FileStatus>>();
|
resultQueue = new LinkedBlockingQueue<>();
|
||||||
this.conf = conf;
|
this.conf = conf;
|
||||||
this.inputDirs = dirs;
|
this.inputDirs = dirs;
|
||||||
this.recursive = recursive;
|
this.recursive = recursive;
|
||||||
|
@ -106,10 +119,13 @@ public class LocatedFileStatusFetcher {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Start executing and return FileStatuses based on the parameters specified
|
* Start executing and return FileStatuses based on the parameters specified.
|
||||||
* @return fetched file statuses
|
* @return fetched file statuses
|
||||||
* @throws InterruptedException
|
* @throws InterruptedException interruption waiting for results.
|
||||||
* @throws IOException
|
* @throws IOException IO failure or other error.
|
||||||
|
* @throws InvalidInputException on an invalid input and the old API
|
||||||
|
* @throws org.apache.hadoop.mapreduce.lib.input.InvalidInputException on an
|
||||||
|
* invalid input and the new API.
|
||||||
*/
|
*/
|
||||||
public Iterable<FileStatus> getFileStatuses() throws InterruptedException,
|
public Iterable<FileStatus> getFileStatuses() throws InterruptedException,
|
||||||
IOException {
|
IOException {
|
||||||
|
@ -117,6 +133,7 @@ public class LocatedFileStatusFetcher {
|
||||||
// rest being scheduled does not lead to a termination.
|
// rest being scheduled does not lead to a termination.
|
||||||
runningTasks.incrementAndGet();
|
runningTasks.incrementAndGet();
|
||||||
for (Path p : inputDirs) {
|
for (Path p : inputDirs) {
|
||||||
|
LOG.debug("Queuing scan of directory {}", p);
|
||||||
runningTasks.incrementAndGet();
|
runningTasks.incrementAndGet();
|
||||||
ListenableFuture<ProcessInitialInputPathCallable.Result> future = exec
|
ListenableFuture<ProcessInitialInputPathCallable.Result> future = exec
|
||||||
.submit(new ProcessInitialInputPathCallable(p, conf, inputFilter));
|
.submit(new ProcessInitialInputPathCallable(p, conf, inputFilter));
|
||||||
|
@ -128,14 +145,20 @@ public class LocatedFileStatusFetcher {
|
||||||
|
|
||||||
lock.lock();
|
lock.lock();
|
||||||
try {
|
try {
|
||||||
|
LOG.debug("Waiting scan completion");
|
||||||
while (runningTasks.get() != 0 && unknownError == null) {
|
while (runningTasks.get() != 0 && unknownError == null) {
|
||||||
condition.await();
|
condition.await();
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
lock.unlock();
|
lock.unlock();
|
||||||
}
|
}
|
||||||
|
// either the scan completed or an error was raised.
|
||||||
|
// in the case of an error shutting down the executor will interrupt all
|
||||||
|
// active threads, which can add noise to the logs.
|
||||||
|
LOG.debug("Scan complete: shutting down");
|
||||||
this.exec.shutdownNow();
|
this.exec.shutdownNow();
|
||||||
if (this.unknownError != null) {
|
if (this.unknownError != null) {
|
||||||
|
LOG.debug("Scan failed", this.unknownError);
|
||||||
if (this.unknownError instanceof Error) {
|
if (this.unknownError instanceof Error) {
|
||||||
throw (Error) this.unknownError;
|
throw (Error) this.unknownError;
|
||||||
} else if (this.unknownError instanceof RuntimeException) {
|
} else if (this.unknownError instanceof RuntimeException) {
|
||||||
|
@ -148,7 +171,11 @@ public class LocatedFileStatusFetcher {
|
||||||
throw new IOException(this.unknownError);
|
throw new IOException(this.unknownError);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (this.invalidInputErrors.size() != 0) {
|
if (!this.invalidInputErrors.isEmpty()) {
|
||||||
|
LOG.debug("Invalid Input Errors raised");
|
||||||
|
for (IOException error : invalidInputErrors) {
|
||||||
|
LOG.debug("Error", error);
|
||||||
|
}
|
||||||
if (this.newApi) {
|
if (this.newApi) {
|
||||||
throw new org.apache.hadoop.mapreduce.lib.input.InvalidInputException(
|
throw new org.apache.hadoop.mapreduce.lib.input.InvalidInputException(
|
||||||
invalidInputErrors);
|
invalidInputErrors);
|
||||||
|
@ -161,7 +188,7 @@ public class LocatedFileStatusFetcher {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Collect misconfigured Input errors. Errors while actually reading file info
|
* Collect misconfigured Input errors. Errors while actually reading file info
|
||||||
* are reported immediately
|
* are reported immediately.
|
||||||
*/
|
*/
|
||||||
private void registerInvalidInputError(List<IOException> errors) {
|
private void registerInvalidInputError(List<IOException> errors) {
|
||||||
synchronized (this) {
|
synchronized (this) {
|
||||||
|
@ -171,9 +198,10 @@ public class LocatedFileStatusFetcher {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Register fatal errors - example an IOException while accessing a file or a
|
* Register fatal errors - example an IOException while accessing a file or a
|
||||||
* full exection queue
|
* full execution queue.
|
||||||
*/
|
*/
|
||||||
private void registerError(Throwable t) {
|
private void registerError(Throwable t) {
|
||||||
|
LOG.debug("Error", t);
|
||||||
lock.lock();
|
lock.lock();
|
||||||
try {
|
try {
|
||||||
if (unknownError == null) {
|
if (unknownError == null) {
|
||||||
|
@ -221,7 +249,7 @@ public class LocatedFileStatusFetcher {
|
||||||
public Result call() throws Exception {
|
public Result call() throws Exception {
|
||||||
Result result = new Result();
|
Result result = new Result();
|
||||||
result.fs = fs;
|
result.fs = fs;
|
||||||
|
LOG.debug("ProcessInputDirCallable {}", fileStatus);
|
||||||
if (fileStatus.isDirectory()) {
|
if (fileStatus.isDirectory()) {
|
||||||
RemoteIterator<LocatedFileStatus> iter = fs
|
RemoteIterator<LocatedFileStatus> iter = fs
|
||||||
.listLocatedStatus(fileStatus.getPath());
|
.listLocatedStatus(fileStatus.getPath());
|
||||||
|
@ -242,8 +270,8 @@ public class LocatedFileStatusFetcher {
|
||||||
}
|
}
|
||||||
|
|
||||||
private static class Result {
|
private static class Result {
|
||||||
private List<FileStatus> locatedFileStatuses = new LinkedList<FileStatus>();
|
private List<FileStatus> locatedFileStatuses = new LinkedList<>();
|
||||||
private List<FileStatus> dirsNeedingRecursiveCalls = new LinkedList<FileStatus>();
|
private List<FileStatus> dirsNeedingRecursiveCalls = new LinkedList<>();
|
||||||
private FileSystem fs;
|
private FileSystem fs;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -259,11 +287,12 @@ public class LocatedFileStatusFetcher {
|
||||||
@Override
|
@Override
|
||||||
public void onSuccess(ProcessInputDirCallable.Result result) {
|
public void onSuccess(ProcessInputDirCallable.Result result) {
|
||||||
try {
|
try {
|
||||||
if (result.locatedFileStatuses.size() != 0) {
|
if (!result.locatedFileStatuses.isEmpty()) {
|
||||||
resultQueue.add(result.locatedFileStatuses);
|
resultQueue.add(result.locatedFileStatuses);
|
||||||
}
|
}
|
||||||
if (result.dirsNeedingRecursiveCalls.size() != 0) {
|
if (!result.dirsNeedingRecursiveCalls.isEmpty()) {
|
||||||
for (FileStatus fileStatus : result.dirsNeedingRecursiveCalls) {
|
for (FileStatus fileStatus : result.dirsNeedingRecursiveCalls) {
|
||||||
|
LOG.debug("Queueing directory scan {}", fileStatus.getPath());
|
||||||
runningTasks.incrementAndGet();
|
runningTasks.incrementAndGet();
|
||||||
ListenableFuture<ProcessInputDirCallable.Result> future = exec
|
ListenableFuture<ProcessInputDirCallable.Result> future = exec
|
||||||
.submit(new ProcessInputDirCallable(result.fs, fileStatus,
|
.submit(new ProcessInputDirCallable(result.fs, fileStatus,
|
||||||
|
@ -309,6 +338,7 @@ public class LocatedFileStatusFetcher {
|
||||||
Result result = new Result();
|
Result result = new Result();
|
||||||
FileSystem fs = path.getFileSystem(conf);
|
FileSystem fs = path.getFileSystem(conf);
|
||||||
result.fs = fs;
|
result.fs = fs;
|
||||||
|
LOG.debug("ProcessInitialInputPathCallable path {}", path);
|
||||||
FileStatus[] matches = fs.globStatus(path, inputFilter);
|
FileStatus[] matches = fs.globStatus(path, inputFilter);
|
||||||
if (matches == null) {
|
if (matches == null) {
|
||||||
result.addError(new IOException("Input path does not exist: " + path));
|
result.addError(new IOException("Input path does not exist: " + path));
|
||||||
|
@ -337,7 +367,7 @@ public class LocatedFileStatusFetcher {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The callback handler to handle results generated by
|
* The callback handler to handle results generated by
|
||||||
* {@link ProcessInitialInputPathCallable}
|
* {@link ProcessInitialInputPathCallable}.
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
private class ProcessInitialInputPathCallback implements
|
private class ProcessInitialInputPathCallback implements
|
||||||
|
|
|
@ -19,6 +19,7 @@
|
||||||
package org.apache.hadoop.mapreduce.lib.input;
|
package org.apache.hadoop.mapreduce.lib.input;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.io.InterruptedIOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
|
@ -283,7 +284,10 @@ public abstract class FileInputFormat<K, V> extends InputFormat<K, V> {
|
||||||
job.getConfiguration(), dirs, recursive, inputFilter, true);
|
job.getConfiguration(), dirs, recursive, inputFilter, true);
|
||||||
locatedFiles = locatedFileStatusFetcher.getFileStatuses();
|
locatedFiles = locatedFileStatusFetcher.getFileStatuses();
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
throw new IOException("Interrupted while getting file statuses");
|
throw (IOException)
|
||||||
|
new InterruptedIOException(
|
||||||
|
"Interrupted while getting file statuses")
|
||||||
|
.initCause(e);
|
||||||
}
|
}
|
||||||
result = Lists.newArrayList(locatedFiles);
|
result = Lists.newArrayList(locatedFiles);
|
||||||
}
|
}
|
||||||
|
|
|
@ -37,10 +37,14 @@ public class InvalidInputException extends IOException {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create the exception with the given list.
|
* Create the exception with the given list.
|
||||||
|
* The first element of the list is used as the init cause value.
|
||||||
* @param probs the list of problems to report. this list is not copied.
|
* @param probs the list of problems to report. this list is not copied.
|
||||||
*/
|
*/
|
||||||
public InvalidInputException(List<IOException> probs) {
|
public InvalidInputException(List<IOException> probs) {
|
||||||
problems = probs;
|
problems = probs;
|
||||||
|
if (!probs.isEmpty()) {
|
||||||
|
initCause(probs.get(0));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -31,6 +31,7 @@ import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import org.apache.commons.lang3.StringUtils;
|
import org.apache.commons.lang3.StringUtils;
|
||||||
import org.apache.hadoop.io.retry.RetryPolicy;
|
import org.apache.hadoop.io.retry.RetryPolicy;
|
||||||
|
import org.apache.hadoop.util.DurationInfo;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Class to provide lambda expression invocation of AWS operations.
|
* Class to provide lambda expression invocation of AWS operations.
|
||||||
|
@ -105,7 +106,7 @@ public class Invoker {
|
||||||
@Retries.OnceTranslated
|
@Retries.OnceTranslated
|
||||||
public static <T> T once(String action, String path, Operation<T> operation)
|
public static <T> T once(String action, String path, Operation<T> operation)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
try {
|
try (DurationInfo ignored = new DurationInfo(LOG, false, "%s", action)) {
|
||||||
return operation.execute();
|
return operation.execute();
|
||||||
} catch (AmazonClientException e) {
|
} catch (AmazonClientException e) {
|
||||||
throw S3AUtils.translateException(action, path, e);
|
throw S3AUtils.translateException(action, path, e);
|
||||||
|
|
|
@ -94,6 +94,7 @@ import org.apache.hadoop.fs.CommonPathCapabilities;
|
||||||
import org.apache.hadoop.fs.CreateFlag;
|
import org.apache.hadoop.fs.CreateFlag;
|
||||||
import org.apache.hadoop.fs.FSDataInputStream;
|
import org.apache.hadoop.fs.FSDataInputStream;
|
||||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
|
import org.apache.hadoop.fs.Globber;
|
||||||
import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy;
|
import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy;
|
||||||
import org.apache.hadoop.fs.s3a.impl.ContextAccessors;
|
import org.apache.hadoop.fs.s3a.impl.ContextAccessors;
|
||||||
import org.apache.hadoop.fs.s3a.impl.CopyOutcome;
|
import org.apache.hadoop.fs.s3a.impl.CopyOutcome;
|
||||||
|
@ -2472,7 +2473,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
||||||
* @param newDir the current working directory.
|
* @param newDir the current working directory.
|
||||||
*/
|
*/
|
||||||
public void setWorkingDirectory(Path newDir) {
|
public void setWorkingDirectory(Path newDir) {
|
||||||
workingDir = newDir;
|
workingDir = makeQualified(newDir);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -3669,19 +3670,27 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public FileStatus[] globStatus(Path pathPattern) throws IOException {
|
public FileStatus[] globStatus(Path pathPattern) throws IOException {
|
||||||
entryPoint(INVOCATION_GLOB_STATUS);
|
return globStatus(pathPattern, ACCEPT_ALL);
|
||||||
return super.globStatus(pathPattern);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Override superclass so as to add statistic collection.
|
* Override superclass so as to disable symlink resolution and so avoid
|
||||||
|
* some calls to the FS which may have problems when the store is being
|
||||||
|
* inconsistent.
|
||||||
* {@inheritDoc}
|
* {@inheritDoc}
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public FileStatus[] globStatus(Path pathPattern, PathFilter filter)
|
public FileStatus[] globStatus(
|
||||||
|
final Path pathPattern,
|
||||||
|
final PathFilter filter)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
entryPoint(INVOCATION_GLOB_STATUS);
|
entryPoint(INVOCATION_GLOB_STATUS);
|
||||||
return super.globStatus(pathPattern, filter);
|
return Globber.createGlobber(this)
|
||||||
|
.withPathPattern(pathPattern)
|
||||||
|
.withPathFiltern(filter)
|
||||||
|
.withResolveSymlinks(true)
|
||||||
|
.build()
|
||||||
|
.glob();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -0,0 +1,40 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.fs.s3a;
|
||||||
|
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test the LocatedFileStatusFetcher can do.
|
||||||
|
* This is related to HADOOP-16458.
|
||||||
|
* There's basic tests in ITestS3AFSMainOperations; this
|
||||||
|
* is see if we can create better corner cases.
|
||||||
|
*/
|
||||||
|
public class ITestLocatedFileStatusFetcher extends AbstractS3ATestBase {
|
||||||
|
|
||||||
|
private static final Logger LOG =
|
||||||
|
LoggerFactory.getLogger(ITestLocatedFileStatusFetcher.class);
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGlobScan() throws Throwable {
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,65 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.fs.s3a;
|
||||||
|
|
||||||
|
import org.junit.Ignore;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FSMainOperationsBaseTest;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.fs.contract.s3a.S3AContract;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.fs.s3a.S3ATestUtils.createTestPath;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* S3A Test suite for the FSMainOperationsBaseTest tests.
|
||||||
|
*/
|
||||||
|
public class ITestS3AFSMainOperations extends FSMainOperationsBaseTest {
|
||||||
|
|
||||||
|
|
||||||
|
public ITestS3AFSMainOperations() {
|
||||||
|
super(createTestPath(
|
||||||
|
new Path("/ITestS3AFSMainOperations")).toUri().toString());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected FileSystem createFileSystem() throws Exception {
|
||||||
|
S3AContract contract = new S3AContract(new Configuration());
|
||||||
|
contract.init();
|
||||||
|
return contract.getTestFileSystem();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
@Ignore("Permissions not supported")
|
||||||
|
public void testListStatusThrowsExceptionForUnreadableDir() {
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
@Ignore("Permissions not supported")
|
||||||
|
public void testGlobStatusThrowsExceptionForUnreadableDir() {
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
@Ignore("local FS path setup broken")
|
||||||
|
public void testCopyToLocalWithUseRawLocalFileSystemOption()
|
||||||
|
throws Exception {
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,707 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.fs.s3a.auth;
|
||||||
|
|
||||||
|
import java.io.FileNotFoundException;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.nio.charset.Charset;
|
||||||
|
import java.nio.file.AccessDeniedException;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.concurrent.Callable;
|
||||||
|
|
||||||
|
import org.assertj.core.api.Assertions;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.runner.RunWith;
|
||||||
|
import org.junit.runners.Parameterized;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import org.apache.commons.lang3.StringUtils;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FSDataInputStream;
|
||||||
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.fs.PathFilter;
|
||||||
|
import org.apache.hadoop.fs.contract.ContractTestUtils;
|
||||||
|
import org.apache.hadoop.fs.s3a.AbstractS3ATestBase;
|
||||||
|
import org.apache.hadoop.fs.s3a.Constants;
|
||||||
|
import org.apache.hadoop.fs.s3a.S3AFileSystem;
|
||||||
|
import org.apache.hadoop.fs.s3a.S3ATestUtils;
|
||||||
|
import org.apache.hadoop.fs.s3a.S3AUtils;
|
||||||
|
import org.apache.hadoop.fs.s3a.Statistic;
|
||||||
|
import org.apache.hadoop.fs.s3a.s3guard.LocalMetadataStore;
|
||||||
|
import org.apache.hadoop.fs.s3a.s3guard.MetadataStore;
|
||||||
|
import org.apache.hadoop.fs.s3a.s3guard.NullMetadataStore;
|
||||||
|
import org.apache.hadoop.mapred.LocatedFileStatusFetcher;
|
||||||
|
import org.apache.hadoop.mapreduce.lib.input.InvalidInputException;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
|
||||||
|
import static org.apache.hadoop.fs.contract.ContractTestUtils.touch;
|
||||||
|
import static org.apache.hadoop.fs.s3a.Constants.ASSUMED_ROLE_ARN;
|
||||||
|
import static org.apache.hadoop.fs.s3a.Constants.METADATASTORE_AUTHORITATIVE;
|
||||||
|
import static org.apache.hadoop.fs.s3a.Constants.S3_METADATA_STORE_IMPL;
|
||||||
|
import static org.apache.hadoop.fs.s3a.S3ATestUtils.assume;
|
||||||
|
import static org.apache.hadoop.fs.s3a.S3ATestUtils.disableFilesystemCaching;
|
||||||
|
import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestBucketName;
|
||||||
|
import static org.apache.hadoop.fs.s3a.S3ATestUtils.lsR;
|
||||||
|
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBucketOverrides;
|
||||||
|
import static org.apache.hadoop.fs.s3a.auth.RoleModel.Effects;
|
||||||
|
import static org.apache.hadoop.fs.s3a.auth.RoleModel.Statement;
|
||||||
|
import static org.apache.hadoop.fs.s3a.auth.RoleModel.directory;
|
||||||
|
import static org.apache.hadoop.fs.s3a.auth.RoleModel.statement;
|
||||||
|
import static org.apache.hadoop.fs.s3a.auth.RolePolicies.*;
|
||||||
|
import static org.apache.hadoop.fs.s3a.auth.RoleTestUtils.bindRolePolicyStatements;
|
||||||
|
import static org.apache.hadoop.fs.s3a.auth.RoleTestUtils.newAssumedRoleConfig;
|
||||||
|
import static org.apache.hadoop.mapreduce.lib.input.FileInputFormat.LIST_STATUS_NUM_THREADS;
|
||||||
|
import static org.apache.hadoop.test.GenericTestUtils.assertExceptionContains;
|
||||||
|
import static org.apache.hadoop.test.GenericTestUtils.failif;
|
||||||
|
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This test creates a client with no read access to the underlying
|
||||||
|
* filesystem and then tries to perform various read operations on it.
|
||||||
|
* S3Guard in non-auth mode always goes to the FS, so we parameterize the
|
||||||
|
* test for S3Guard + Auth to see how failures move around.
|
||||||
|
* <ol>
|
||||||
|
* <li>Tests only run if an assumed role is provided.</li>
|
||||||
|
* <li>And the s3guard tests use the local metastore if
|
||||||
|
* there was not one already.</li>
|
||||||
|
* </ol>
|
||||||
|
* The tests are all bundled into one big test case.
|
||||||
|
* From a purist unit test perspective, this is utterly wrong as it goes
|
||||||
|
* against the
|
||||||
|
* <i>"Each test case tests exactly one thing"</i>
|
||||||
|
* philosophy of JUnit.
|
||||||
|
* <p>
|
||||||
|
* However is significantly reduces setup costs on the parameterized test runs,
|
||||||
|
* as it means that the filesystems and directories only need to be
|
||||||
|
* created and destroyed once per parameterized suite, rather than
|
||||||
|
* once per individual test.
|
||||||
|
* <p>
|
||||||
|
* All the test probes have informative messages so when a test failure
|
||||||
|
* does occur, its cause should be discoverable. It main weaknesses are
|
||||||
|
* therefore:
|
||||||
|
* <ol>
|
||||||
|
* <li>A failure of an assertion blocks all subsequent assertions from
|
||||||
|
* being checked.</li>
|
||||||
|
* <li>Maintenance is potentially harder.</li>
|
||||||
|
* </ol>
|
||||||
|
* To simplify maintenance, the operations tested are broken up into
|
||||||
|
* their own methods, with fields used to share the restricted role and
|
||||||
|
* created paths.
|
||||||
|
*/
|
||||||
|
@SuppressWarnings("ThrowableNotThrown")
|
||||||
|
@RunWith(Parameterized.class)
|
||||||
|
public class ITestRestrictedReadAccess extends AbstractS3ATestBase {
|
||||||
|
|
||||||
|
private static final Logger LOG =
|
||||||
|
LoggerFactory.getLogger(ITestRestrictedReadAccess.class);
|
||||||
|
|
||||||
|
/** Filter to select everything. */
|
||||||
|
private static final PathFilter EVERYTHING = t -> true;
|
||||||
|
|
||||||
|
/** Filter to select .txt files. */
|
||||||
|
private static final PathFilter TEXT_FILE =
|
||||||
|
path -> path.toUri().toString().endsWith(".txt");
|
||||||
|
|
||||||
|
/** The same path filter used in FileInputFormat. */
|
||||||
|
private static final PathFilter HIDDEN_FILE_FILTER =
|
||||||
|
(p) -> {
|
||||||
|
String n = p.getName();
|
||||||
|
return !n.startsWith("_") && !n.startsWith(".");
|
||||||
|
};
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Text found in LocatedFileStatusFetcher exception when the glob
|
||||||
|
* returned "null".
|
||||||
|
*/
|
||||||
|
private static final String DOES_NOT_EXIST = "does not exist";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Text found in LocatedFileStatusFetcher exception when
|
||||||
|
* the glob returned an empty list.
|
||||||
|
*/
|
||||||
|
private static final String MATCHES_0_FILES = "matches 0 files";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Text used in files.
|
||||||
|
*/
|
||||||
|
public static final byte[] HELLO = "hello".getBytes(Charset.forName("UTF-8"));
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Wildcard scan to find *.txt in the no-read directory.
|
||||||
|
* When a scan/glob is done with S3Guard in auth mode, the scan will
|
||||||
|
* succeed but the file open will fail for any non-empty file.
|
||||||
|
* In non-auth mode, the read restrictions will fail the actual scan.
|
||||||
|
*/
|
||||||
|
private Path noReadWildcard;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Parameterization.
|
||||||
|
*/
|
||||||
|
@Parameterized.Parameters(name = "{0}")
|
||||||
|
public static Collection<Object[]> params() {
|
||||||
|
return Arrays.asList(new Object[][]{
|
||||||
|
{"raw", false, false},
|
||||||
|
{"nonauth", true, false},
|
||||||
|
{"auth", true, true}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
private final String name;
|
||||||
|
|
||||||
|
private final boolean s3guard;
|
||||||
|
|
||||||
|
private final boolean authMode;
|
||||||
|
|
||||||
|
private Path basePath;
|
||||||
|
|
||||||
|
private Path noReadDir;
|
||||||
|
|
||||||
|
private Path emptyDir;
|
||||||
|
|
||||||
|
private Path emptyFile;
|
||||||
|
|
||||||
|
private Path subDir;
|
||||||
|
|
||||||
|
private Path subdirFile;
|
||||||
|
|
||||||
|
private Path subDir2;
|
||||||
|
|
||||||
|
private Path subdir2File1;
|
||||||
|
|
||||||
|
private Path subdir2File2;
|
||||||
|
|
||||||
|
private Configuration roleConfig;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A read-only FS; if non-null it is closed in teardown.
|
||||||
|
*/
|
||||||
|
private S3AFileSystem readonlyFS;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test suite setup.
|
||||||
|
* @param name name for logs/paths.
|
||||||
|
* @param s3guard is S3Guard enabled?
|
||||||
|
* @param authMode is S3Guard in auth mode?
|
||||||
|
*/
|
||||||
|
public ITestRestrictedReadAccess(
|
||||||
|
final String name,
|
||||||
|
final boolean s3guard,
|
||||||
|
final boolean authMode) {
|
||||||
|
this.name = name;
|
||||||
|
this.s3guard = s3guard;
|
||||||
|
this.authMode = authMode;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Configuration createConfiguration() {
|
||||||
|
Configuration conf = super.createConfiguration();
|
||||||
|
String bucketName = getTestBucketName(conf);
|
||||||
|
removeBucketOverrides(bucketName, conf,
|
||||||
|
S3_METADATA_STORE_IMPL,
|
||||||
|
METADATASTORE_AUTHORITATIVE);
|
||||||
|
conf.setClass(Constants.S3_METADATA_STORE_IMPL,
|
||||||
|
s3guard ?
|
||||||
|
LocalMetadataStore.class
|
||||||
|
: NullMetadataStore.class,
|
||||||
|
MetadataStore.class);
|
||||||
|
conf.setBoolean(METADATASTORE_AUTHORITATIVE, authMode);
|
||||||
|
disableFilesystemCaching(conf);
|
||||||
|
return conf;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setup() throws Exception {
|
||||||
|
super.setup();
|
||||||
|
assumeRoleTests();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void teardown() throws Exception {
|
||||||
|
S3AUtils.closeAll(LOG, readonlyFS);
|
||||||
|
super.teardown();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void assumeRoleTests() {
|
||||||
|
assume("No ARN for role tests", !getAssumedRoleARN().isEmpty());
|
||||||
|
}
|
||||||
|
|
||||||
|
private String getAssumedRoleARN() {
|
||||||
|
return getContract().getConf().getTrimmed(ASSUMED_ROLE_ARN, "");
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create the assumed role configuration.
|
||||||
|
* @return a config bonded to the ARN of the assumed role
|
||||||
|
*/
|
||||||
|
public Configuration createAssumedRoleConfig() {
|
||||||
|
return createAssumedRoleConfig(getAssumedRoleARN());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a config for an assumed role; it also disables FS caching.
|
||||||
|
* @param roleARN ARN of role
|
||||||
|
* @return the new configuration
|
||||||
|
*/
|
||||||
|
private Configuration createAssumedRoleConfig(String roleARN) {
|
||||||
|
return newAssumedRoleConfig(getContract().getConf(), roleARN);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This is a single test case which invokes the individual test cases
|
||||||
|
* in sequence.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testNoReadAccess() throws Throwable {
|
||||||
|
describe("Test failure handling if the client doesn't"
|
||||||
|
+ " have read access under a path");
|
||||||
|
initNoReadAccess();
|
||||||
|
|
||||||
|
// now move up the API Chain, from the calls made by globStatus,
|
||||||
|
// to globStatus itself, and then to LocatedFileStatusFetcher,
|
||||||
|
// which invokes globStatus
|
||||||
|
|
||||||
|
checkBasicFileOperations();
|
||||||
|
checkGlobOperations();
|
||||||
|
checkSingleThreadedLocatedFileStatus();
|
||||||
|
checkLocatedFileStatusFourThreads();
|
||||||
|
checkLocatedFileStatusScanFile();
|
||||||
|
checkLocatedFileStatusNonexistentPath();
|
||||||
|
checkDeleteOperations();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Initialize the directory tree and the role filesystem.
|
||||||
|
*/
|
||||||
|
public void initNoReadAccess() throws Throwable {
|
||||||
|
describe("Setting up filesystem");
|
||||||
|
|
||||||
|
S3AFileSystem realFS = getFileSystem();
|
||||||
|
|
||||||
|
// avoiding the parameterization to steer clear of accidentally creating
|
||||||
|
// patterns
|
||||||
|
basePath = path("testNoReadAccess-" + name);
|
||||||
|
|
||||||
|
// define the paths and create them.
|
||||||
|
describe("Creating test directories and files");
|
||||||
|
|
||||||
|
// this is the directory to which the restricted role has no read
|
||||||
|
// access.
|
||||||
|
noReadDir = new Path(basePath, "noReadDir");
|
||||||
|
// wildcard scan to find *.txt
|
||||||
|
noReadWildcard = new Path(noReadDir, "*/*.txt");
|
||||||
|
|
||||||
|
// an empty directory directory under the noReadDir
|
||||||
|
emptyDir = new Path(noReadDir, "emptyDir");
|
||||||
|
realFS.mkdirs(emptyDir);
|
||||||
|
|
||||||
|
// an empty file directory under the noReadDir
|
||||||
|
emptyFile = new Path(noReadDir, "emptyFile.txt");
|
||||||
|
touch(realFS, emptyFile);
|
||||||
|
|
||||||
|
// a subdirectory
|
||||||
|
subDir = new Path(noReadDir, "subDir");
|
||||||
|
|
||||||
|
// and a file in that subdirectory
|
||||||
|
subdirFile = new Path(subDir, "subdirFile.txt");
|
||||||
|
createFile(realFS, subdirFile, true, HELLO);
|
||||||
|
subDir2 = new Path(noReadDir, "subDir2");
|
||||||
|
subdir2File1 = new Path(subDir2, "subdir2File1.txt");
|
||||||
|
subdir2File2 = new Path(subDir2, "subdir2File2.docx");
|
||||||
|
createFile(realFS, subdir2File1, true, HELLO);
|
||||||
|
createFile(realFS, subdir2File2, true, HELLO);
|
||||||
|
|
||||||
|
// create a role filesystem which does not have read access under a path
|
||||||
|
// it still has write access, which can be explored in the final
|
||||||
|
// step to delete files and directories.
|
||||||
|
roleConfig = createAssumedRoleConfig();
|
||||||
|
bindRolePolicyStatements(roleConfig,
|
||||||
|
STATEMENT_S3GUARD_CLIENT,
|
||||||
|
STATEMENT_ALLOW_SSE_KMS_RW,
|
||||||
|
statement(true, S3_ALL_BUCKETS, S3_ALL_OPERATIONS),
|
||||||
|
new Statement(Effects.Deny)
|
||||||
|
.addActions(S3_ALL_GET)
|
||||||
|
.addResources(directory(noReadDir)));
|
||||||
|
readonlyFS = (S3AFileSystem) basePath.getFileSystem(roleConfig);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Validate basic IO operations.
|
||||||
|
*/
|
||||||
|
public void checkBasicFileOperations() throws Throwable {
|
||||||
|
|
||||||
|
// this is a LIST call; there's no marker.
|
||||||
|
// so the sequence is
|
||||||
|
// - HEAD path -> FNFE
|
||||||
|
// - HEAD path + / -> FNFE
|
||||||
|
// - LIST path -> list results
|
||||||
|
// Because the client has list access, this succeeds
|
||||||
|
readonlyFS.listStatus(basePath);
|
||||||
|
|
||||||
|
// this is HEAD + "/" on S3; get on S3Guard auth
|
||||||
|
readonlyFS.listStatus(emptyDir);
|
||||||
|
|
||||||
|
// a recursive list of the no-read-directory works because
|
||||||
|
// there is no directory marker, it becomes a LIST call.
|
||||||
|
lsR(readonlyFS, noReadDir, true);
|
||||||
|
|
||||||
|
// similarly, a getFileStatus ends up being a list and generating
|
||||||
|
// a file status marker.
|
||||||
|
readonlyFS.getFileStatus(noReadDir);
|
||||||
|
|
||||||
|
// empty dir checks work!
|
||||||
|
readonlyFS.getFileStatus(emptyDir);
|
||||||
|
|
||||||
|
// now look at a file; the outcome depends on the mode.
|
||||||
|
if (authMode) {
|
||||||
|
// auth mode doesn't check S3, so no failure
|
||||||
|
readonlyFS.getFileStatus(subdirFile);
|
||||||
|
} else {
|
||||||
|
accessDenied(() ->
|
||||||
|
readonlyFS.getFileStatus(subdirFile));
|
||||||
|
}
|
||||||
|
|
||||||
|
// irrespective of mode, the attempt to read the data will fail.
|
||||||
|
// the only variable is where the failure occurs
|
||||||
|
accessDenied(() ->
|
||||||
|
ContractTestUtils.readUTF8(readonlyFS, subdirFile, HELLO.length));
|
||||||
|
|
||||||
|
// the empty file is interesting
|
||||||
|
if (!authMode) {
|
||||||
|
// non-auth mode, it fails at some point in the opening process.
|
||||||
|
// due to a HEAD being called on the object
|
||||||
|
accessDenied(() ->
|
||||||
|
ContractTestUtils.readUTF8(readonlyFS, emptyFile, 0));
|
||||||
|
} else {
|
||||||
|
// auth mode doesn't check the store.
|
||||||
|
// Furthermore, because it knows the file length is zero,
|
||||||
|
// it returns -1 without even opening the file.
|
||||||
|
// This means that permissions on the file do not get checked.
|
||||||
|
// See: HADOOP-16464.
|
||||||
|
try (FSDataInputStream is = readonlyFS.open(emptyFile)) {
|
||||||
|
Assertions.assertThat(is.read())
|
||||||
|
.describedAs("read of empty file")
|
||||||
|
.isEqualTo(-1);
|
||||||
|
}
|
||||||
|
readonlyFS.getFileStatus(subdirFile);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Explore Glob's recursive scan.
|
||||||
|
*/
|
||||||
|
public void checkGlobOperations() throws Throwable {
|
||||||
|
|
||||||
|
describe("Glob Status operations");
|
||||||
|
// baseline: the real filesystem on a subdir
|
||||||
|
globFS(getFileSystem(), subdirFile, null, false, 1);
|
||||||
|
// a file fails if not in auth mode
|
||||||
|
globFS(readonlyFS, subdirFile, null, !authMode, 1);
|
||||||
|
// empty directories don't fail.
|
||||||
|
assertStatusPathEquals(emptyDir,
|
||||||
|
globFS(readonlyFS, emptyDir, null, false, 1));
|
||||||
|
|
||||||
|
FileStatus[] st = globFS(readonlyFS,
|
||||||
|
noReadWildcard,
|
||||||
|
null, false, 2);
|
||||||
|
Assertions.assertThat(st)
|
||||||
|
.extracting(FileStatus::getPath)
|
||||||
|
.containsExactlyInAnyOrder(subdirFile, subdir2File1);
|
||||||
|
|
||||||
|
// there is precisely one .docx file (subdir2File2.docx)
|
||||||
|
globFS(readonlyFS,
|
||||||
|
new Path(noReadDir, "*/*.docx"),
|
||||||
|
null, false, 1);
|
||||||
|
|
||||||
|
// there are no .doc files.
|
||||||
|
globFS(readonlyFS,
|
||||||
|
new Path(noReadDir, "*/*.doc"),
|
||||||
|
null, false, 0);
|
||||||
|
globFS(readonlyFS, noReadDir,
|
||||||
|
EVERYTHING, false, 1);
|
||||||
|
// and a filter without any wildcarded pattern only finds
|
||||||
|
// the role dir itself.
|
||||||
|
FileStatus[] st2 = globFS(readonlyFS, noReadDir,
|
||||||
|
EVERYTHING, false, 1);
|
||||||
|
Assertions.assertThat(st2)
|
||||||
|
.extracting(FileStatus::getPath)
|
||||||
|
.containsExactly(noReadDir);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Run a located file status fetcher against the directory tree.
|
||||||
|
*/
|
||||||
|
public void checkSingleThreadedLocatedFileStatus() throws Throwable {
|
||||||
|
|
||||||
|
describe("LocatedFileStatusFetcher operations");
|
||||||
|
// use the same filter as FileInputFormat; single thread.
|
||||||
|
roleConfig.setInt(LIST_STATUS_NUM_THREADS, 1);
|
||||||
|
LocatedFileStatusFetcher fetcher =
|
||||||
|
new LocatedFileStatusFetcher(
|
||||||
|
roleConfig,
|
||||||
|
new Path[]{basePath},
|
||||||
|
true,
|
||||||
|
HIDDEN_FILE_FILTER,
|
||||||
|
true);
|
||||||
|
Assertions.assertThat(fetcher.getFileStatuses())
|
||||||
|
.describedAs("result of located scan")
|
||||||
|
.flatExtracting(FileStatus::getPath)
|
||||||
|
.containsExactlyInAnyOrder(
|
||||||
|
emptyFile,
|
||||||
|
subdirFile,
|
||||||
|
subdir2File1,
|
||||||
|
subdir2File2);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Run a located file status fetcher against the directory tree.
|
||||||
|
*/
|
||||||
|
public void checkLocatedFileStatusFourThreads() throws Throwable {
|
||||||
|
|
||||||
|
// four threads and the text filter.
|
||||||
|
int threads = 4;
|
||||||
|
describe("LocatedFileStatusFetcher with %d", threads);
|
||||||
|
roleConfig.setInt(LIST_STATUS_NUM_THREADS, threads);
|
||||||
|
LocatedFileStatusFetcher fetcher2 =
|
||||||
|
new LocatedFileStatusFetcher(
|
||||||
|
roleConfig,
|
||||||
|
new Path[]{noReadWildcard},
|
||||||
|
true,
|
||||||
|
EVERYTHING,
|
||||||
|
true);
|
||||||
|
Assertions.assertThat(fetcher2.getFileStatuses())
|
||||||
|
.describedAs("result of located scan")
|
||||||
|
.isNotNull()
|
||||||
|
.flatExtracting(FileStatus::getPath)
|
||||||
|
.containsExactlyInAnyOrder(subdirFile, subdir2File1);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Run a located file status fetcher against the directory tree.
|
||||||
|
*/
|
||||||
|
public void checkLocatedFileStatusScanFile() throws Throwable {
|
||||||
|
// pass in a file as the base of the scan.
|
||||||
|
describe("LocatedFileStatusFetcher with file %s", subdirFile);
|
||||||
|
roleConfig.setInt(LIST_STATUS_NUM_THREADS, 16);
|
||||||
|
try {
|
||||||
|
Iterable<FileStatus> fetched = new LocatedFileStatusFetcher(
|
||||||
|
roleConfig,
|
||||||
|
new Path[]{subdirFile},
|
||||||
|
true,
|
||||||
|
TEXT_FILE,
|
||||||
|
true).getFileStatuses();
|
||||||
|
// when not in auth mode, the HEAD request MUST have failed.
|
||||||
|
failif(!authMode, "LocatedFileStatusFetcher(" + subdirFile + ")"
|
||||||
|
+ " should have failed");
|
||||||
|
// and in auth mode, the file MUST have been found.
|
||||||
|
Assertions.assertThat(fetched)
|
||||||
|
.describedAs("result of located scan")
|
||||||
|
.isNotNull()
|
||||||
|
.flatExtracting(FileStatus::getPath)
|
||||||
|
.containsExactly(subdirFile);
|
||||||
|
} catch (AccessDeniedException e) {
|
||||||
|
// we require the HEAD request to fail with access denied in non-auth
|
||||||
|
// mode, but not in auth mode.
|
||||||
|
failif(authMode, "LocatedFileStatusFetcher(" + subdirFile + ")", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Explore what happens with a path that does not exist.
|
||||||
|
*/
|
||||||
|
public void checkLocatedFileStatusNonexistentPath() throws Throwable {
|
||||||
|
// scan a path that doesn't exist
|
||||||
|
Path nonexistent = new Path(noReadDir, "nonexistent");
|
||||||
|
InvalidInputException ex = intercept(InvalidInputException.class,
|
||||||
|
DOES_NOT_EXIST,
|
||||||
|
() -> new LocatedFileStatusFetcher(
|
||||||
|
roleConfig,
|
||||||
|
new Path[]{nonexistent},
|
||||||
|
true,
|
||||||
|
EVERYTHING,
|
||||||
|
true)
|
||||||
|
.getFileStatuses());
|
||||||
|
// validate nested exception
|
||||||
|
assertExceptionContains(DOES_NOT_EXIST, ex.getCause());
|
||||||
|
|
||||||
|
// a file which exists but which doesn't match the pattern
|
||||||
|
// is downgraded to not existing.
|
||||||
|
intercept(InvalidInputException.class,
|
||||||
|
DOES_NOT_EXIST,
|
||||||
|
() -> new LocatedFileStatusFetcher(
|
||||||
|
roleConfig,
|
||||||
|
new Path[]{noReadDir},
|
||||||
|
true,
|
||||||
|
TEXT_FILE,
|
||||||
|
true)
|
||||||
|
.getFileStatuses());
|
||||||
|
|
||||||
|
// a pattern under a nonexistent path is considered to not be a match.
|
||||||
|
ex = intercept(
|
||||||
|
InvalidInputException.class,
|
||||||
|
MATCHES_0_FILES,
|
||||||
|
() -> new LocatedFileStatusFetcher(
|
||||||
|
roleConfig,
|
||||||
|
new Path[]{new Path(nonexistent, "*.txt)")},
|
||||||
|
true,
|
||||||
|
TEXT_FILE,
|
||||||
|
true)
|
||||||
|
.getFileStatuses());
|
||||||
|
// validate nested exception
|
||||||
|
assertExceptionContains(MATCHES_0_FILES, ex.getCause());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Do some cleanup to see what happens with delete calls.
|
||||||
|
* Cleanup happens in test teardown anyway; doing it here
|
||||||
|
* just makes use of the delete calls to see how delete failures
|
||||||
|
* change with permissions and S3Guard stettings.
|
||||||
|
*/
|
||||||
|
public void checkDeleteOperations() throws Throwable {
|
||||||
|
describe("Testing delete operations");
|
||||||
|
|
||||||
|
if (!authMode) {
|
||||||
|
// unguarded or non-auth S3Guard to fail on HEAD + /
|
||||||
|
accessDenied(() -> readonlyFS.delete(emptyDir, true));
|
||||||
|
// to fail on HEAD
|
||||||
|
accessDenied(() -> readonlyFS.delete(emptyFile, true));
|
||||||
|
} else {
|
||||||
|
// auth mode checks DDB for status and then issues the DELETE
|
||||||
|
readonlyFS.delete(emptyDir, true);
|
||||||
|
readonlyFS.delete(emptyFile, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
// this will succeed for both as there is no subdir marker.
|
||||||
|
readonlyFS.delete(subDir, true);
|
||||||
|
// after which it is not there
|
||||||
|
fileNotFound(() -> readonlyFS.getFileStatus(subDir));
|
||||||
|
// and nor is its child.
|
||||||
|
fileNotFound(() -> readonlyFS.getFileStatus(subdirFile));
|
||||||
|
|
||||||
|
// now delete the base path
|
||||||
|
readonlyFS.delete(basePath, true);
|
||||||
|
// and expect an FNFE
|
||||||
|
fileNotFound(() -> readonlyFS.getFileStatus(subDir));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Require an operation to fail with a FileNotFoundException.
|
||||||
|
* @param eval closure to evaluate.
|
||||||
|
* @param <T> type of callable
|
||||||
|
* @return the exception.
|
||||||
|
* @throws Exception any other exception
|
||||||
|
*/
|
||||||
|
protected <T> FileNotFoundException fileNotFound(final Callable<T> eval)
|
||||||
|
throws Exception {
|
||||||
|
return intercept(FileNotFoundException.class, eval);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Require an operation to fail with an AccessDeniedException.
|
||||||
|
* @param eval closure to evaluate.
|
||||||
|
* @param <T> type of callable
|
||||||
|
* @return the exception.
|
||||||
|
* @throws Exception any other exception
|
||||||
|
*/
|
||||||
|
protected <T> AccessDeniedException accessDenied(final Callable<T> eval)
|
||||||
|
throws Exception {
|
||||||
|
return intercept(AccessDeniedException.class, eval);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Assert that a status array has exactly one element and its
|
||||||
|
* value is as expected.
|
||||||
|
* @param expected expected path
|
||||||
|
* @param statuses list of statuses
|
||||||
|
*/
|
||||||
|
protected void assertStatusPathEquals(final Path expected,
|
||||||
|
final FileStatus[] statuses) {
|
||||||
|
Assertions.assertThat(statuses)
|
||||||
|
.describedAs("List of status entries")
|
||||||
|
.isNotNull()
|
||||||
|
.hasSize(1);
|
||||||
|
Assertions.assertThat(statuses[0].getPath())
|
||||||
|
.describedAs("Status entry %s", statuses[0])
|
||||||
|
.isEqualTo(expected);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Glob under a path with expected outcomes.
|
||||||
|
* @param fs filesystem to use
|
||||||
|
* @param path path (which can include patterns)
|
||||||
|
* @param filter optional filter
|
||||||
|
* @param expectAuthFailure is auth failure expected?
|
||||||
|
* @param expectedCount expected count of results; -1 means null response
|
||||||
|
* @return the result of a successful glob or null if an expected auth
|
||||||
|
* failure was caught.
|
||||||
|
* @throws IOException failure.
|
||||||
|
*/
|
||||||
|
protected FileStatus[] globFS(
|
||||||
|
final S3AFileSystem fs,
|
||||||
|
final Path path,
|
||||||
|
final PathFilter filter,
|
||||||
|
boolean expectAuthFailure,
|
||||||
|
final int expectedCount)
|
||||||
|
throws IOException {
|
||||||
|
LOG.info("Glob {}", path);
|
||||||
|
S3ATestUtils.MetricDiff getMetric = new S3ATestUtils.MetricDiff(fs,
|
||||||
|
Statistic.OBJECT_METADATA_REQUESTS);
|
||||||
|
S3ATestUtils.MetricDiff listMetric = new S3ATestUtils.MetricDiff(fs,
|
||||||
|
Statistic.OBJECT_LIST_REQUESTS);
|
||||||
|
FileStatus[] st;
|
||||||
|
try {
|
||||||
|
st = filter == null
|
||||||
|
? fs.globStatus(path)
|
||||||
|
: fs.globStatus(path, filter);
|
||||||
|
LOG.info("Metrics:\n {},\n {}", getMetric, listMetric);
|
||||||
|
if (expectAuthFailure) {
|
||||||
|
// should have failed here
|
||||||
|
String resultStr;
|
||||||
|
if (st == null) {
|
||||||
|
resultStr = "A null array";
|
||||||
|
} else {
|
||||||
|
resultStr = StringUtils.join(st, ",");
|
||||||
|
}
|
||||||
|
fail(String.format("globStatus(%s) should have raised"
|
||||||
|
+ " an exception, but returned %s", path, resultStr));
|
||||||
|
}
|
||||||
|
} catch (AccessDeniedException e) {
|
||||||
|
LOG.info("Metrics:\n {},\n {}", getMetric, listMetric);
|
||||||
|
failif(!expectAuthFailure, "Access denied in glob of " + path,
|
||||||
|
e);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
if (expectedCount < 0) {
|
||||||
|
Assertions.assertThat(st)
|
||||||
|
.describedAs("Glob of %s", path)
|
||||||
|
.isNull();
|
||||||
|
} else {
|
||||||
|
Assertions.assertThat(st)
|
||||||
|
.describedAs("Glob of %s", path)
|
||||||
|
.isNotNull()
|
||||||
|
.hasSize(expectedCount);
|
||||||
|
}
|
||||||
|
return st;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue