HADOOP-15691 Add PathCapabilities to FileSystem and FileContext.
Contributed by Steve Loughran. This complements the StreamCapabilities Interface by allowing applications to probe for a specific path on a specific instance of a FileSystem client to offer a specific capability. This is intended to allow applications to determine * Whether a method is implemented before calling it and dealing with UnsupportedOperationException. * Whether a specific feature is believed to be available in the remote store. As well as a common set of capabilities defined in CommonPathCapabilities, file systems are free to add their own capabilities, prefixed with fs. + schema + . The plan is to identify and document more capabilities -and for file systems which add new features, for a declaration of the availability of the feature to always be available. Note * The remote store is not expected to be checked for the feature; It is more a check of client API and the client's configuration/knowledge of the state of the remote system. * Permissions are not checked. Change-Id: I80bfebe94f4a8bdad8f3ac055495735b824968f5
This commit is contained in:
parent
e6fb6ee94d
commit
e346e3638c
|
@ -60,6 +60,8 @@ import com.google.common.annotations.VisibleForTesting;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
|
||||
|
||||
/**
|
||||
* This class provides an interface for implementors of a Hadoop file system
|
||||
* (analogous to the VFS of Unix). Applications do not access this class;
|
||||
|
@ -72,7 +74,7 @@ import org.slf4j.LoggerFactory;
|
|||
*/
|
||||
@InterfaceAudience.Public
|
||||
@InterfaceStability.Stable
|
||||
public abstract class AbstractFileSystem {
|
||||
public abstract class AbstractFileSystem implements PathCapabilities {
|
||||
static final Logger LOG = LoggerFactory.getLogger(AbstractFileSystem.class);
|
||||
|
||||
/** Recording statistics per a file system class. */
|
||||
|
@ -1371,4 +1373,16 @@ public abstract class AbstractFileSystem {
|
|||
new CompletableFuture<>(), () -> open(path, bufferSize));
|
||||
}
|
||||
|
||||
public boolean hasPathCapability(final Path path,
|
||||
final String capability)
|
||||
throws IOException {
|
||||
switch (validatePathCapabilityArgs(makeQualified(path), capability)) {
|
||||
case CommonPathCapabilities.FS_SYMLINKS:
|
||||
// delegate to the existing supportsSymlinks() call.
|
||||
return supportsSymlinks();
|
||||
default:
|
||||
// the feature is not implemented.
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -27,6 +27,7 @@ import java.util.Arrays;
|
|||
import java.util.Collections;
|
||||
import java.util.EnumSet;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
|
||||
|
@ -42,6 +43,8 @@ import org.apache.hadoop.util.DataChecksum;
|
|||
import org.apache.hadoop.util.LambdaUtils;
|
||||
import org.apache.hadoop.util.Progressable;
|
||||
|
||||
import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
|
||||
|
||||
/****************************************************************
|
||||
* Abstract Checksumed FileSystem.
|
||||
* It provide a basic implementation of a Checksumed FileSystem,
|
||||
|
@ -872,4 +875,23 @@ public abstract class ChecksumFileSystem extends FilterFileSystem {
|
|||
public FSDataOutputStreamBuilder appendFile(Path path) {
|
||||
return createDataOutputStreamBuilder(this, path).append();
|
||||
}
|
||||
|
||||
/**
|
||||
* Disable those operations which the checksummed FS blocks.
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
@Override
|
||||
public boolean hasPathCapability(final Path path, final String capability)
|
||||
throws IOException {
|
||||
// query the superclass, which triggers argument validation.
|
||||
final Path p = makeQualified(path);
|
||||
switch (validatePathCapabilityArgs(p, capability)) {
|
||||
case CommonPathCapabilities.FS_APPEND:
|
||||
case CommonPathCapabilities.FS_CONCAT:
|
||||
return false;
|
||||
default:
|
||||
return super.hasPathCapability(p, capability);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -0,0 +1,126 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs;
|
||||
|
||||
/**
|
||||
* Common path capabilities.
|
||||
*/
|
||||
public final class CommonPathCapabilities {
|
||||
|
||||
private CommonPathCapabilities() {
|
||||
}
|
||||
|
||||
/**
|
||||
* Does the store support
|
||||
* {@code FileSystem.setAcl(Path, List)},
|
||||
* {@code FileSystem.getAclStatus(Path)}
|
||||
* and related methods?
|
||||
* Value: {@value}.
|
||||
*/
|
||||
public static final String FS_ACLS = "fs.capability.paths.acls";
|
||||
|
||||
/**
|
||||
* Does the store support {@code FileSystem.append(Path)}?
|
||||
* Value: {@value}.
|
||||
*/
|
||||
public static final String FS_APPEND = "fs.capability.paths.append";
|
||||
|
||||
/**
|
||||
* Does the store support {@code FileSystem.getFileChecksum(Path)}?
|
||||
* Value: {@value}.
|
||||
*/
|
||||
public static final String FS_CHECKSUMS = "fs.capability.paths.checksums";
|
||||
|
||||
/**
|
||||
* Does the store support {@code FileSystem.concat(Path, Path[])}?
|
||||
* Value: {@value}.
|
||||
*/
|
||||
public static final String FS_CONCAT = "fs.capability.paths.concat";
|
||||
|
||||
/**
|
||||
* Does the store support {@code FileSystem.listCorruptFileBlocks(Path)} ()}?
|
||||
* Value: {@value}.
|
||||
*/
|
||||
public static final String FS_LIST_CORRUPT_FILE_BLOCKS =
|
||||
"fs.capability.paths.list-corrupt-file-blocks";
|
||||
|
||||
/**
|
||||
* Does the store support
|
||||
* {@code FileSystem.createPathHandle(FileStatus, Options.HandleOpt...)}
|
||||
* and related methods?
|
||||
* Value: {@value}.
|
||||
*/
|
||||
public static final String FS_PATHHANDLES = "fs.capability.paths.pathhandles";
|
||||
|
||||
/**
|
||||
* Does the store support {@code FileSystem.setPermission(Path, FsPermission)}
|
||||
* and related methods?
|
||||
* Value: {@value}.
|
||||
*/
|
||||
public static final String FS_PERMISSIONS = "fs.capability.paths.permissions";
|
||||
|
||||
/**
|
||||
* Does this filesystem connector only support filesystem read operations?
|
||||
* For example, the {@code HttpFileSystem} is always read-only.
|
||||
* This is different from "is the specific instance and path read only?",
|
||||
* which must be determined by checking permissions (where supported), or
|
||||
* attempting write operations under a path.
|
||||
* Value: {@value}.
|
||||
*/
|
||||
public static final String FS_READ_ONLY_CONNECTOR =
|
||||
"fs.capability.paths.read-only-connector";
|
||||
|
||||
/**
|
||||
* Does the store support snapshots through
|
||||
* {@code FileSystem.createSnapshot(Path)} and related methods??
|
||||
* Value: {@value}.
|
||||
*/
|
||||
public static final String FS_SNAPSHOTS = "fs.capability.paths.snapshots";
|
||||
|
||||
/**
|
||||
* Does the store support {@code FileSystem.setStoragePolicy(Path, String)}
|
||||
* and related methods?
|
||||
* Value: {@value}.
|
||||
*/
|
||||
public static final String FS_STORAGEPOLICY =
|
||||
"fs.capability.paths.storagepolicy";
|
||||
|
||||
/**
|
||||
* Does the store support symlinks through
|
||||
* {@code FileSystem.createSymlink(Path, Path, boolean)} and related methods?
|
||||
* Value: {@value}.
|
||||
*/
|
||||
public static final String FS_SYMLINKS =
|
||||
"fs.capability.paths.symlinks";
|
||||
|
||||
/**
|
||||
* Does the store support {@code FileSystem#truncate(Path, long)} ?
|
||||
* Value: {@value}.
|
||||
*/
|
||||
public static final String FS_TRUNCATE =
|
||||
"fs.capability.paths.truncate";
|
||||
|
||||
/**
|
||||
* Does the store support XAttributes through
|
||||
* {@code FileSystem#.setXAttr()} and related methods?
|
||||
* Value: {@value}.
|
||||
*/
|
||||
public static final String FS_XATTRS = "fs.capability.paths.xattrs";
|
||||
|
||||
}
|
|
@ -281,4 +281,11 @@ public abstract class DelegateToFileSystem extends AbstractFileSystem {
|
|||
int bufferSize) throws IOException {
|
||||
return fsImpl.openFileWithOptions(path, mandatoryKeys, options, bufferSize);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean hasPathCapability(final Path path,
|
||||
final String capability)
|
||||
throws IOException {
|
||||
return fsImpl.hasPathCapability(path, capability);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -46,6 +46,8 @@ import org.apache.hadoop.conf.Configuration;
|
|||
import org.apache.hadoop.fs.FileSystem.Statistics;
|
||||
import org.apache.hadoop.fs.Options.CreateOpts;
|
||||
import org.apache.hadoop.fs.impl.FutureDataInputStreamBuilderImpl;
|
||||
import org.apache.hadoop.fs.impl.FsLinkResolution;
|
||||
import org.apache.hadoop.fs.impl.PathCapabilitiesSupport;
|
||||
import org.apache.hadoop.fs.permission.AclEntry;
|
||||
import org.apache.hadoop.fs.permission.AclStatus;
|
||||
import org.apache.hadoop.fs.permission.FsAction;
|
||||
|
@ -68,6 +70,8 @@ import org.apache.htrace.core.Tracer;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
|
||||
|
||||
/**
|
||||
* The FileContext class provides an interface for users of the Hadoop
|
||||
* file system. It exposes a number of file system operations, e.g. create,
|
||||
|
@ -171,7 +175,7 @@ import org.slf4j.LoggerFactory;
|
|||
|
||||
@InterfaceAudience.Public
|
||||
@InterfaceStability.Stable
|
||||
public class FileContext {
|
||||
public class FileContext implements PathCapabilities {
|
||||
|
||||
public static final Logger LOG = LoggerFactory.getLogger(FileContext.class);
|
||||
/**
|
||||
|
@ -2934,4 +2938,21 @@ public class FileContext {
|
|||
}.resolve(FileContext.this, absF);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the path capabilities of the bonded {@code AbstractFileSystem}.
|
||||
* @param path path to query the capability of.
|
||||
* @param capability string to query the stream support for.
|
||||
* @return true iff the capability is supported under that FS.
|
||||
* @throws IOException path resolution or other IO failure
|
||||
* @throws IllegalArgumentException invalid arguments
|
||||
*/
|
||||
public boolean hasPathCapability(Path path, String capability)
|
||||
throws IOException {
|
||||
validatePathCapabilityArgs(path, capability);
|
||||
return FsLinkResolution.resolve(this,
|
||||
fixRelativePart(path),
|
||||
(fs, p) -> fs.hasPathCapability(p, capability));
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -88,6 +88,7 @@ import org.slf4j.LoggerFactory;
|
|||
|
||||
import static com.google.common.base.Preconditions.checkArgument;
|
||||
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.*;
|
||||
import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
|
||||
|
||||
/****************************************************************
|
||||
* An abstract base class for a fairly generic filesystem. It
|
||||
|
@ -134,7 +135,7 @@ import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.*;
|
|||
@InterfaceAudience.Public
|
||||
@InterfaceStability.Stable
|
||||
public abstract class FileSystem extends Configured
|
||||
implements Closeable, DelegationTokenIssuer {
|
||||
implements Closeable, DelegationTokenIssuer, PathCapabilities {
|
||||
public static final String FS_DEFAULT_NAME_KEY =
|
||||
CommonConfigurationKeys.FS_DEFAULT_NAME_KEY;
|
||||
public static final String DEFAULT_FS =
|
||||
|
@ -720,6 +721,7 @@ public abstract class FileSystem extends Configured
|
|||
*
|
||||
*/
|
||||
protected void checkPath(Path path) {
|
||||
Preconditions.checkArgument(path != null, "null path");
|
||||
URI uri = path.toUri();
|
||||
String thatScheme = uri.getScheme();
|
||||
if (thatScheme == null) // fs is relative
|
||||
|
@ -3259,6 +3261,25 @@ public abstract class FileSystem extends Configured
|
|||
return ret;
|
||||
}
|
||||
|
||||
/**
|
||||
* The base FileSystem implementation generally has no knowledge
|
||||
* of the capabilities of actual implementations.
|
||||
* Unless it has a way to explicitly determine the capabilities,
|
||||
* this method returns false.
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
public boolean hasPathCapability(final Path path, final String capability)
|
||||
throws IOException {
|
||||
switch (validatePathCapabilityArgs(makeQualified(path), capability)) {
|
||||
case CommonPathCapabilities.FS_SYMLINKS:
|
||||
// delegate to the existing supportsSymlinks() call.
|
||||
return supportsSymlinks() && areSymlinksEnabled();
|
||||
default:
|
||||
// the feature is not implemented.
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
// making it volatile to be able to do a double checked locking
|
||||
private volatile static boolean FILE_SYSTEMS_LOADED = false;
|
||||
|
||||
|
|
|
@ -729,4 +729,11 @@ public class FilterFileSystem extends FileSystem {
|
|||
return fs.openFileWithOptions(pathHandle, mandatoryKeys, options,
|
||||
bufferSize);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean hasPathCapability(final Path path, final String capability)
|
||||
throws IOException {
|
||||
return fs.hasPathCapability(path, capability);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -446,4 +446,9 @@ public abstract class FilterFs extends AbstractFileSystem {
|
|||
return myFs.openFileWithOptions(path, mandatoryKeys, options, bufferSize);
|
||||
}
|
||||
|
||||
public boolean hasPathCapability(final Path path,
|
||||
final String capability)
|
||||
throws IOException {
|
||||
return myFs.hasPathCapability(path, capability);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -36,6 +36,8 @@ import java.net.URISyntaxException;
|
|||
import java.net.URLDecoder;
|
||||
import java.util.*;
|
||||
|
||||
import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
|
||||
|
||||
/**
|
||||
* This is an implementation of the Hadoop Archive
|
||||
* Filesystem. This archive Filesystem has index files
|
||||
|
@ -899,7 +901,22 @@ public class HarFileSystem extends FileSystem {
|
|||
throws IOException {
|
||||
throw new IOException("Har: setPermission not allowed");
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Declare that this filesystem connector is always read only.
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
@Override
|
||||
public boolean hasPathCapability(final Path path, final String capability)
|
||||
throws IOException {
|
||||
switch (validatePathCapabilityArgs(path, capability)) {
|
||||
case CommonPathCapabilities.FS_READ_ONLY_CONNECTOR:
|
||||
return true;
|
||||
default:
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Hadoop archives input stream. This input stream fakes EOF
|
||||
* since archive files are part of bigger part files.
|
||||
|
|
|
@ -0,0 +1,61 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* The Path counterpoint to {@link StreamCapabilities}; a query to see if,
|
||||
* a FileSystem/FileContext instance has a specific capability under the given
|
||||
* path.
|
||||
* Other classes may also implement the interface, as desired.
|
||||
*
|
||||
* See {@link CommonPathCapabilities} for the well-known capabilities.
|
||||
*/
|
||||
public interface PathCapabilities {
|
||||
|
||||
/**
|
||||
* Probe for a specific capability under the given path.
|
||||
* If the function returns {@code true}, this instance is explicitly
|
||||
* declaring that the capability is available.
|
||||
* If the function returns {@code false}, it can mean one of:
|
||||
* <ul>
|
||||
* <li>The capability is not known.</li>
|
||||
* <li>The capability is known but it is not supported.</li>
|
||||
* <li>The capability is known but the filesystem does not know if it
|
||||
* is supported under the supplied path.</li>
|
||||
* </ul>
|
||||
* The core guarantee which a caller can rely on is: if the predicate
|
||||
* returns true, then the specific operation/behavior can be expected to be
|
||||
* supported. However a specific call may be rejected for permission reasons,
|
||||
* the actual file/directory not being present, or some other failure during
|
||||
* the attempted execution of the operation.
|
||||
* <p>
|
||||
* Implementors: {@link org.apache.hadoop.fs.impl.PathCapabilitiesSupport}
|
||||
* can be used to help implement this method.
|
||||
* @param path path to query the capability of.
|
||||
* @param capability non-null, non-empty string to query the path for support.
|
||||
* @return true if the capability is supported under that part of the FS.
|
||||
* @throws IOException this should not be raised, except on problems
|
||||
* resolving paths or relaying the call.
|
||||
* @throws IllegalArgumentException invalid arguments
|
||||
*/
|
||||
boolean hasPathCapability(Path path, String capability)
|
||||
throws IOException;
|
||||
}
|
|
@ -53,6 +53,8 @@ import org.apache.hadoop.util.Progressable;
|
|||
import org.apache.hadoop.util.Shell;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
|
||||
import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
|
||||
|
||||
/****************************************************************
|
||||
* Implement the FileSystem API for the raw local filesystem.
|
||||
*
|
||||
|
@ -1060,4 +1062,21 @@ public class RawLocalFileSystem extends FileSystem {
|
|||
// return an unqualified symlink target
|
||||
return fi.getSymlink();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean hasPathCapability(final Path path, final String capability)
|
||||
throws IOException {
|
||||
switch (validatePathCapabilityArgs(makeQualified(path), capability)) {
|
||||
case CommonPathCapabilities.FS_APPEND:
|
||||
case CommonPathCapabilities.FS_CONCAT:
|
||||
case CommonPathCapabilities.FS_PATHHANDLES:
|
||||
case CommonPathCapabilities.FS_PERMISSIONS:
|
||||
case CommonPathCapabilities.FS_TRUNCATE:
|
||||
return true;
|
||||
case CommonPathCapabilities.FS_SYMLINKS:
|
||||
return FileSystem.areSymlinksEnabled();
|
||||
default:
|
||||
return super.hasPathCapability(path, capability);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -20,6 +20,7 @@ package org.apache.hadoop.fs.http;
|
|||
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.CommonPathCapabilities;
|
||||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
|
@ -36,6 +37,8 @@ import java.io.InputStream;
|
|||
import java.net.URI;
|
||||
import java.net.URLConnection;
|
||||
|
||||
import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
|
||||
|
||||
abstract class AbstractHttpFileSystem extends FileSystem {
|
||||
private static final long DEFAULT_BLOCK_SIZE = 4096;
|
||||
private static final Path WORKING_DIR = new Path("/");
|
||||
|
@ -111,6 +114,21 @@ abstract class AbstractHttpFileSystem extends FileSystem {
|
|||
return new FileStatus(-1, false, 1, DEFAULT_BLOCK_SIZE, 0, path);
|
||||
}
|
||||
|
||||
/**
|
||||
* Declare that this filesystem connector is always read only.
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
@Override
|
||||
public boolean hasPathCapability(final Path path, final String capability)
|
||||
throws IOException {
|
||||
switch (validatePathCapabilityArgs(path, capability)) {
|
||||
case CommonPathCapabilities.FS_READ_ONLY_CONNECTOR:
|
||||
return true;
|
||||
default:
|
||||
return super.hasPathCapability(path, capability);
|
||||
}
|
||||
}
|
||||
|
||||
private static class HttpDataInputStream extends FilterInputStream
|
||||
implements Seekable, PositionedReadable {
|
||||
|
||||
|
|
|
@ -0,0 +1,98 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.impl;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.fs.AbstractFileSystem;
|
||||
import org.apache.hadoop.fs.FSLinkResolver;
|
||||
import org.apache.hadoop.fs.FileContext;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.UnresolvedLinkException;
|
||||
|
||||
/**
|
||||
* Class to allow Lambda expressions to be used in {@link FileContext}
|
||||
* link resolution.
|
||||
* @param <T> type of the returned value.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Evolving
|
||||
public class FsLinkResolution<T> extends FSLinkResolver<T> {
|
||||
|
||||
/**
|
||||
* The function to invoke in the {@link #next(AbstractFileSystem, Path)} call.
|
||||
*/
|
||||
private final FsLinkResolutionFunction<T> fn;
|
||||
|
||||
/**
|
||||
* Construct an instance with the given function.
|
||||
* @param fn function to invoke.
|
||||
*/
|
||||
public FsLinkResolution(final FsLinkResolutionFunction<T> fn) {
|
||||
this.fn = Preconditions.checkNotNull(fn);
|
||||
}
|
||||
|
||||
@Override
|
||||
public T next(final AbstractFileSystem fs, final Path p)
|
||||
throws UnresolvedLinkException, IOException {
|
||||
return fn.apply(fs, p);
|
||||
}
|
||||
|
||||
/**
|
||||
* The signature of the function to invoke.
|
||||
* @param <T> type resolved to
|
||||
*/
|
||||
@FunctionalInterface
|
||||
public interface FsLinkResolutionFunction<T> {
|
||||
|
||||
/**
|
||||
*
|
||||
* @param fs filesystem to resolve against.
|
||||
* @param path path to resolve
|
||||
* @return a result of type T
|
||||
* @throws UnresolvedLinkException link resolution failure
|
||||
* @throws IOException other IO failure.
|
||||
*/
|
||||
T apply(final AbstractFileSystem fs, final Path path)
|
||||
throws IOException, UnresolvedLinkException;
|
||||
}
|
||||
|
||||
/**
|
||||
* Apply the given function to the resolved path under the the supplied
|
||||
* FileContext.
|
||||
* @param fileContext file context to resolve under
|
||||
* @param path path to resolve
|
||||
* @param fn function to invoke
|
||||
* @param <T> return type.
|
||||
* @return the return value of the function as revoked against the resolved
|
||||
* path.
|
||||
* @throws UnresolvedLinkException link resolution failure
|
||||
* @throws IOException other IO failure.
|
||||
*/
|
||||
public static <T> T resolve(
|
||||
final FileContext fileContext, final Path path,
|
||||
final FsLinkResolutionFunction<T> fn)
|
||||
throws UnresolvedLinkException, IOException {
|
||||
return new FsLinkResolution<>(fn).resolve(fileContext, path);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,50 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.impl;
|
||||
|
||||
import java.util.Locale;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.PathCapabilities;
|
||||
|
||||
import static com.google.common.base.Preconditions.checkArgument;
|
||||
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Evolving
|
||||
public class PathCapabilitiesSupport {
|
||||
|
||||
/**
|
||||
* Validate the arguments to
|
||||
* {@link PathCapabilities#hasPathCapability(Path, String)}.
|
||||
* @param path path to query the capability of.
|
||||
* @param capability non-null, non-empty string to query the path for support.
|
||||
* @return the string to use in a switch statement.
|
||||
* @throws IllegalArgumentException if a an argument is invalid.
|
||||
*/
|
||||
public static String validatePathCapabilityArgs(
|
||||
final Path path, final String capability) {
|
||||
checkArgument(path != null, "null path");
|
||||
checkArgument(capability != null, "capability parameter is null");
|
||||
checkArgument(!capability.isEmpty(),
|
||||
"capability parameter is empty string");
|
||||
return capability.toLowerCase(Locale.ENGLISH);
|
||||
}
|
||||
}
|
|
@ -491,4 +491,10 @@ class ChRootedFileSystem extends FilterFileSystem {
|
|||
throws IOException, UnsupportedOperationException {
|
||||
return super.openFile(fullPath(path));
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean hasPathCapability(final Path path, final String capability)
|
||||
throws IOException {
|
||||
return super.hasPathCapability(fullPath(path), capability);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
*/
|
||||
package org.apache.hadoop.fs.viewfs;
|
||||
|
||||
import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
|
||||
import static org.apache.hadoop.fs.viewfs.Constants.PERMISSION_555;
|
||||
import static org.apache.hadoop.fs.viewfs.Constants.CONFIG_VIEWFS_ENABLE_INNER_CACHE;
|
||||
import static org.apache.hadoop.fs.viewfs.Constants.CONFIG_VIEWFS_ENABLE_INNER_CACHE_DEFAULT;
|
||||
|
@ -43,6 +44,7 @@ import org.apache.hadoop.classification.InterfaceStability;
|
|||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.BlockLocation;
|
||||
import org.apache.hadoop.fs.BlockStoragePolicySpi;
|
||||
import org.apache.hadoop.fs.CommonPathCapabilities;
|
||||
import org.apache.hadoop.fs.ContentSummary;
|
||||
import org.apache.hadoop.fs.CreateFlag;
|
||||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
|
@ -1026,6 +1028,36 @@ public class ViewFileSystem extends FileSystem {
|
|||
return res.targetFileSystem.getLinkTarget(res.remainingPath);
|
||||
}
|
||||
|
||||
/**
|
||||
* Reject the concat operation; forward the rest to the viewed FS.
|
||||
* @param path path to query the capability of.
|
||||
* @param capability string to query the stream support for.
|
||||
* @return the capability
|
||||
* @throws IOException if there is no resolved FS, or it raises an IOE.
|
||||
*/
|
||||
@Override
|
||||
public boolean hasPathCapability(Path path, String capability)
|
||||
throws IOException {
|
||||
final Path p = makeQualified(path);
|
||||
switch (validatePathCapabilityArgs(p, capability)) {
|
||||
case CommonPathCapabilities.FS_CONCAT:
|
||||
// concat is not supported, as it may be invoked across filesystems.
|
||||
return false;
|
||||
default:
|
||||
// no break
|
||||
}
|
||||
// otherwise, check capabilities of mounted FS.
|
||||
try {
|
||||
InodeTree.ResolveResult<FileSystem> res
|
||||
= fsState.resolve(getUriPath(p), true);
|
||||
return res.targetFileSystem.hasPathCapability(res.remainingPath,
|
||||
capability);
|
||||
} catch (FileNotFoundException e) {
|
||||
// no mount point, nothing will work.
|
||||
throw new NotInMountpointException(p, "hasPathCapability");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* An instance of this class represents an internal dir of the viewFs
|
||||
* that is internal dir of the mount table.
|
||||
|
|
|
@ -526,7 +526,7 @@ on the filesystem.
|
|||
`getFileStatus(P).getBlockSize()`.
|
||||
1. By inference, it MUST be > 0 for any file of length > 0.
|
||||
|
||||
## State Changing Operations
|
||||
## <a name="state_changing_operations"></a> State Changing Operations
|
||||
|
||||
### `boolean mkdirs(Path p, FsPermission permission)`
|
||||
|
||||
|
@ -1479,7 +1479,7 @@ public interface StreamCapabilities {
|
|||
|
||||
### `boolean hasCapability(capability)`
|
||||
|
||||
Return true if the `OutputStream`, `InputStream`, or other FileSystem class
|
||||
Return true iff the `OutputStream`, `InputStream`, or other FileSystem class
|
||||
has the desired capability.
|
||||
|
||||
The caller can query the capabilities of a stream using a string value.
|
||||
|
@ -1492,3 +1492,4 @@ hsync | HSYNC | Syncable | Flush out the data in client's us
|
|||
in:readahead | READAHEAD | CanSetReadahead | Set the readahead on the input stream.
|
||||
dropbehind | DROPBEHIND | CanSetDropBehind | Drop the cache.
|
||||
in:unbuffer | UNBUFFER | CanUnbuffer | Reduce the buffering on the input stream.
|
||||
|
||||
|
|
|
@ -33,6 +33,7 @@ HDFS as these are commonly expected by Hadoop client applications.
|
|||
1. [Model](model.html)
|
||||
1. [FileSystem class](filesystem.html)
|
||||
1. [FSDataInputStream class](fsdatainputstream.html)
|
||||
1. [PathCapabilities interface](pathcapabilities.html)
|
||||
1. [FSDataOutputStreamBuilder class](fsdataoutputstreambuilder.html)
|
||||
2. [Testing with the Filesystem specification](testing.html)
|
||||
2. [Extending the specification and its tests](extending.html)
|
||||
|
|
|
@ -0,0 +1,158 @@
|
|||
<!---
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License. See accompanying LICENSE file.
|
||||
-->
|
||||
|
||||
# <a name="PathCapabilities"></a> interface `PathCapabilities`
|
||||
|
||||
The `PathCapabilities` interface provides a way to programmatically query the
|
||||
operations offered under a given path by an instance of `FileSystem`, `FileContext`
|
||||
or other implementing class.
|
||||
|
||||
```java
|
||||
public interface PathCapabilities {
|
||||
boolean hasPathCapability(Path path, String capability)
|
||||
throws IOException;
|
||||
}
|
||||
```
|
||||
|
||||
There are a number of goals here:
|
||||
|
||||
1. Allow callers to probe for optional filesystem operations without actually
|
||||
having to invoke them.
|
||||
1. Allow filesystems with their own optional per-instance features to declare
|
||||
whether or not they are active for the specific instance.
|
||||
1. Allow for fileystem connectors which work with object stores to expose the
|
||||
fundamental difference in semantics of these stores (e.g: files not visible
|
||||
until closed, file rename being `O(data)`), directory rename being non-atomic,
|
||||
etc.
|
||||
|
||||
### Available Capabilities
|
||||
|
||||
Capabilities are defined as strings and split into "Common Capabilites"
|
||||
and non-standard ones for a specific store.
|
||||
|
||||
The common capabilities are all defined under the prefix `fs.capability.`
|
||||
|
||||
Consult the javadocs for `org.apache.hadoop.fs.CommonPathCapabilities` for these.
|
||||
|
||||
|
||||
Individual filesystems MAY offer their own set of capabilities which
|
||||
can be probed for. These MUST begin with `fs.` + the filesystem scheme +
|
||||
`.capability`. For example `fs.s3a.capability.select.sql`;
|
||||
|
||||
### `boolean hasPathCapability(path, capability)`
|
||||
|
||||
Probe for the instance offering a specific capability under the
|
||||
given path.
|
||||
|
||||
#### Postconditions
|
||||
|
||||
```python
|
||||
if fs_supports_the_feature(path, capability):
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
```
|
||||
|
||||
Return: `True`, iff the specific capability is available.
|
||||
|
||||
A filesystem instance *MUST NOT* return `True` for any capability unless it is
|
||||
known to be supported by that specific instance. As a result, if a caller
|
||||
probes for a capability then it can assume that the specific feature/semantics
|
||||
are available.
|
||||
|
||||
If the probe returns `False` then it can mean one of:
|
||||
|
||||
1. The capability is unknown.
|
||||
1. The capability is known, and known to be unavailable on this instance.
|
||||
1. The capability is known but this local class does not know if it is supported
|
||||
under the supplied path.
|
||||
|
||||
This predicate is intended to be low cost. If it requires remote calls other
|
||||
than path/link resolution, it SHOULD conclude that the availability
|
||||
of the feature is unknown and return `False`.
|
||||
|
||||
The predicate MUST also be side-effect free.
|
||||
|
||||
*Validity of paths*
|
||||
There is no requirement that the existence of the path must be checked;
|
||||
the parameter exists so that any filesystem which relays operations to other
|
||||
filesystems (e.g `viewfs`) can resolve and relay it to the nested filesystem.
|
||||
Consider the call to be *relatively* lightweight.
|
||||
|
||||
Because of this, it may be that while the filesystem declares that
|
||||
it supports a capability under a path, the actual invocation of the operation
|
||||
may fail for other reasons.
|
||||
|
||||
As an example, while a filesystem may support `append()` under a path,
|
||||
if invoked on a directory, the call may fail.
|
||||
|
||||
That is for a path `root = new Path("/")`: the capabilities call may succeed
|
||||
|
||||
```java
|
||||
fs.hasCapabilities(root, "fs.capability.append") == true
|
||||
```
|
||||
|
||||
But a subsequent call to the operation on that specific path may fail,
|
||||
because the root path is a directory:
|
||||
|
||||
```java
|
||||
fs.append(root)
|
||||
```
|
||||
|
||||
|
||||
Similarly, there is no checking that the caller has the permission to
|
||||
perform a specific operation: just because a feature is available on that
|
||||
path does not mean that the caller can execute the operation.
|
||||
|
||||
The `hasCapabilities(path, capability)` probe is therefore declaring that
|
||||
the operation will not be rejected as unsupported, not that a specific invocation
|
||||
will be permitted on that path by the caller.
|
||||
|
||||
*Duration of availability*
|
||||
|
||||
As the state of a remote store changes,so may path capabilities. This
|
||||
may be due to changes in the local state of the fileystem (e.g. symbolic links
|
||||
or mount points changing), or changes in its functionality (e.g. a feature
|
||||
becoming availaible/unavailable due to operational changes, system upgrades, etc.)
|
||||
|
||||
*Capabilities which must be invoked to determine availablity*
|
||||
|
||||
Some operations may be known by the client connector, and believed to be available,
|
||||
but may actually fail when invoked due to the state and permissons of the remote
|
||||
store —state which is cannot be determined except by attempting
|
||||
side-effecting operations.
|
||||
|
||||
A key example of this is symbolic links and the local filesystem.
|
||||
The filesystem declares that it supports this unless symbolic links are explicitly
|
||||
disabled —when invoked they may actually fail.
|
||||
|
||||
### Implementors Notes
|
||||
|
||||
Implementors *MUST NOT* return `true` for any capability which is not guaranteed
|
||||
to be supported. To return `true` indicates that the implementation/deployment
|
||||
of the filesystem does, to the best of the knowledge of the filesystem client,
|
||||
offer the desired operations *and semantics* queried for.
|
||||
|
||||
For performance reasons, implementations *SHOULD NOT* check the path for
|
||||
existence, unless it needs to resolve symbolic links in parts of the path
|
||||
to determine whether a feature is present. This is required of `FileContext`
|
||||
and `viewfs`.
|
||||
|
||||
Individual filesystems *MUST NOT* unilaterally define new `fs.capability`-prefixed
|
||||
capabilities. Instead they *MUST* do one of the following:
|
||||
|
||||
* Define and stabilize new cross-filesystem capability flags (preferred),
|
||||
and so formally add a new `fs.capability` value.
|
||||
* Use the scheme of the filesystem to as a prefix for their own options,
|
||||
e.g `fs.hdfs.`
|
|
@ -18,12 +18,15 @@
|
|||
|
||||
package org.apache.hadoop.fs.contract;
|
||||
|
||||
import org.apache.hadoop.fs.CommonPathCapabilities;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
|
||||
import org.junit.Test;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertHasPathCapabilities;
|
||||
import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
|
||||
import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
|
||||
import static org.apache.hadoop.fs.contract.ContractTestUtils.touch;
|
||||
|
@ -155,4 +158,11 @@ public abstract class AbstractContractAppendTest extends AbstractFSContractTestB
|
|||
dataset.length);
|
||||
ContractTestUtils.compareByteArrays(dataset, bytes, dataset.length);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFileSystemDeclaresCapability() throws Throwable {
|
||||
assertHasPathCapabilities(getFileSystem(), target,
|
||||
CommonPathCapabilities.FS_APPEND);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -24,7 +24,9 @@ import org.junit.Test;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import static org.apache.hadoop.fs.CommonPathCapabilities.FS_CONCAT;
|
||||
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertFileHasLength;
|
||||
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertHasPathCapabilities;
|
||||
import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
|
||||
import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
|
||||
import static org.apache.hadoop.fs.contract.ContractTestUtils.touch;
|
||||
|
@ -93,4 +95,9 @@ public abstract class AbstractContractConcatTest extends AbstractFSContractTestB
|
|||
() -> getFileSystem().concat(target, new Path[]{target})));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFileSystemDeclaresCapability() throws Throwable {
|
||||
assertHasPathCapabilities(getFileSystem(), target, FS_CONCAT);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -25,6 +25,7 @@ import org.apache.hadoop.fs.FileStatus;
|
|||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.LocatedFileStatus;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.PathCapabilities;
|
||||
import org.apache.hadoop.fs.RemoteIterator;
|
||||
import org.apache.hadoop.fs.StreamCapabilities;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
|
@ -1491,22 +1492,61 @@ public class ContractTestUtils extends Assert {
|
|||
assertTrue("Stream should be instanceof StreamCapabilities",
|
||||
stream instanceof StreamCapabilities);
|
||||
|
||||
if (shouldHaveCapabilities!=null) {
|
||||
StreamCapabilities source = (StreamCapabilities) stream;
|
||||
if (shouldHaveCapabilities != null) {
|
||||
for (String shouldHaveCapability : shouldHaveCapabilities) {
|
||||
assertTrue("Should have capability: " + shouldHaveCapability,
|
||||
((StreamCapabilities) stream).hasCapability(shouldHaveCapability));
|
||||
source.hasCapability(shouldHaveCapability));
|
||||
}
|
||||
}
|
||||
|
||||
if (shouldNotHaveCapabilities!=null) {
|
||||
if (shouldNotHaveCapabilities != null) {
|
||||
for (String shouldNotHaveCapability : shouldNotHaveCapabilities) {
|
||||
assertFalse("Should not have capability: " + shouldNotHaveCapability,
|
||||
((StreamCapabilities) stream)
|
||||
.hasCapability(shouldNotHaveCapability));
|
||||
source.hasCapability(shouldNotHaveCapability));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Custom assert to test {@link PathCapabilities}.
|
||||
*
|
||||
* @param source source (FS, FC, etc)
|
||||
* @param path path to check
|
||||
* @param capabilities The array of unexpected capabilities
|
||||
*/
|
||||
public static void assertHasPathCapabilities(
|
||||
final PathCapabilities source,
|
||||
final Path path,
|
||||
final String...capabilities) throws IOException {
|
||||
|
||||
for (String shouldHaveCapability: capabilities) {
|
||||
assertTrue("Should have capability: " + shouldHaveCapability
|
||||
+ " under " + path,
|
||||
source.hasPathCapability(path, shouldHaveCapability));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Custom assert to test that the named {@link PathCapabilities}
|
||||
* are not supported.
|
||||
*
|
||||
* @param source source (FS, FC, etc)
|
||||
* @param path path to check
|
||||
* @param capabilities The array of unexpected capabilities
|
||||
*/
|
||||
public static void assertLacksPathCapabilities(
|
||||
final PathCapabilities source,
|
||||
final Path path,
|
||||
final String...capabilities) throws IOException {
|
||||
|
||||
for (String shouldHaveCapability: capabilities) {
|
||||
assertFalse("Path must not support capability: " + shouldHaveCapability
|
||||
+ " under " + path,
|
||||
source.hasPathCapability(path, shouldHaveCapability));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Function which calls {@code InputStream.read()} and
|
||||
* downgrades an IOE to a runtime exception.
|
||||
|
|
|
@ -31,6 +31,7 @@ import org.apache.hadoop.crypto.key.KeyProviderTokenIssuer;
|
|||
import org.apache.hadoop.fs.BlockLocation;
|
||||
import org.apache.hadoop.fs.BlockStoragePolicySpi;
|
||||
import org.apache.hadoop.fs.CacheFlag;
|
||||
import org.apache.hadoop.fs.CommonPathCapabilities;
|
||||
import org.apache.hadoop.fs.ContentSummary;
|
||||
import org.apache.hadoop.fs.CreateFlag;
|
||||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
|
@ -67,6 +68,7 @@ import org.apache.hadoop.fs.permission.AclStatus;
|
|||
import org.apache.hadoop.fs.permission.FsAction;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.hdfs.DFSOpsCountStatistics.OpType;
|
||||
import org.apache.hadoop.hdfs.client.DfsPathCapabilities;
|
||||
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
||||
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
|
||||
import org.apache.hadoop.hdfs.client.impl.CorruptFileBlockIterator;
|
||||
|
@ -120,6 +122,8 @@ import java.util.List;
|
|||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
|
||||
import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
|
||||
|
||||
/****************************************************************
|
||||
* Implementation of the abstract FileSystem for the DFS system.
|
||||
* This object is the way end-user code interacts with a Hadoop
|
||||
|
@ -3404,4 +3408,22 @@ public class DistributedFileSystem extends FileSystem
|
|||
public HdfsDataOutputStreamBuilder appendFile(Path path) {
|
||||
return new HdfsDataOutputStreamBuilder(this, path).append();
|
||||
}
|
||||
|
||||
/**
|
||||
* HDFS client capabilities.
|
||||
* Uses {@link DfsPathCapabilities} to keep {@code WebHdfsFileSystem} in sync.
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
@Override
|
||||
public boolean hasPathCapability(final Path path, final String capability)
|
||||
throws IOException {
|
||||
// qualify the path to make sure that it refers to the current FS.
|
||||
final Path p = makeQualified(path);
|
||||
Optional<Boolean> cap = DfsPathCapabilities.hasPathCapability(p,
|
||||
capability);
|
||||
if (cap.isPresent()) {
|
||||
return cap.get();
|
||||
}
|
||||
return super.hasPathCapability(p, capability);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,62 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hdfs.client;
|
||||
|
||||
import java.util.Optional;
|
||||
|
||||
import org.apache.hadoop.fs.CommonPathCapabilities;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
|
||||
import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
|
||||
|
||||
public final class DfsPathCapabilities {
|
||||
|
||||
private DfsPathCapabilities() {
|
||||
}
|
||||
|
||||
/**
|
||||
* Common implementation of {@code hasPathCapability} for DFS and webhdfs.
|
||||
* @param path path to check
|
||||
* @param capability capability
|
||||
* @return either a value to return or, if empty, a cue for the FS to
|
||||
* pass up to its superclass.
|
||||
*/
|
||||
public static Optional<Boolean> hasPathCapability(final Path path,
|
||||
final String capability) {
|
||||
switch (validatePathCapabilityArgs(path, capability)) {
|
||||
|
||||
case CommonPathCapabilities.FS_ACLS:
|
||||
case CommonPathCapabilities.FS_APPEND:
|
||||
case CommonPathCapabilities.FS_CHECKSUMS:
|
||||
case CommonPathCapabilities.FS_CONCAT:
|
||||
case CommonPathCapabilities.FS_LIST_CORRUPT_FILE_BLOCKS:
|
||||
case CommonPathCapabilities.FS_PATHHANDLES:
|
||||
case CommonPathCapabilities.FS_PERMISSIONS:
|
||||
case CommonPathCapabilities.FS_SNAPSHOTS:
|
||||
case CommonPathCapabilities.FS_STORAGEPOLICY:
|
||||
case CommonPathCapabilities.FS_XATTRS:
|
||||
return Optional.of(true);
|
||||
case CommonPathCapabilities.FS_SYMLINKS:
|
||||
return Optional.of(FileSystem.areSymlinksEnabled());
|
||||
default:
|
||||
return Optional.empty();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -46,7 +46,9 @@ import java.util.Collection;
|
|||
import java.util.EnumSet;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.StringTokenizer;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
@ -61,6 +63,7 @@ import org.apache.hadoop.crypto.key.KeyProvider;
|
|||
import org.apache.hadoop.crypto.key.KeyProviderTokenIssuer;
|
||||
import org.apache.hadoop.fs.BlockLocation;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||
import org.apache.hadoop.fs.CommonPathCapabilities;
|
||||
import org.apache.hadoop.fs.ContentSummary;
|
||||
import org.apache.hadoop.fs.CreateFlag;
|
||||
import org.apache.hadoop.fs.DelegationTokenRenewer;
|
||||
|
@ -74,6 +77,7 @@ import org.apache.hadoop.fs.FsServerDefaults;
|
|||
import org.apache.hadoop.fs.GlobalStorageStatistics;
|
||||
import org.apache.hadoop.fs.GlobalStorageStatistics.StorageStatisticsProvider;
|
||||
import org.apache.hadoop.fs.QuotaUsage;
|
||||
import org.apache.hadoop.fs.PathCapabilities;
|
||||
import org.apache.hadoop.fs.StorageStatistics;
|
||||
import org.apache.hadoop.fs.StorageType;
|
||||
import org.apache.hadoop.fs.permission.FsCreateModes;
|
||||
|
@ -91,6 +95,7 @@ import org.apache.hadoop.fs.permission.FsPermission;
|
|||
import org.apache.hadoop.hdfs.DFSUtilClient;
|
||||
import org.apache.hadoop.hdfs.HAUtilClient;
|
||||
import org.apache.hadoop.hdfs.HdfsKMSUtil;
|
||||
import org.apache.hadoop.hdfs.client.DfsPathCapabilities;
|
||||
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
||||
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
|
||||
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
|
||||
|
@ -132,6 +137,8 @@ import com.google.common.base.Charsets;
|
|||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.collect.Lists;
|
||||
|
||||
import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
|
||||
|
||||
/** A FileSystem for HDFS over the web. */
|
||||
public class WebHdfsFileSystem extends FileSystem
|
||||
implements DelegationTokenRenewer.Renewable,
|
||||
|
@ -1125,6 +1132,11 @@ public class WebHdfsFileSystem extends FileSystem
|
|||
).run();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean supportsSymlinks() {
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a symlink pointing to the destination path.
|
||||
*/
|
||||
|
@ -2079,6 +2091,24 @@ public class WebHdfsFileSystem extends FileSystem
|
|||
testProvider = kp;
|
||||
}
|
||||
|
||||
/**
|
||||
* HDFS client capabilities.
|
||||
* Uses {@link DfsPathCapabilities} to keep in sync with HDFS.
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
@Override
|
||||
public boolean hasPathCapability(final Path path, final String capability)
|
||||
throws IOException {
|
||||
// qualify the path to make sure that it refers to the current FS.
|
||||
final Path p = makeQualified(path);
|
||||
Optional<Boolean> cap = DfsPathCapabilities.hasPathCapability(p,
|
||||
capability);
|
||||
if (cap.isPresent()) {
|
||||
return cap.get();
|
||||
}
|
||||
return super.hasPathCapability(p, capability);
|
||||
}
|
||||
|
||||
/**
|
||||
* This class is used for opening, reading, and seeking files while using the
|
||||
* WebHdfsFileSystem. This class will invoke the retry policy when performing
|
||||
|
|
|
@ -26,6 +26,7 @@ import java.util.List;
|
|||
import com.google.common.base.Charsets;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.CommonPathCapabilities;
|
||||
import org.apache.hadoop.fs.ContentSummary;
|
||||
import org.apache.hadoop.fs.DelegationTokenRenewer;
|
||||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
|
@ -85,8 +86,11 @@ import java.net.URL;
|
|||
import java.security.PrivilegedExceptionAction;
|
||||
import java.text.MessageFormat;
|
||||
import java.util.HashMap;
|
||||
import java.util.Locale;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
|
||||
|
||||
/**
|
||||
* HttpFSServer implementation of the FileSystemAccess FileSystem.
|
||||
* <p>
|
||||
|
@ -1561,4 +1565,30 @@ public class HttpFSFileSystem extends FileSystem
|
|||
return JsonUtilClient.toSnapshottableDirectoryList(json);
|
||||
}
|
||||
|
||||
/**
|
||||
* This filesystem's capabilities must be in sync with that of
|
||||
* {@code DistributedFileSystem.hasPathCapability()} except
|
||||
* where the feature is not exposed (e.g. symlinks).
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
@Override
|
||||
public boolean hasPathCapability(final Path path, final String capability)
|
||||
throws IOException {
|
||||
// query the superclass, which triggers argument validation.
|
||||
final Path p = makeQualified(path);
|
||||
switch (validatePathCapabilityArgs(p, capability)) {
|
||||
case CommonPathCapabilities.FS_ACLS:
|
||||
case CommonPathCapabilities.FS_APPEND:
|
||||
case CommonPathCapabilities.FS_CONCAT:
|
||||
case CommonPathCapabilities.FS_PERMISSIONS:
|
||||
case CommonPathCapabilities.FS_SNAPSHOTS:
|
||||
case CommonPathCapabilities.FS_STORAGEPOLICY:
|
||||
case CommonPathCapabilities.FS_XATTRS:
|
||||
return true;
|
||||
case CommonPathCapabilities.FS_SYMLINKS:
|
||||
return false;
|
||||
default:
|
||||
return super.hasPathCapability(p, capability);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -497,17 +497,19 @@ class S3ABlockOutputStream extends OutputStream implements
|
|||
* @param capability string to query the stream support for.
|
||||
* @return true if the capability is supported by this instance.
|
||||
*/
|
||||
@SuppressWarnings("deprecation")
|
||||
@Override
|
||||
public boolean hasCapability(String capability) {
|
||||
switch (capability.toLowerCase(Locale.ENGLISH)) {
|
||||
|
||||
// does the output stream have delayed visibility
|
||||
case CommitConstants.STREAM_CAPABILITY_MAGIC_OUTPUT:
|
||||
case CommitConstants.STREAM_CAPABILITY_MAGIC_OUTPUT_OLD:
|
||||
return !putTracker.outputImmediatelyVisible();
|
||||
|
||||
// The flush/sync options are absolutely not supported
|
||||
case "hflush":
|
||||
case "hsync":
|
||||
case StreamCapabilities.HFLUSH:
|
||||
case StreamCapabilities.HSYNC:
|
||||
return false;
|
||||
|
||||
default:
|
||||
|
|
|
@ -36,7 +36,6 @@ import java.util.Collections;
|
|||
import java.util.Date;
|
||||
import java.util.EnumSet;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
|
@ -91,6 +90,7 @@ import org.apache.commons.lang3.tuple.Triple;
|
|||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.CommonPathCapabilities;
|
||||
import org.apache.hadoop.fs.CreateFlag;
|
||||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
|
@ -152,6 +152,7 @@ import org.apache.hadoop.util.ReflectionUtils;
|
|||
import org.apache.hadoop.util.SemaphoredDelegatingExecutor;
|
||||
|
||||
import static org.apache.hadoop.fs.impl.AbstractFSBuilderImpl.rejectUnknownMandatoryKeys;
|
||||
import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
|
||||
import static org.apache.hadoop.fs.s3a.Constants.*;
|
||||
import static org.apache.hadoop.fs.s3a.Invoker.*;
|
||||
import static org.apache.hadoop.fs.s3a.S3AUtils.*;
|
||||
|
@ -4084,17 +4085,15 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|||
return instrumentation.newCommitterStatistics();
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the capabilities of this filesystem instance.
|
||||
* @param capability string to query the stream support for.
|
||||
* @return whether the FS instance has the capability.
|
||||
*/
|
||||
@SuppressWarnings("deprecation")
|
||||
@Override
|
||||
public boolean hasCapability(String capability) {
|
||||
|
||||
switch (capability.toLowerCase(Locale.ENGLISH)) {
|
||||
public boolean hasPathCapability(final Path path, final String capability)
|
||||
throws IOException {
|
||||
final Path p = makeQualified(path);
|
||||
switch (validatePathCapabilityArgs(p, capability)) {
|
||||
|
||||
case CommitConstants.STORE_CAPABILITY_MAGIC_COMMITTER:
|
||||
case CommitConstants.STORE_CAPABILITY_MAGIC_COMMITTER_OLD:
|
||||
// capability depends on FS configuration
|
||||
return isMagicCommitEnabled();
|
||||
|
||||
|
@ -4102,7 +4101,31 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|||
// select is only supported if enabled
|
||||
return selectBinding.isEnabled();
|
||||
|
||||
case CommonPathCapabilities.FS_CHECKSUMS:
|
||||
// capability depends on FS configuration
|
||||
return getConf().getBoolean(ETAG_CHECKSUM_ENABLED,
|
||||
ETAG_CHECKSUM_ENABLED_DEFAULT);
|
||||
|
||||
default:
|
||||
return super.hasPathCapability(p, capability);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the capabilities of this filesystem instance.
|
||||
*
|
||||
* This has been supplanted by {@link #hasPathCapability(Path, String)}.
|
||||
* @param capability string to query the stream support for.
|
||||
* @return whether the FS instance has the capability.
|
||||
*/
|
||||
@Deprecated
|
||||
@Override
|
||||
public boolean hasCapability(String capability) {
|
||||
try {
|
||||
return hasPathCapability(workingDir, capability);
|
||||
} catch (IOException ex) {
|
||||
// should never happen, so log and downgrade.
|
||||
LOG.debug("Ignoring exception on hasCapability({}})", capability, ex);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -78,14 +78,32 @@ public final class CommitConstants {
|
|||
* Value: {@value}.
|
||||
*/
|
||||
public static final String STREAM_CAPABILITY_MAGIC_OUTPUT
|
||||
= "fs.s3a.capability.magic.output.stream";
|
||||
|
||||
/**
|
||||
* Flag to indicate that a store supports magic committers.
|
||||
* returned in {@code PathCapabilities}
|
||||
* Value: {@value}.
|
||||
*/
|
||||
public static final String STORE_CAPABILITY_MAGIC_COMMITTER
|
||||
= "fs.s3a.capability.magic.committer";
|
||||
|
||||
/**
|
||||
* Flag to indicate whether a stream is a magic output stream;
|
||||
* returned in {@code StreamCapabilities}
|
||||
* Value: {@value}.
|
||||
*/
|
||||
@Deprecated
|
||||
public static final String STREAM_CAPABILITY_MAGIC_OUTPUT_OLD
|
||||
= "s3a:magic.output.stream";
|
||||
|
||||
/**
|
||||
* Flag to indicate that a store supports magic committers.
|
||||
* returned in {@code StreamCapabilities}
|
||||
* returned in {@code PathCapabilities}
|
||||
* Value: {@value}.
|
||||
*/
|
||||
public static final String STORE_CAPABILITY_MAGIC_COMMITTER
|
||||
@Deprecated
|
||||
public static final String STORE_CAPABILITY_MAGIC_COMMITTER_OLD
|
||||
= "s3a:magic.committer";
|
||||
|
||||
/**
|
||||
|
|
|
@ -1227,7 +1227,8 @@ public abstract class S3GuardTool extends Configured implements Tool {
|
|||
} else {
|
||||
println(out, "Filesystem %s is not using S3Guard", fsUri);
|
||||
}
|
||||
boolean magic = fs.hasCapability(
|
||||
boolean magic = fs.hasPathCapability(
|
||||
new Path(s3Path),
|
||||
CommitConstants.STORE_CAPABILITY_MAGIC_COMMITTER);
|
||||
println(out, "The \"magic\" committer %s supported",
|
||||
magic ? "is" : "is not");
|
||||
|
|
|
@ -50,7 +50,7 @@ public final class SelectConstants {
|
|||
* Does the FS Support S3 Select?
|
||||
* Value: {@value}.
|
||||
*/
|
||||
public static final String S3_SELECT_CAPABILITY = "s3a:fs.s3a.select.sql";
|
||||
public static final String S3_SELECT_CAPABILITY = "fs.s3a.capability.select.sql";
|
||||
|
||||
/**
|
||||
* Flag: is S3 select enabled?
|
||||
|
|
|
@ -234,7 +234,7 @@ public class SelectTool extends S3GuardTool {
|
|||
}
|
||||
setFilesystem((S3AFileSystem) fs);
|
||||
|
||||
if (!getFilesystem().hasCapability(S3_SELECT_CAPABILITY)) {
|
||||
if (!getFilesystem().hasPathCapability(path, S3_SELECT_CAPABILITY)) {
|
||||
// capability disabled
|
||||
throw new ExitUtil.ExitException(EXIT_SERVICE_UNAVAILABLE,
|
||||
SELECT_IS_DISABLED + " for " + file);
|
||||
|
|
|
@ -28,12 +28,15 @@ import com.amazonaws.services.s3.model.PutObjectRequest;
|
|||
import org.junit.Assume;
|
||||
import org.junit.Test;
|
||||
|
||||
import org.apache.hadoop.fs.CommonPathCapabilities;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FileAlreadyExistsException;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.store.EtagChecksum;
|
||||
import org.apache.hadoop.test.LambdaTestUtils;
|
||||
|
||||
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertHasPathCapabilities;
|
||||
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertLacksPathCapabilities;
|
||||
import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
|
||||
import static org.apache.hadoop.fs.contract.ContractTestUtils.touch;
|
||||
|
||||
|
@ -142,6 +145,8 @@ public class ITestS3AMiscOperations extends AbstractS3ATestBase {
|
|||
Path file1 = touchFile("file1");
|
||||
EtagChecksum checksum1 = fs.getFileChecksum(file1, 0);
|
||||
LOG.info("Checksum for {}: {}", file1, checksum1);
|
||||
assertHasPathCapabilities(fs, file1,
|
||||
CommonPathCapabilities.FS_CHECKSUMS);
|
||||
assertNotNull("Null file 1 checksum", checksum1);
|
||||
assertNotEquals("file 1 checksum", 0, checksum1.getLength());
|
||||
assertEquals("checksums", checksum1,
|
||||
|
@ -159,6 +164,8 @@ public class ITestS3AMiscOperations extends AbstractS3ATestBase {
|
|||
final S3AFileSystem fs = getFileSystem();
|
||||
Path file1 = touchFile("file1");
|
||||
EtagChecksum checksum1 = fs.getFileChecksum(file1, 0);
|
||||
assertLacksPathCapabilities(fs, file1,
|
||||
CommonPathCapabilities.FS_CHECKSUMS);
|
||||
assertNull("Checksums are being generated", checksum1);
|
||||
}
|
||||
|
||||
|
|
|
@ -1236,9 +1236,12 @@ public final class S3ATestUtils {
|
|||
* Skip a test if the FS isn't marked as supporting magic commits.
|
||||
* @param fs filesystem
|
||||
*/
|
||||
public static void assumeMagicCommitEnabled(S3AFileSystem fs) {
|
||||
public static void assumeMagicCommitEnabled(S3AFileSystem fs)
|
||||
throws IOException {
|
||||
assume("Magic commit option disabled on " + fs,
|
||||
fs.hasCapability(CommitConstants.STORE_CAPABILITY_MAGIC_COMMITTER));
|
||||
fs.hasPathCapability(
|
||||
fs.getWorkingDirectory(),
|
||||
CommitConstants.STORE_CAPABILITY_MAGIC_COMMITTER));
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -550,10 +550,7 @@ public class ITestCommitOperations extends AbstractCommitITest {
|
|||
@Test
|
||||
public void testWriteNormalStream() throws Throwable {
|
||||
S3AFileSystem fs = getFileSystem();
|
||||
Assume.assumeTrue(
|
||||
"Filesystem does not have magic support enabled: " + fs,
|
||||
fs.hasCapability(STORE_CAPABILITY_MAGIC_COMMITTER));
|
||||
|
||||
assumeMagicCommitEnabled(fs);
|
||||
Path destFile = path("normal");
|
||||
try (FSDataOutputStream out = fs.create(destFile, true)) {
|
||||
out.writeChars("data");
|
||||
|
|
|
@ -517,7 +517,7 @@ public abstract class AbstractS3GuardToolTestBase extends AbstractS3ATestBase {
|
|||
String name = fs.getUri().toString();
|
||||
S3GuardTool.BucketInfo cmd = new S3GuardTool.BucketInfo(
|
||||
getConfiguration());
|
||||
if (fs.hasCapability(
|
||||
if (fs.hasPathCapability(fs.getWorkingDirectory(),
|
||||
CommitConstants.STORE_CAPABILITY_MAGIC_COMMITTER)) {
|
||||
// if the FS is magic, expect this to work
|
||||
exec(cmd, S3GuardTool.BucketInfo.MAGIC_FLAG, name);
|
||||
|
|
|
@ -102,9 +102,9 @@ public class ITestS3Select extends AbstractS3SelectTest {
|
|||
@Override
|
||||
public void setup() throws Exception {
|
||||
super.setup();
|
||||
Assume.assumeTrue("S3 Select is not enabled",
|
||||
getFileSystem().hasCapability(S3_SELECT_CAPABILITY));
|
||||
csvPath = path(getMethodName() + ".csv");
|
||||
Assume.assumeTrue("S3 Select is not enabled",
|
||||
getFileSystem().hasPathCapability(csvPath, S3_SELECT_CAPABILITY));
|
||||
selectConf = new Configuration(false);
|
||||
selectConf.setBoolean(SELECT_ERRORS_INCLUDE_SQL, true);
|
||||
createStandardCsvFile(getFileSystem(), csvPath, ALL_QUOTES);
|
||||
|
|
|
@ -46,6 +46,7 @@ import org.apache.commons.lang3.StringUtils;
|
|||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.CommonPathCapabilities;
|
||||
import org.apache.hadoop.fs.ContentSummary;
|
||||
import org.apache.hadoop.fs.ContentSummary.Builder;
|
||||
import org.apache.hadoop.fs.CreateFlag;
|
||||
|
@ -70,6 +71,7 @@ import org.apache.hadoop.util.ReflectionUtils;
|
|||
import org.apache.hadoop.util.VersionInfo;
|
||||
|
||||
import static org.apache.hadoop.fs.adl.AdlConfKeys.*;
|
||||
import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
|
||||
|
||||
/**
|
||||
* A FileSystem to access Azure Data Lake Store.
|
||||
|
@ -1033,4 +1035,20 @@ public class AdlFileSystem extends FileSystem {
|
|||
}
|
||||
return dest;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean hasPathCapability(final Path path, final String capability)
|
||||
throws IOException {
|
||||
|
||||
switch (validatePathCapabilityArgs(makeQualified(path), capability)) {
|
||||
|
||||
case CommonPathCapabilities.FS_ACLS:
|
||||
case CommonPathCapabilities.FS_APPEND:
|
||||
case CommonPathCapabilities.FS_CONCAT:
|
||||
case CommonPathCapabilities.FS_PERMISSIONS:
|
||||
return true;
|
||||
default:
|
||||
return super.hasPathCapability(path, capability);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -51,6 +51,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
|
|||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.BufferedFSInputStream;
|
||||
import org.apache.hadoop.fs.CommonPathCapabilities;
|
||||
import org.apache.hadoop.fs.CreateFlag;
|
||||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
|
@ -84,6 +85,7 @@ import org.slf4j.Logger;
|
|||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import static org.apache.hadoop.fs.azure.NativeAzureFileSystemHelper.*;
|
||||
import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
|
@ -3866,4 +3868,19 @@ public class NativeAzureFileSystem extends FileSystem {
|
|||
void updateDaemonUsers(List<String> daemonUsers) {
|
||||
this.daemonUsers = daemonUsers;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean hasPathCapability(final Path path, final String capability)
|
||||
throws IOException {
|
||||
switch (validatePathCapabilityArgs(path, capability)) {
|
||||
|
||||
case CommonPathCapabilities.FS_PERMISSIONS:
|
||||
return true;
|
||||
// Append support is dynamic
|
||||
case CommonPathCapabilities.FS_APPEND:
|
||||
return appendSupportEnabled;
|
||||
default:
|
||||
return super.hasPathCapability(path, capability);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -47,6 +47,7 @@ import org.apache.hadoop.fs.azurebfs.services.AbfsClientThrottlingIntercept;
|
|||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.BlockLocation;
|
||||
import org.apache.hadoop.fs.CommonPathCapabilities;
|
||||
import org.apache.hadoop.fs.CreateFlag;
|
||||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
|
@ -76,6 +77,8 @@ import org.apache.hadoop.security.token.Token;
|
|||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.util.Progressable;
|
||||
|
||||
import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
|
||||
|
||||
/**
|
||||
* A {@link org.apache.hadoop.fs.FileSystem} for reading and writing files stored on <a
|
||||
* href="http://store.azure.com/">Windows Azure</a>
|
||||
|
@ -1129,4 +1132,20 @@ public class AzureBlobFileSystem extends FileSystem {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean hasPathCapability(final Path path, final String capability)
|
||||
throws IOException {
|
||||
// qualify the path to make sure that it refers to the current FS.
|
||||
final Path p = makeQualified(path);
|
||||
switch (validatePathCapabilityArgs(p, capability)) {
|
||||
case CommonPathCapabilities.FS_PERMISSIONS:
|
||||
case CommonPathCapabilities.FS_APPEND:
|
||||
return true;
|
||||
case CommonPathCapabilities.FS_ACLS:
|
||||
return getIsNamespaceEnabled();
|
||||
default:
|
||||
return super.hasPathCapability(p, capability);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue