HADOOP-15691. Add PathCapabilities to FileSystem and FileContext.

Contributed by Steve Loughran.

This complements the StreamCapabilities interface by allowing applications to probe whether a specific path on a specific instance of a FileSystem client
offers a specific capability.

This is intended to allow applications to determine

* Whether a method is implemented before calling it and dealing with UnsupportedOperationException.
* Whether a specific feature is believed to be available in the remote store.

As well as a common set of capabilities defined in CommonPathCapabilities,
file systems are free to add their own capabilities, prefixed with
 fs. + scheme + .

The plan is to identify and document more capabilities, and, for file systems which add new features, to always provide a declaration of the feature's availability.

Note

* The remote store is not expected to be checked for the feature;
  it is more a check of the client API and the client's configuration/knowledge
  of the state of the remote system.
* Permissions are not checked.
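
For example, an application can probe for append support before calling
append(), instead of catching UnsupportedOperationException after the fact.
A minimal sketch, assuming the usual org.apache.hadoop.fs imports; the
appendIfSupported helper is hypothetical:

```java
/**
 * Append data to a file iff the filesystem client declares append
 * support under that path; return false when append is unsupported.
 * Note: a true probe does not guarantee success; the call may still
 * fail, e.g. if the path is a directory or permissions are missing.
 */
static boolean appendIfSupported(FileSystem fs, Path path, byte[] data)
    throws IOException {
  if (!fs.hasPathCapability(path, CommonPathCapabilities.FS_APPEND)) {
    return false;
  }
  try (FSDataOutputStream out = fs.append(path)) {
    out.write(data);
  }
  return true;
}
```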
Steve Loughran 2020-08-19 17:15:06 +01:00 committed by GitHub
parent 2a40a33dfe
commit 42c71a5790
38 changed files with 1093 additions and 34 deletions

View File

@ -54,6 +54,8 @@ import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
/**
* This class provides an interface for implementors of a Hadoop file system
* (analogous to the VFS of Unix). Applications do not access this class;
@ -66,7 +68,7 @@ import org.slf4j.LoggerFactory;
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class AbstractFileSystem {
public abstract class AbstractFileSystem implements PathCapabilities {
static final Logger LOG = LoggerFactory.getLogger(AbstractFileSystem.class);
/** Recording statistics per a file system class. */
@ -1309,4 +1311,17 @@ public abstract class AbstractFileSystem {
}
return myUri.equals(((AbstractFileSystem) other).myUri);
}
public boolean hasPathCapability(final Path path,
final String capability)
throws IOException {
switch (validatePathCapabilityArgs(makeQualified(path), capability)) {
case CommonPathCapabilities.FS_SYMLINKS:
// delegate to the existing supportsSymlinks() call.
return supportsSymlinks();
default:
// the feature is not implemented.
return false;
}
}
}

View File

@ -35,6 +35,8 @@ import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.Progressable;
import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
/****************************************************************
* Abstract Checksumed FileSystem.
* It provide a basic implementation of a Checksumed FileSystem,
@ -780,4 +782,23 @@ public abstract class ChecksumFileSystem extends FilterFileSystem {
long inPos, FSDataInputStream sums, long sumsPos) {
return false;
}
/**
* Disable those operations which the checksummed FS blocks.
* {@inheritDoc}
*/
@Override
public boolean hasPathCapability(final Path path, final String capability)
throws IOException {
// query the superclass, which triggers argument validation.
final Path p = makeQualified(path);
switch (validatePathCapabilityArgs(p, capability)) {
case CommonPathCapabilities.FS_APPEND:
case CommonPathCapabilities.FS_CONCAT:
return false;
default:
return super.hasPathCapability(p, capability);
}
}
}

View File

@ -0,0 +1,126 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs;
/**
* Common path capabilities.
*/
public final class CommonPathCapabilities {
private CommonPathCapabilities() {
}
/**
* Does the store support
* {@code FileSystem.setAcl(Path, List)},
* {@code FileSystem.getAclStatus(Path)}
* and related methods?
* Value: {@value}.
*/
public static final String FS_ACLS = "fs.capability.paths.acls";
/**
* Does the store support {@code FileSystem.append(Path)}?
* Value: {@value}.
*/
public static final String FS_APPEND = "fs.capability.paths.append";
/**
* Does the store support {@code FileSystem.getFileChecksum(Path)}?
* Value: {@value}.
*/
public static final String FS_CHECKSUMS = "fs.capability.paths.checksums";
/**
* Does the store support {@code FileSystem.concat(Path, Path[])}?
* Value: {@value}.
*/
public static final String FS_CONCAT = "fs.capability.paths.concat";
/**
* Does the store support {@code FileSystem.listCorruptFileBlocks(Path)}?
* Value: {@value}.
*/
public static final String FS_LIST_CORRUPT_FILE_BLOCKS =
"fs.capability.paths.list-corrupt-file-blocks";
/**
* Does the store support
* {@code FileSystem.createPathHandle(FileStatus, Options.HandleOpt...)}
* and related methods?
* Value: {@value}.
*/
public static final String FS_PATHHANDLES = "fs.capability.paths.pathhandles";
/**
* Does the store support {@code FileSystem.setPermission(Path, FsPermission)}
* and related methods?
* Value: {@value}.
*/
public static final String FS_PERMISSIONS = "fs.capability.paths.permissions";
/**
* Does this filesystem connector only support filesystem read operations?
* For example, the {@code HttpFileSystem} is always read-only.
* This is different from "is the specific instance and path read only?",
* which must be determined by checking permissions (where supported), or
* attempting write operations under a path.
* Value: {@value}.
*/
public static final String FS_READ_ONLY_CONNECTOR =
"fs.capability.paths.read-only-connector";
/**
* Does the store support snapshots through
* {@code FileSystem.createSnapshot(Path)} and related methods?
* Value: {@value}.
*/
public static final String FS_SNAPSHOTS = "fs.capability.paths.snapshots";
/**
* Does the store support {@code FileSystem.setStoragePolicy(Path, String)}
* and related methods?
* Value: {@value}.
*/
public static final String FS_STORAGEPOLICY =
"fs.capability.paths.storagepolicy";
/**
* Does the store support symlinks through
* {@code FileSystem.createSymlink(Path, Path, boolean)} and related methods?
* Value: {@value}.
*/
public static final String FS_SYMLINKS =
"fs.capability.paths.symlinks";
/**
* Does the store support {@code FileSystem.truncate(Path, long)}?
* Value: {@value}.
*/
public static final String FS_TRUNCATE =
"fs.capability.paths.truncate";
/**
* Does the store support XAttributes through
* {@code FileSystem.setXAttr()} and related methods?
* Value: {@value}.
*/
public static final String FS_XATTRS = "fs.capability.paths.xattrs";
}

View File

@ -261,4 +261,11 @@ public abstract class DelegateToFileSystem extends AbstractFileSystem {
public List<Token<?>> getDelegationTokens(String renewer) throws IOException {
return Arrays.asList(fsImpl.addDelegationTokens(renewer, null));
}
@Override
public boolean hasPathCapability(final Path path,
final String capability)
throws IOException {
return fsImpl.hasPathCapability(path, capability);
}
}

View File

@ -44,6 +44,7 @@ import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem.Statistics;
import org.apache.hadoop.fs.Options.CreateOpts;
import org.apache.hadoop.fs.impl.FsLinkResolution;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclStatus;
import org.apache.hadoop.fs.permission.FsAction;
@ -56,7 +57,6 @@ import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.RpcClientException;
import org.apache.hadoop.ipc.RpcServerException;
import org.apache.hadoop.ipc.UnexpectedServerException;
import org.apache.hadoop.fs.InvalidPathException;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
@ -68,6 +68,8 @@ import org.apache.htrace.core.Tracer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
/**
* The FileContext class provides an interface for users of the Hadoop
* file system. It exposes a number of file system operations, e.g. create,
@ -171,7 +173,7 @@ import org.slf4j.LoggerFactory;
@InterfaceAudience.Public
@InterfaceStability.Stable
public class FileContext {
public class FileContext implements PathCapabilities {
public static final Logger LOG = LoggerFactory.getLogger(FileContext.class);
/**
@ -2846,4 +2848,21 @@ public class FileContext {
Tracer getTracer() {
return tracer;
}
/**
* Return the path capabilities of the bonded {@code AbstractFileSystem}.
* @param path path to query the capability of.
* @param capability string to query the path support for.
* @return true iff the capability is supported under that FS.
* @throws IOException path resolution or other IO failure
* @throws IllegalArgumentException invalid arguments
*/
public boolean hasPathCapability(Path path, String capability)
throws IOException {
validatePathCapabilityArgs(path, capability);
return FsLinkResolution.resolve(this,
fixRelativePart(path),
(fs, p) -> fs.hasPathCapability(p, capability));
}
}

View File

@ -81,6 +81,7 @@ import org.slf4j.LoggerFactory;
import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.*;
import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
/****************************************************************
* An abstract base class for a fairly generic filesystem. It
@ -122,7 +123,7 @@ import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.*;
@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class FileSystem extends Configured
implements Closeable, DelegationTokenIssuer {
implements Closeable, DelegationTokenIssuer, PathCapabilities {
public static final String FS_DEFAULT_NAME_KEY =
CommonConfigurationKeys.FS_DEFAULT_NAME_KEY;
public static final String DEFAULT_FS =
@ -702,6 +703,7 @@ public abstract class FileSystem extends Configured
*
*/
protected void checkPath(Path path) {
Preconditions.checkArgument(path != null, "null path");
URI uri = path.toUri();
String thatScheme = uri.getScheme();
if (thatScheme == null) // fs is relative
@ -3192,6 +3194,25 @@ public abstract class FileSystem extends Configured
return ret;
}
/**
* The base FileSystem implementation generally has no knowledge
* of the capabilities of actual implementations.
* Unless it has a way to explicitly determine the capabilities,
* this method returns false.
* {@inheritDoc}
*/
public boolean hasPathCapability(final Path path, final String capability)
throws IOException {
switch (validatePathCapabilityArgs(makeQualified(path), capability)) {
case CommonPathCapabilities.FS_SYMLINKS:
// delegate to the existing supportsSymlinks() call.
return supportsSymlinks() && areSymlinksEnabled();
default:
// the feature is not implemented.
return false;
}
}
// making it volatile to be able to do a double checked locking
private volatile static boolean FILE_SYSTEMS_LOADED = false;

View File

@ -691,4 +691,11 @@ public class FilterFileSystem extends FileSystem {
public FSDataOutputStreamBuilder appendFile(Path path) {
return fs.appendFile(path);
}
@Override
public boolean hasPathCapability(final Path path, final String capability)
throws IOException {
return fs.hasPathCapability(path, capability);
}
}

View File

@ -428,4 +428,10 @@ public abstract class FilterFs extends AbstractFileSystem {
throws IOException {
return myFs.getAllStoragePolicies();
}
public boolean hasPathCapability(final Path path,
final String capability)
throws IOException {
return myFs.hasPathCapability(path, capability);
}
}

View File

@ -36,6 +36,8 @@ import java.net.URISyntaxException;
import java.net.URLDecoder;
import java.util.*;
import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
/**
* This is an implementation of the Hadoop Archive
* Filesystem. This archive Filesystem has index files
@ -899,7 +901,22 @@ public class HarFileSystem extends FileSystem {
throws IOException {
throw new IOException("Har: setPermission not allowed");
}
/**
* Declare that this filesystem connector is always read only.
* {@inheritDoc}
*/
@Override
public boolean hasPathCapability(final Path path, final String capability)
throws IOException {
switch (validatePathCapabilityArgs(path, capability)) {
case CommonPathCapabilities.FS_READ_ONLY_CONNECTOR:
return true;
default:
return false;
}
}
/**
* Hadoop archives input stream. This input stream fakes EOF
* since archive files are part of bigger part files.

View File

@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs;
import java.io.IOException;
/**
* The Path counterpoint to {@link StreamCapabilities}; a query to see if
* a FileSystem/FileContext instance has a specific capability under the given
* path.
* Other classes may also implement the interface, as desired.
*
* See {@link CommonPathCapabilities} for the well-known capabilities.
*/
public interface PathCapabilities {
/**
* Probe for a specific capability under the given path.
* If the function returns {@code true}, this instance is explicitly
* declaring that the capability is available.
* If the function returns {@code false}, it can mean one of:
* <ul>
* <li>The capability is not known.</li>
* <li>The capability is known but it is not supported.</li>
* <li>The capability is known but the filesystem does not know if it
* is supported under the supplied path.</li>
* </ul>
* The core guarantee which a caller can rely on is: if the predicate
* returns true, then the specific operation/behavior can be expected to be
* supported. However a specific call may be rejected for permission reasons,
* the actual file/directory not being present, or some other failure during
* the attempted execution of the operation.
* <p>
* Implementors: {@link org.apache.hadoop.fs.impl.PathCapabilitiesSupport}
* can be used to help implement this method.
* @param path path to query the capability of.
* @param capability non-null, non-empty string to query the path for support.
* @return true if the capability is supported under that part of the FS.
* @throws IOException this should not be raised, except on problems
* resolving paths or relaying the call.
* @throws IllegalArgumentException invalid arguments
*/
boolean hasPathCapability(Path path, String capability)
throws IOException;
}

View File

@ -53,6 +53,8 @@ import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.StringUtils;
import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
/****************************************************************
* Implement the FileSystem API for the raw local filesystem.
*
@ -1060,4 +1062,21 @@ public class RawLocalFileSystem extends FileSystem {
// return an unqualified symlink target
return fi.getSymlink();
}
@Override
public boolean hasPathCapability(final Path path, final String capability)
throws IOException {
switch (validatePathCapabilityArgs(makeQualified(path), capability)) {
case CommonPathCapabilities.FS_APPEND:
case CommonPathCapabilities.FS_CONCAT:
case CommonPathCapabilities.FS_PATHHANDLES:
case CommonPathCapabilities.FS_PERMISSIONS:
case CommonPathCapabilities.FS_TRUNCATE:
return true;
case CommonPathCapabilities.FS_SYMLINKS:
return FileSystem.areSymlinksEnabled();
default:
return super.hasPathCapability(path, capability);
}
}
}

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.fs.http;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonPathCapabilities;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
@ -36,6 +37,8 @@ import java.io.InputStream;
import java.net.URI;
import java.net.URLConnection;
import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
abstract class AbstractHttpFileSystem extends FileSystem {
private static final long DEFAULT_BLOCK_SIZE = 4096;
private static final Path WORKING_DIR = new Path("/");
@ -111,6 +114,21 @@ abstract class AbstractHttpFileSystem extends FileSystem {
return new FileStatus(-1, false, 1, DEFAULT_BLOCK_SIZE, 0, path);
}
/**
* Declare that this filesystem connector is always read only.
* {@inheritDoc}
*/
@Override
public boolean hasPathCapability(final Path path, final String capability)
throws IOException {
switch (validatePathCapabilityArgs(path, capability)) {
case CommonPathCapabilities.FS_READ_ONLY_CONNECTOR:
return true;
default:
return super.hasPathCapability(path, capability);
}
}
private static class HttpDataInputStream extends FilterInputStream
implements Seekable, PositionedReadable {

View File

@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.impl;
import java.io.IOException;
import com.google.common.base.Preconditions;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.AbstractFileSystem;
import org.apache.hadoop.fs.FSLinkResolver;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnresolvedLinkException;
/**
* Class to allow Lambda expressions to be used in {@link FileContext}
* link resolution.
* @param <T> type of the returned value.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class FsLinkResolution<T> extends FSLinkResolver<T> {
/**
* The function to invoke in the {@link #next(AbstractFileSystem, Path)} call.
*/
private final FsLinkResolutionFunction<T> fn;
/**
* Construct an instance with the given function.
* @param fn function to invoke.
*/
public FsLinkResolution(final FsLinkResolutionFunction<T> fn) {
this.fn = Preconditions.checkNotNull(fn);
}
@Override
public T next(final AbstractFileSystem fs, final Path p)
throws UnresolvedLinkException, IOException {
return fn.apply(fs, p);
}
/**
* The signature of the function to invoke.
* @param <T> type resolved to
*/
@FunctionalInterface
public interface FsLinkResolutionFunction<T> {
/**
* Apply the function to the given filesystem and path.
* @param fs filesystem to resolve against.
* @param path path to resolve
* @return a result of type T
* @throws UnresolvedLinkException link resolution failure
* @throws IOException other IO failure.
*/
T apply(final AbstractFileSystem fs, final Path path)
throws IOException, UnresolvedLinkException;
}
/**
* Apply the given function to the resolved path under the supplied
* FileContext.
* @param fileContext file context to resolve under
* @param path path to resolve
* @param fn function to invoke
* @param <T> return type.
* @return the return value of the function as invoked against the resolved
* path.
* @throws UnresolvedLinkException link resolution failure
* @throws IOException other IO failure.
*/
public static <T> T resolve(
final FileContext fileContext, final Path path,
final FsLinkResolutionFunction<T> fn)
throws UnresolvedLinkException, IOException {
return new FsLinkResolution<>(fn).resolve(fileContext, path);
}
}

View File

@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.impl;
import java.util.Locale;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathCapabilities;
import static com.google.common.base.Preconditions.checkArgument;
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class PathCapabilitiesSupport {
/**
* Validate the arguments to
* {@link PathCapabilities#hasPathCapability(Path, String)}.
* @param path path to query the capability of.
* @param capability non-null, non-empty string to query the path for support.
* @return the string to use in a switch statement.
* @throws IllegalArgumentException if an argument is invalid.
*/
public static String validatePathCapabilityArgs(
final Path path, final String capability) {
checkArgument(path != null, "null path");
checkArgument(capability != null, "capability parameter is null");
checkArgument(!capability.isEmpty(),
"capability parameter is empty string");
return capability.toLowerCase(Locale.ENGLISH);
}
}

View File

@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* This package contains implementation classes for use inside
* filesystems.
*
* These classes MUST NOT be directly exposed as the arguments
* or return values of methods, or as part of a visible
* inheritance tree.
*
* These classes MAY be returned behind interfaces.
* When such interfaces are used as parameters, the methods
* which accept the interfaces MUST NOT cast them to the classes
* contained therein: they MUST interact purely through
* the interface.
*
* That is: don't expose the implementation classes in here,
* and don't expect input interface implementations to always
* be the classes in here.
*
* These classes are for the private use of FileSystem/
* FileContext implementations.
* Implementation classes not developed within the ASF Hadoop
* codebase MAY use these, with the caveat that these classes
* are highly unstable.
*/
@InterfaceAudience.LimitedPrivate("Filesystems")
@InterfaceStability.Unstable
package org.apache.hadoop.fs.impl;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

View File

@ -474,4 +474,9 @@ class ChRootedFileSystem extends FilterFileSystem {
super.unsetStoragePolicy(fullPath(src));
}
@Override
public boolean hasPathCapability(final Path path, final String capability)
throws IOException {
return super.hasPathCapability(fullPath(path), capability);
}
}

View File

@ -17,6 +17,7 @@
*/
package org.apache.hadoop.fs.viewfs;
import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
import static org.apache.hadoop.fs.viewfs.Constants.CONFIG_VIEWFS_ENABLE_INNER_CACHE;
import static org.apache.hadoop.fs.viewfs.Constants.CONFIG_VIEWFS_ENABLE_INNER_CACHE_DEFAULT;
import static org.apache.hadoop.fs.viewfs.Constants.CONFIG_VIEWFS_IGNORE_PORT_IN_MOUNT_TABLE_NAME;
@ -48,6 +49,7 @@ import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.BlockStoragePolicySpi;
import org.apache.hadoop.fs.CommonPathCapabilities;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataInputStream;
@ -1090,6 +1092,36 @@ public class ViewFileSystem extends FileSystem {
return res.targetFileSystem.getLinkTarget(res.remainingPath);
}
/**
* Reject the concat operation; forward the rest to the viewed FS.
* @param path path to query the capability of.
* @param capability string to query the path support for.
* @return true if the capability is supported
* @throws IOException if there is no resolved FS, or it raises an IOE.
*/
@Override
public boolean hasPathCapability(Path path, String capability)
throws IOException {
final Path p = makeQualified(path);
switch (validatePathCapabilityArgs(p, capability)) {
case CommonPathCapabilities.FS_CONCAT:
// concat is not supported, as it may be invoked across filesystems.
return false;
default:
// no break
}
// otherwise, check capabilities of mounted FS.
try {
InodeTree.ResolveResult<FileSystem> res
= fsState.resolve(getUriPath(p), true);
return res.targetFileSystem.hasPathCapability(res.remainingPath,
capability);
} catch (FileNotFoundException e) {
// no mount point, nothing will work.
throw new NotInMountpointException(p, "hasPathCapability");
}
}
/**
* An instance of this class represents an internal dir of the viewFs
* that is internal dir of the mount table.

View File

@ -526,7 +526,7 @@ on the filesystem.
`getFileStatus(P).getBlockSize()`.
1. By inference, it MUST be > 0 for any file of length > 0.
## State Changing Operations
## <a name="state_changing_operations"></a> State Changing Operations
### `boolean mkdirs(Path p, FsPermission permission)`
@ -1372,7 +1372,7 @@ public interface StreamCapabilities {
### `boolean hasCapability(capability)`
Return true if the `OutputStream`, `InputStream`, or other FileSystem class
Return true iff the `OutputStream`, `InputStream`, or other FileSystem class
has the desired capability.
The caller can query the capabilities of a stream using a string value.
@ -1385,3 +1385,4 @@ hsync | HSYNC | Syncable | Flush out the data in client's us
in:readahead | READAHEAD | CanSetReadahead | Set the readahead on the input stream.
dropbehind | DROPBEHIND | CanSetDropBehind | Drop the cache.
in:unbuffer | UNBUFFER | CanUnbuffer | Reduce the buffering on the input stream.

View File

@ -33,6 +33,7 @@ HDFS as these are commonly expected by Hadoop client applications.
1. [Model](model.html)
1. [FileSystem class](filesystem.html)
1. [FSDataInputStream class](fsdatainputstream.html)
1. [PathCapabilities interface](pathcapabilities.html)
1. [FSDataOutputStreamBuilder class](fsdataoutputstreambuilder.html)
2. [Testing with the Filesystem specification](testing.html)
2. [Extending the specification and its tests](extending.html)

View File

@ -0,0 +1,158 @@
<!---
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. See accompanying LICENSE file.
-->
# <a name="PathCapabilities"></a> interface `PathCapabilities`
The `PathCapabilities` interface provides a way to programmatically query the
operations offered under a given path by an instance of `FileSystem`, `FileContext`
or other implementing class.
```java
public interface PathCapabilities {
boolean hasPathCapability(Path path, String capability)
throws IOException;
}
```
There are a number of goals here:
1. Allow callers to probe for optional filesystem operations without actually
having to invoke them.
1. Allow filesystems with their own optional per-instance features to declare
whether or not they are active for the specific instance.
1. Allow for filesystem connectors which work with object stores to expose the
fundamental difference in semantics of these stores (e.g. files not visible
until closed, file rename being `O(data)`, directory rename being non-atomic,
etc.).
### Available Capabilities
Capabilities are defined as strings and split into "Common Capabilities"
and non-standard ones for a specific store.
The common capabilities are all defined under the prefix `fs.capability.`
Consult the javadocs for `org.apache.hadoop.fs.CommonPathCapabilities` for these.
Individual filesystems MAY offer their own set of capabilities which
can be probed for. These MUST begin with `fs.` + the filesystem scheme +
`.capability`. For example `fs.s3a.capability.select.sql`.
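
Probing for a store-specific capability is no different from probing for a
common one; a sketch, assuming `fs` is an S3A `FileSystem` instance and
`path` a path within it:

```java
// true only if this S3A client offers SQL select under the path
boolean canSelect = fs.hasPathCapability(path,
    "fs.s3a.capability.select.sql");
```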
### `boolean hasPathCapability(path, capability)`
Probe for the instance offering a specific capability under the
given path.
#### Postconditions
```python
if fs_supports_the_feature(path, capability):
return True
else:
return False
```
Return: `True`, iff the specific capability is available.
A filesystem instance *MUST NOT* return `True` for any capability unless it is
known to be supported by that specific instance. As a result, if a caller
probes for a capability then it can assume that the specific feature/semantics
are available.
If the probe returns `False` then it can mean one of:
1. The capability is unknown.
1. The capability is known, and known to be unavailable on this instance.
1. The capability is known but this local class does not know if it is supported
under the supplied path.
This predicate is intended to be low cost. If it requires remote calls other
than path/link resolution, it SHOULD conclude that the availability
of the feature is unknown and return `False`.
The predicate MUST also be side-effect free.
*Validity of paths*
There is no requirement that the existence of the path must be checked;
the parameter exists so that any filesystem which relays operations to other
filesystems (e.g. `viewfs`) can resolve and relay it to the nested filesystem.
Consider the call to be *relatively* lightweight.
Because of this, it may be that while the filesystem declares that
it supports a capability under a path, the actual invocation of the operation
may fail for other reasons.
As an example, while a filesystem may support `append()` under a path,
if invoked on a directory, the call may fail.
That is, for a path `root = new Path("/")`: the capabilities call may succeed
```java
fs.hasPathCapability(root, "fs.capability.paths.append") == true
```
But a subsequent call to the operation on that specific path may fail,
because the root path is a directory:
```java
fs.append(root)
```
Similarly, there is no checking that the caller has the permission to
perform a specific operation: just because a feature is available on that
path does not mean that the caller can execute the operation.
The `hasPathCapability(path, capability)` probe is therefore declaring that
the operation will not be rejected as unsupported, not that a specific invocation
will be permitted on that path by the caller.
*Duration of availability*
As the state of a remote store changes, so may path capabilities. This
may be due to changes in the local state of the filesystem (e.g. symbolic links
or mount points changing), or changes in its functionality (e.g. a feature
becoming available/unavailable due to operational changes, system upgrades, etc.)
*Capabilities which must be invoked to determine availability*
Some operations may be known by the client connector, and believed to be available,
but may actually fail when invoked due to the state and permissions of the remote
store —state which cannot be determined except by attempting
side-effecting operations.
A key example of this is symbolic links and the local filesystem.
The filesystem declares that it supports this unless symbolic links are explicitly
disabled —when invoked they may actually fail.
### Implementors Notes
Implementors *MUST NOT* return `true` for any capability which is not guaranteed
to be supported. To return `true` indicates that the implementation/deployment
of the filesystem does, to the best of the knowledge of the filesystem client,
offer the desired operations *and semantics* queried for.
For performance reasons, implementations *SHOULD NOT* check the path for
existence, unless they need to resolve symbolic links in parts of the path
to determine whether a feature is present. This is required of `FileContext`
and `viewfs`.
Individual filesystems *MUST NOT* unilaterally define new `fs.capability`-prefixed
capabilities. Instead they *MUST* do one of the following:
* Define and stabilize new cross-filesystem capability flags (preferred),
and so formally add a new `fs.capability` value.
* Use the scheme of the filesystem as a prefix for their own options,
e.g. `fs.hdfs.`
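
Putting these notes together, a minimal sketch of the switch-based pattern
used by the filesystems in this patch; the `fs.example.capability.fast-rename`
probe and the `fastRenameEnabled` field are purely hypothetical:

```java
@Override
public boolean hasPathCapability(final Path path, final String capability)
    throws IOException {
  // validatePathCapabilityArgs rejects null/empty probes and
  // lower-cases the capability string for the switch.
  switch (validatePathCapabilityArgs(makeQualified(path), capability)) {
  case CommonPathCapabilities.FS_APPEND:
    // always offered by this (hypothetical) store
    return true;
  case "fs.example.capability.fast-rename":
    // a scheme-prefixed, store-specific capability
    return fastRenameEnabled;
  default:
    // unknown here: delegate to the superclass
    return super.hasPathCapability(path, capability);
  }
}
```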

View File

@ -18,12 +18,15 @@
package org.apache.hadoop.fs.contract;
import org.apache.hadoop.fs.CommonPathCapabilities;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertHasPathCapabilities;
import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
import static org.apache.hadoop.fs.contract.ContractTestUtils.touch;
@ -149,4 +152,11 @@ public abstract class AbstractContractAppendTest extends AbstractFSContractTestB
dataset.length);
ContractTestUtils.compareByteArrays(dataset, bytes, dataset.length);
}
@Test
public void testFileSystemDeclaresCapability() throws Throwable {
assertHasPathCapabilities(getFileSystem(), target,
CommonPathCapabilities.FS_APPEND);
}
}

View File

@ -24,7 +24,9 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.hadoop.fs.CommonPathCapabilities.FS_CONCAT;
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertFileHasLength;
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertHasPathCapabilities;
import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
import static org.apache.hadoop.fs.contract.ContractTestUtils.touch;
@ -93,4 +95,9 @@ public abstract class AbstractContractConcatTest extends AbstractFSContractTestB
() -> getFileSystem().concat(target, new Path[]{target})));
}
@Test
public void testFileSystemDeclaresCapability() throws Throwable {
assertHasPathCapabilities(getFileSystem(), target, FS_CONCAT);
}
}

View File

@ -25,6 +25,7 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathCapabilities;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.io.IOUtils;
@ -1466,22 +1467,92 @@ public class ContractTestUtils extends Assert {
assertTrue("Stream should be instanceof StreamCapabilities",
stream instanceof StreamCapabilities);
if (shouldHaveCapabilities!=null) {
StreamCapabilities source = (StreamCapabilities) stream;
if (shouldHaveCapabilities != null) {
for (String shouldHaveCapability : shouldHaveCapabilities) {
assertTrue("Should have capability: " + shouldHaveCapability,
((StreamCapabilities) stream).hasCapability(shouldHaveCapability));
source.hasCapability(shouldHaveCapability));
}
}
if (shouldNotHaveCapabilities!=null) {
if (shouldNotHaveCapabilities != null) {
for (String shouldNotHaveCapability : shouldNotHaveCapabilities) {
assertFalse("Should not have capability: " + shouldNotHaveCapability,
((StreamCapabilities) stream)
.hasCapability(shouldNotHaveCapability));
source.hasCapability(shouldNotHaveCapability));
}
}
}
/**
* Custom assert to test {@link PathCapabilities}.
*
* @param source source (FS, FC, etc)
* @param path path to check
* @param capabilities The array of expected capabilities
*/
public static void assertHasPathCapabilities(
final PathCapabilities source,
final Path path,
final String...capabilities) throws IOException {
for (String shouldHaveCapability: capabilities) {
assertTrue("Should have capability: " + shouldHaveCapability
+ " under " + path,
source.hasPathCapability(path, shouldHaveCapability));
}
}
/**
* Custom assert to test that the named {@link PathCapabilities}
* are not supported.
*
* @param source source (FS, FC, etc)
* @param path path to check
* @param capabilities The array of unexpected capabilities
*/
public static void assertLacksPathCapabilities(
final PathCapabilities source,
final Path path,
final String...capabilities) throws IOException {
for (String shouldHaveCapability: capabilities) {
assertFalse("Path must not support capability: " + shouldHaveCapability
+ " under " + path,
source.hasPathCapability(path, shouldHaveCapability));
}
}
/**
* Function which calls {@code InputStream.read()} and
* downgrades an IOE to an AssertionError.
* @param in input
* @return the read value
* @throws AssertionError on any IOException
*/
public static int read(InputStream in) {
try {
return in.read();
} catch (IOException ex) {
throw new AssertionError(ex);
}
}
/**
* Read a whole stream; downgrades an IOE to an AssertionError.
* @param in input
* @return the number of bytes read.
* @throws AssertionError on any IOException
*/
public static long readStream(InputStream in) {
long count = 0;
while (read(in) >= 0) {
count++;
}
return count;
}
/**
* Results of recursive directory creation/scan operations.
*/

View File

@ -67,6 +67,7 @@ import org.apache.hadoop.fs.permission.AclStatus;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSOpsCountStatistics.OpType;
import org.apache.hadoop.hdfs.client.DfsPathCapabilities;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
import org.apache.hadoop.hdfs.client.impl.CorruptFileBlockIterator;
@ -3400,4 +3401,22 @@ public class DistributedFileSystem extends FileSystem
public HdfsDataOutputStreamBuilder appendFile(Path path) {
return new HdfsDataOutputStreamBuilder(this, path).append();
}
/**
* HDFS client capabilities.
* Uses {@link DfsPathCapabilities} to keep {@code WebHdfsFileSystem} in sync.
* {@inheritDoc}
*/
@Override
public boolean hasPathCapability(final Path path, final String capability)
throws IOException {
// qualify the path to make sure that it refers to the current FS.
final Path p = makeQualified(path);
Optional<Boolean> cap = DfsPathCapabilities.hasPathCapability(p,
capability);
if (cap.isPresent()) {
return cap.get();
}
return super.hasPathCapability(p, capability);
}
}

View File

@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.client;
import java.util.Optional;
import org.apache.hadoop.fs.CommonPathCapabilities;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
public final class DfsPathCapabilities {
private DfsPathCapabilities() {
}
/**
* Common implementation of {@code hasPathCapability} for DFS and webhdfs.
* @param path path to check
* @param capability capability
* @return either a value to return or, if empty, a cue for the FS to
* pass up to its superclass.
*/
public static Optional<Boolean> hasPathCapability(final Path path,
final String capability) {
switch (validatePathCapabilityArgs(path, capability)) {
case CommonPathCapabilities.FS_ACLS:
case CommonPathCapabilities.FS_APPEND:
case CommonPathCapabilities.FS_CHECKSUMS:
case CommonPathCapabilities.FS_CONCAT:
case CommonPathCapabilities.FS_LIST_CORRUPT_FILE_BLOCKS:
case CommonPathCapabilities.FS_PATHHANDLES:
case CommonPathCapabilities.FS_PERMISSIONS:
case CommonPathCapabilities.FS_SNAPSHOTS:
case CommonPathCapabilities.FS_STORAGEPOLICY:
case CommonPathCapabilities.FS_XATTRS:
return Optional.of(true);
case CommonPathCapabilities.FS_SYMLINKS:
return Optional.of(FileSystem.areSymlinksEnabled());
default:
return Optional.empty();
}
}
}

View File

@ -46,7 +46,9 @@ import java.util.Collection;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.StringTokenizer;
import java.util.concurrent.TimeUnit;
@ -90,6 +92,7 @@ import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.HAUtilClient;
import org.apache.hadoop.hdfs.HdfsKMSUtil;
import org.apache.hadoop.hdfs.client.DfsPathCapabilities;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
@ -1132,6 +1135,11 @@ public class WebHdfsFileSystem extends FileSystem
).run();
}
@Override
public boolean supportsSymlinks() {
return true;
}
/**
* Create a symlink pointing to the destination path.
*/
@ -2038,6 +2046,24 @@ public class WebHdfsFileSystem extends FileSystem
testProvider = kp;
}
/**
* HDFS client capabilities.
* Uses {@link DfsPathCapabilities} to keep in sync with HDFS.
* {@inheritDoc}
*/
@Override
public boolean hasPathCapability(final Path path, final String capability)
throws IOException {
// qualify the path to make sure that it refers to the current FS.
final Path p = makeQualified(path);
Optional<Boolean> cap = DfsPathCapabilities.hasPathCapability(p,
capability);
if (cap.isPresent()) {
return cap.get();
}
return super.hasPathCapability(p, capability);
}
/**
* This class is used for opening, reading, and seeking files while using the
* WebHdfsFileSystem. This class will invoke the retry policy when performing

View File

@ -26,6 +26,7 @@ import java.util.List;
import com.google.common.base.Charsets;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonPathCapabilities;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.DelegationTokenRenewer;
import org.apache.hadoop.fs.FSDataInputStream;
@ -87,6 +88,8 @@ import java.text.MessageFormat;
import java.util.HashMap;
import java.util.Map;
import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
/**
* HttpFSServer implementation of the FileSystemAccess FileSystem.
* <p>
@ -1561,4 +1564,30 @@ public class HttpFSFileSystem extends FileSystem
return JsonUtilClient.toSnapshottableDirectoryList(json);
}
/**
* This filesystem's capabilities must be in sync with those of
* {@code DistributedFileSystem.hasPathCapability()} except
* where the feature is not exposed (e.g. symlinks).
* {@inheritDoc}
*/
@Override
public boolean hasPathCapability(final Path path, final String capability)
throws IOException {
// query the superclass, which triggers argument validation.
final Path p = makeQualified(path);
switch (validatePathCapabilityArgs(p, capability)) {
case CommonPathCapabilities.FS_ACLS:
case CommonPathCapabilities.FS_APPEND:
case CommonPathCapabilities.FS_CONCAT:
case CommonPathCapabilities.FS_PERMISSIONS:
case CommonPathCapabilities.FS_SNAPSHOTS:
case CommonPathCapabilities.FS_STORAGEPOLICY:
case CommonPathCapabilities.FS_XATTRS:
return true;
case CommonPathCapabilities.FS_SYMLINKS:
return false;
default:
return super.hasPathCapability(p, capability);
}
}
}

View File

@ -497,17 +497,19 @@ class S3ABlockOutputStream extends OutputStream implements
* @param capability string to query the stream support for.
* @return true if the capability is supported by this instance.
*/
@SuppressWarnings("deprecation")
@Override
public boolean hasCapability(String capability) {
switch (capability.toLowerCase(Locale.ENGLISH)) {
// does the output stream have delayed visibility
case CommitConstants.STREAM_CAPABILITY_MAGIC_OUTPUT:
case CommitConstants.STREAM_CAPABILITY_MAGIC_OUTPUT_OLD:
return !putTracker.outputImmediatelyVisible();
// The flush/sync options are absolutely not supported
case "hflush":
case "hsync":
case StreamCapabilities.HFLUSH:
case StreamCapabilities.HSYNC:
return false;
default:

View File

@ -34,7 +34,6 @@ import java.util.Date;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.Objects;
@ -83,6 +82,7 @@ import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonPathCapabilities;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
@ -119,6 +119,7 @@ import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.SemaphoredDelegatingExecutor;
import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
import static org.apache.hadoop.fs.s3a.Constants.*;
import static org.apache.hadoop.fs.s3a.Invoker.*;
import static org.apache.hadoop.fs.s3a.S3AUtils.*;
@ -3362,21 +3363,43 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
return instrumentation.newCommitterStatistics();
}
/**
* Return the capabilities of this filesystem instance.
* @param capability string to query the stream support for.
* @return whether the FS instance has the capability.
*/
@SuppressWarnings("deprecation")
@Override
public boolean hasCapability(String capability) {
switch (capability.toLowerCase(Locale.ENGLISH)) {
public boolean hasPathCapability(final Path path, final String capability)
throws IOException {
final Path p = makeQualified(path);
switch (validatePathCapabilityArgs(p, capability)) {
case CommitConstants.STORE_CAPABILITY_MAGIC_COMMITTER:
case CommitConstants.STORE_CAPABILITY_MAGIC_COMMITTER_OLD:
// capability depends on FS configuration
return isMagicCommitEnabled();
case CommonPathCapabilities.FS_CHECKSUMS:
// capability depends on FS configuration
return getConf().getBoolean(ETAG_CHECKSUM_ENABLED,
ETAG_CHECKSUM_ENABLED_DEFAULT);
default:
return super.hasPathCapability(p, capability);
}
}
/**
* Return the capabilities of this filesystem instance.
*
* This has been supplanted by {@link #hasPathCapability(Path, String)}.
* @param capability string to query the stream support for.
* @return whether the FS instance has the capability.
*/
@Deprecated
@Override
public boolean hasCapability(String capability) {
try {
return hasPathCapability(workingDir, capability);
} catch (IOException ex) {
// should never happen, so log and downgrade.
LOG.debug("Ignoring exception on hasCapability({}})", capability, ex);
return false;
}
}

View File

@ -78,14 +78,32 @@ public final class CommitConstants {
* Value: {@value}.
*/
public static final String STREAM_CAPABILITY_MAGIC_OUTPUT
= "fs.s3a.capability.magic.output.stream";
/**
* Flag to indicate that a store supports magic committers.
* returned in {@code PathCapabilities}
* Value: {@value}.
*/
public static final String STORE_CAPABILITY_MAGIC_COMMITTER
= "fs.s3a.capability.magic.committer";
/**
* Flag to indicate whether a stream is a magic output stream;
* returned in {@code StreamCapabilities}
* Value: {@value}.
*/
@Deprecated
public static final String STREAM_CAPABILITY_MAGIC_OUTPUT_OLD
= "s3a:magic.output.stream";
/**
* Flag to indicate that a store supports magic committers.
* returned in {@code StreamCapabilities}
* returned in {@code PathCapabilities}
* Value: {@value}.
*/
public static final String STORE_CAPABILITY_MAGIC_COMMITTER
@Deprecated
public static final String STORE_CAPABILITY_MAGIC_COMMITTER_OLD
= "s3a:magic.committer";
/**

View File

@ -1102,7 +1102,8 @@ public abstract class S3GuardTool extends Configured implements Tool {
} else {
println(out, "Filesystem %s is not using S3Guard", fsUri);
}
boolean magic = fs.hasCapability(
boolean magic = fs.hasPathCapability(
new Path(s3Path),
CommitConstants.STORE_CAPABILITY_MAGIC_COMMITTER);
println(out, "The \"magic\" committer %s supported",
magic ? "is" : "is not");

View File

@ -28,12 +28,15 @@ import com.amazonaws.services.s3.model.PutObjectRequest;
import org.junit.Assume;
import org.junit.Test;
import org.apache.hadoop.fs.CommonPathCapabilities;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.store.EtagChecksum;
import org.apache.hadoop.test.LambdaTestUtils;
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertHasPathCapabilities;
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertLacksPathCapabilities;
import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
import static org.apache.hadoop.fs.contract.ContractTestUtils.touch;
@ -142,6 +145,8 @@ public class ITestS3AMiscOperations extends AbstractS3ATestBase {
Path file1 = touchFile("file1");
EtagChecksum checksum1 = fs.getFileChecksum(file1, 0);
LOG.info("Checksum for {}: {}", file1, checksum1);
assertHasPathCapabilities(fs, file1,
CommonPathCapabilities.FS_CHECKSUMS);
assertNotNull("Null file 1 checksum", checksum1);
assertNotEquals("file 1 checksum", 0, checksum1.getLength());
assertEquals("checksums", checksum1,
@ -159,6 +164,8 @@ public class ITestS3AMiscOperations extends AbstractS3ATestBase {
final S3AFileSystem fs = getFileSystem();
Path file1 = touchFile("file1");
EtagChecksum checksum1 = fs.getFileChecksum(file1, 0);
assertLacksPathCapabilities(fs, file1,
CommonPathCapabilities.FS_CHECKSUMS);
assertNull("Checksums are being generated", checksum1);
}

View File

@ -1000,9 +1000,12 @@ public final class S3ATestUtils {
* Skip a test if the FS isn't marked as supporting magic commits.
* @param fs filesystem
*/
public static void assumeMagicCommitEnabled(S3AFileSystem fs) {
public static void assumeMagicCommitEnabled(S3AFileSystem fs)
throws IOException {
assume("Magic commit option disabled on " + fs,
fs.hasCapability(CommitConstants.STORE_CAPABILITY_MAGIC_COMMITTER));
fs.hasPathCapability(
fs.getWorkingDirectory(),
CommitConstants.STORE_CAPABILITY_MAGIC_COMMITTER));
}
/**

View File

@ -24,7 +24,6 @@ import java.io.IOException;
import java.util.List;
import com.amazonaws.services.s3.model.PartETag;
import org.junit.Assume;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -528,10 +527,7 @@ public class ITestCommitOperations extends AbstractCommitITest {
@Test
public void testWriteNormalStream() throws Throwable {
S3AFileSystem fs = getFileSystem();
Assume.assumeTrue(
"Filesystem does not have magic support enabled: " + fs,
fs.hasCapability(STORE_CAPABILITY_MAGIC_COMMITTER));
assumeMagicCommitEnabled(fs);
Path destFile = path("normal");
try (FSDataOutputStream out = fs.create(destFile, true)) {
out.writeChars("data");

View File

@ -348,7 +348,7 @@ public abstract class AbstractS3GuardToolTestBase extends AbstractS3ATestBase {
String name = fs.getUri().toString();
S3GuardTool.BucketInfo cmd = new S3GuardTool.BucketInfo(
getConfiguration());
if (fs.hasCapability(
if (fs.hasPathCapability(fs.getWorkingDirectory(),
CommitConstants.STORE_CAPABILITY_MAGIC_COMMITTER)) {
// if the FS is magic, expect this to work
exec(cmd, S3GuardTool.BucketInfo.MAGIC_FLAG, name);

View File

@ -46,6 +46,7 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonPathCapabilities;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.ContentSummary.Builder;
import org.apache.hadoop.fs.CreateFlag;
@ -70,6 +71,7 @@ import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.VersionInfo;
import static org.apache.hadoop.fs.adl.AdlConfKeys.*;
import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
/**
* A FileSystem to access Azure Data Lake Store.
@ -1029,4 +1031,20 @@ public class AdlFileSystem extends FileSystem {
}
return dest;
}
@Override
public boolean hasPathCapability(final Path path, final String capability)
throws IOException {
switch (validatePathCapabilityArgs(makeQualified(path), capability)) {
case CommonPathCapabilities.FS_ACLS:
case CommonPathCapabilities.FS_APPEND:
case CommonPathCapabilities.FS_CONCAT:
case CommonPathCapabilities.FS_PERMISSIONS:
return true;
default:
return super.hasPathCapability(path, capability);
}
}
}

View File

@ -51,6 +51,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BufferedFSInputStream;
import org.apache.hadoop.fs.CommonPathCapabilities;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
@ -84,6 +85,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.hadoop.fs.azure.NativeAzureFileSystemHelper.*;
import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
@ -3902,4 +3904,19 @@ public class NativeAzureFileSystem extends FileSystem {
void updateDaemonUsers(List<String> daemonUsers) {
this.daemonUsers = daemonUsers;
}
@Override
public boolean hasPathCapability(final Path path, final String capability)
throws IOException {
switch (validatePathCapabilityArgs(path, capability)) {
case CommonPathCapabilities.FS_PERMISSIONS:
return true;
// Append support is dynamic
case CommonPathCapabilities.FS_APPEND:
return appendSupportEnabled;
default:
return super.hasPathCapability(path, capability);
}
}
}

View File

@ -47,6 +47,7 @@ import org.apache.hadoop.fs.azurebfs.services.AbfsClientThrottlingIntercept;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.CommonPathCapabilities;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
@ -76,6 +77,8 @@ import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Progressable;
import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
/**
* A {@link org.apache.hadoop.fs.FileSystem} for reading and writing files stored on <a
* href="http://store.azure.com/">Windows Azure</a>
@ -1120,4 +1123,20 @@ public class AzureBlobFileSystem extends FileSystem {
}
}
}
@Override
public boolean hasPathCapability(final Path path, final String capability)
throws IOException {
// qualify the path to make sure that it refers to the current FS.
final Path p = makeQualified(path);
switch (validatePathCapabilityArgs(p, capability)) {
case CommonPathCapabilities.FS_PERMISSIONS:
case CommonPathCapabilities.FS_APPEND:
return true;
case CommonPathCapabilities.FS_ACLS:
return getIsNamespaceEnabled();
default:
return super.hasPathCapability(p, capability);
}
}
}