HADOOP-15661. ABFS: Add support for ACL.
Contributed by Junhua Gu and Da Zhou.
This commit is contained in:
parent
9149b9703e
commit
9c1e4e8139
|
@ -26,6 +26,7 @@ import java.io.OutputStream;
|
|||
import java.net.HttpURLConnection;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.util.List;
|
||||
import java.util.ArrayList;
|
||||
import java.util.EnumSet;
|
||||
import java.util.concurrent.Callable;
|
||||
|
@ -60,6 +61,8 @@ import org.apache.hadoop.fs.azurebfs.contracts.exceptions.FileSystemOperationUnh
|
|||
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidUriAuthorityException;
|
||||
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidUriException;
|
||||
import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode;
|
||||
import org.apache.hadoop.fs.permission.AclEntry;
|
||||
import org.apache.hadoop.fs.permission.AclStatus;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.util.Progressable;
|
||||
|
@ -154,7 +157,8 @@ public class AzureBlobFileSystem extends FileSystem {
|
|||
blockSize);
|
||||
|
||||
try {
|
||||
OutputStream outputStream = abfsStore.createFile(makeQualified(f), overwrite);
|
||||
OutputStream outputStream = abfsStore.createFile(makeQualified(f), overwrite,
|
||||
permission == null ? FsPermission.getFileDefault() : permission, FsPermission.getUMask(getConf()));
|
||||
return new FSDataOutputStream(outputStream, statistics);
|
||||
} catch(AzureBlobFileSystemException ex) {
|
||||
checkException(f, ex);
|
||||
|
@ -253,7 +257,8 @@ public class AzureBlobFileSystem extends FileSystem {
|
|||
AzureServiceErrorCode.INVALID_RENAME_SOURCE_PATH,
|
||||
AzureServiceErrorCode.SOURCE_PATH_NOT_FOUND,
|
||||
AzureServiceErrorCode.INVALID_SOURCE_OR_DESTINATION_RESOURCE_TYPE,
|
||||
AzureServiceErrorCode.RENAME_DESTINATION_PARENT_PATH_NOT_FOUND);
|
||||
AzureServiceErrorCode.RENAME_DESTINATION_PARENT_PATH_NOT_FOUND,
|
||||
AzureServiceErrorCode.INTERNAL_OPERATION_ABORT);
|
||||
return false;
|
||||
}
|
||||
|
||||
|
@ -308,7 +313,8 @@ public class AzureBlobFileSystem extends FileSystem {
|
|||
}
|
||||
|
||||
try {
|
||||
abfsStore.createDirectory(makeQualified(f));
|
||||
abfsStore.createDirectory(makeQualified(f), permission == null ? FsPermission.getDirDefault() : permission,
|
||||
FsPermission.getUMask(getConf()));
|
||||
return true;
|
||||
} catch (AzureBlobFileSystemException ex) {
|
||||
checkException(f, ex, AzureServiceErrorCode.PATH_ALREADY_EXISTS);
|
||||
|
@ -457,6 +463,188 @@ public class AzureBlobFileSystem extends FileSystem {
|
|||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set owner of a path (i.e. a file or a directory).
|
||||
* The parameters owner and group cannot both be null.
|
||||
*
|
||||
* @param path The path
|
||||
* @param owner If it is null, the original username remains unchanged.
|
||||
* @param group If it is null, the original groupname remains unchanged.
|
||||
*/
|
||||
@Override
|
||||
public void setOwner(final Path path, final String owner, final String group)
|
||||
throws IOException {
|
||||
LOG.debug(
|
||||
"AzureBlobFileSystem.setOwner path: {}", path);
|
||||
|
||||
if ((owner == null || owner.isEmpty()) && (group == null || group.isEmpty())) {
|
||||
throw new IllegalArgumentException("A valid owner or group must be specified.");
|
||||
}
|
||||
|
||||
try {
|
||||
abfsStore.setOwner(makeQualified(path),
|
||||
owner,
|
||||
group);
|
||||
} catch (AzureBlobFileSystemException ex) {
|
||||
checkException(path, ex);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Set permission of a path.
|
||||
*
|
||||
* @param path The path
|
||||
* @param permission Access permission
|
||||
*/
|
||||
@Override
|
||||
public void setPermission(final Path path, final FsPermission permission)
|
||||
throws IOException {
|
||||
LOG.debug("AzureBlobFileSystem.setPermission path: {}", path);
|
||||
|
||||
if (permission == null) {
|
||||
throw new IllegalArgumentException("The permission can't be null");
|
||||
}
|
||||
|
||||
try {
|
||||
abfsStore.setPermission(makeQualified(path),
|
||||
permission);
|
||||
} catch (AzureBlobFileSystemException ex) {
|
||||
checkException(path, ex);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Modifies ACL entries of files and directories. This method can add new ACL
|
||||
* entries or modify the permissions on existing ACL entries. All existing
|
||||
* ACL entries that are not specified in this call are retained without
|
||||
* changes. (Modifications are merged into the current ACL.)
|
||||
*
|
||||
* @param path Path to modify
|
||||
* @param aclSpec List of AbfsAclEntry describing modifications
|
||||
* @throws IOException if an ACL could not be modified
|
||||
*/
|
||||
@Override
|
||||
public void modifyAclEntries(final Path path, final List<AclEntry> aclSpec)
|
||||
throws IOException {
|
||||
LOG.debug("AzureBlobFileSystem.modifyAclEntries path: {}", path.toString());
|
||||
|
||||
if (aclSpec == null || aclSpec.isEmpty()) {
|
||||
throw new IllegalArgumentException("The value of the aclSpec parameter is invalid.");
|
||||
}
|
||||
|
||||
try {
|
||||
abfsStore.modifyAclEntries(makeQualified(path),
|
||||
aclSpec);
|
||||
} catch (AzureBlobFileSystemException ex) {
|
||||
checkException(path, ex);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Removes ACL entries from files and directories. Other ACL entries are
|
||||
* retained.
|
||||
*
|
||||
* @param path Path to modify
|
||||
* @param aclSpec List of AclEntry describing entries to remove
|
||||
* @throws IOException if an ACL could not be modified
|
||||
*/
|
||||
@Override
|
||||
public void removeAclEntries(final Path path, final List<AclEntry> aclSpec)
|
||||
throws IOException {
|
||||
LOG.debug("AzureBlobFileSystem.removeAclEntries path: {}", path);
|
||||
|
||||
if (aclSpec == null || aclSpec.isEmpty()) {
|
||||
throw new IllegalArgumentException("The aclSpec argument is invalid.");
|
||||
}
|
||||
|
||||
try {
|
||||
abfsStore.removeAclEntries(makeQualified(path), aclSpec);
|
||||
} catch (AzureBlobFileSystemException ex) {
|
||||
checkException(path, ex);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Removes all default ACL entries from files and directories.
|
||||
*
|
||||
* @param path Path to modify
|
||||
* @throws IOException if an ACL could not be modified
|
||||
*/
|
||||
@Override
|
||||
public void removeDefaultAcl(final Path path) throws IOException {
|
||||
LOG.debug("AzureBlobFileSystem.removeDefaultAcl path: {}", path);
|
||||
|
||||
try {
|
||||
abfsStore.removeDefaultAcl(makeQualified(path));
|
||||
} catch (AzureBlobFileSystemException ex) {
|
||||
checkException(path, ex);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Removes all but the base ACL entries of files and directories. The entries
|
||||
* for user, group, and others are retained for compatibility with permission
|
||||
* bits.
|
||||
*
|
||||
* @param path Path to modify
|
||||
* @throws IOException if an ACL could not be removed
|
||||
*/
|
||||
@Override
|
||||
public void removeAcl(final Path path) throws IOException {
|
||||
LOG.debug("AzureBlobFileSystem.removeAcl path: {}", path);
|
||||
|
||||
try {
|
||||
abfsStore.removeAcl(makeQualified(path));
|
||||
} catch (AzureBlobFileSystemException ex) {
|
||||
checkException(path, ex);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Fully replaces ACL of files and directories, discarding all existing
|
||||
* entries.
|
||||
*
|
||||
* @param path Path to modify
|
||||
* @param aclSpec List of AclEntry describing modifications, must include
|
||||
* entries for user, group, and others for compatibility with
|
||||
* permission bits.
|
||||
* @throws IOException if an ACL could not be modified
|
||||
*/
|
||||
@Override
|
||||
public void setAcl(final Path path, final List<AclEntry> aclSpec)
|
||||
throws IOException {
|
||||
LOG.debug("AzureBlobFileSystem.setAcl path: {}", path);
|
||||
|
||||
if (aclSpec == null || aclSpec.size() == 0) {
|
||||
throw new IllegalArgumentException("The aclSpec argument is invalid.");
|
||||
}
|
||||
|
||||
try {
|
||||
abfsStore.setAcl(makeQualified(path), aclSpec);
|
||||
} catch (AzureBlobFileSystemException ex) {
|
||||
checkException(path, ex);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the ACL of a file or directory.
|
||||
*
|
||||
* @param path Path to get
|
||||
* @return AbfsAclStatus describing the ACL of the file or directory
|
||||
* @throws IOException if an ACL could not be read
|
||||
*/
|
||||
@Override
|
||||
public AclStatus getAclStatus(final Path path) throws IOException {
|
||||
LOG.debug("AzureBlobFileSystem.getAclStatus path: {}", path.toString());
|
||||
|
||||
try {
|
||||
return abfsStore.getAclStatus(makeQualified(path));
|
||||
} catch (AzureBlobFileSystemException ex) {
|
||||
checkException(path, ex);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
private FileStatus tryGetFileStatus(final Path f) {
|
||||
try {
|
||||
return getFileStatus(f);
|
||||
|
@ -656,4 +844,9 @@ public class AzureBlobFileSystem extends FileSystem {
|
|||
AbfsClient getAbfsClient() {
|
||||
return abfsStore.getClient();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
boolean getIsNamespaceEnabeld() throws AzureBlobFileSystemException {
|
||||
return abfsStore.getIsNamespaceEnabled();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -36,8 +36,10 @@ import java.text.SimpleDateFormat;
|
|||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Date;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Hashtable;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import javax.xml.bind.DatatypeConverter;
|
||||
|
@ -68,13 +70,18 @@ import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode;
|
|||
import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultEntrySchema;
|
||||
import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultSchema;
|
||||
import org.apache.hadoop.fs.azurebfs.oauth2.AccessTokenProvider;
|
||||
import org.apache.hadoop.fs.azurebfs.services.AbfsAclHelper;
|
||||
import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
|
||||
import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation;
|
||||
import org.apache.hadoop.fs.azurebfs.services.AbfsInputStream;
|
||||
import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream;
|
||||
import org.apache.hadoop.fs.azurebfs.services.AbfsPermission;
|
||||
import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
|
||||
import org.apache.hadoop.fs.azurebfs.services.AuthType;
|
||||
import org.apache.hadoop.fs.azurebfs.services.ExponentialRetryPolicy;
|
||||
import org.apache.hadoop.fs.azurebfs.services.SharedKeyCredentials;
|
||||
import org.apache.hadoop.fs.permission.AclEntry;
|
||||
import org.apache.hadoop.fs.permission.AclStatus;
|
||||
import org.apache.hadoop.fs.permission.FsAction;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
|
@ -85,7 +92,7 @@ import org.slf4j.LoggerFactory;
|
|||
import static org.apache.hadoop.util.Time.now;
|
||||
|
||||
/**
|
||||
* Provides the bridging logic between Hadoop's abstract filesystem and Azure Storage
|
||||
* Provides the bridging logic between Hadoop's abstract filesystem and Azure Storage.
|
||||
*/
|
||||
@InterfaceAudience.Public
|
||||
@InterfaceStability.Evolving
|
||||
|
@ -103,7 +110,8 @@ public class AzureBlobFileSystemStore {
|
|||
|
||||
private final AbfsConfiguration abfsConfiguration;
|
||||
private final Set<String> azureAtomicRenameDirSet;
|
||||
|
||||
private boolean isNamespaceEnabledSet;
|
||||
private boolean isNamespaceEnabled;
|
||||
|
||||
public AzureBlobFileSystemStore(URI uri, boolean isSecure, Configuration configuration, UserGroupInformation userGroupInformation)
|
||||
throws AzureBlobFileSystemException {
|
||||
|
@ -121,6 +129,20 @@ public class AzureBlobFileSystemStore {
|
|||
initializeClient(uri, isSecure);
|
||||
}
|
||||
|
||||
public boolean getIsNamespaceEnabled() throws AzureBlobFileSystemException {
|
||||
if (!isNamespaceEnabledSet) {
|
||||
LOG.debug("getFilesystemProperties for filesystem: {}",
|
||||
client.getFileSystem());
|
||||
|
||||
final AbfsRestOperation op = client.getFilesystemProperties();
|
||||
isNamespaceEnabled = Boolean.parseBoolean(
|
||||
op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_NAMESPACE_ENABLED));
|
||||
isNamespaceEnabledSet = true;
|
||||
}
|
||||
|
||||
return isNamespaceEnabled;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
URIBuilder getURIBuilder(final String hostName, boolean isSecure) {
|
||||
String scheme = isSecure ? FileSystemUriSchemes.HTTPS_SCHEME : FileSystemUriSchemes.HTTP_SCHEME;
|
||||
|
@ -197,7 +219,7 @@ public class AzureBlobFileSystemStore {
|
|||
} catch (CharacterCodingException ex) {
|
||||
throw new InvalidAbfsRestOperationException(ex);
|
||||
}
|
||||
client.setPathProperties("/" + getRelativePath(path), commaSeparatedProperties);
|
||||
client.setPathProperties(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), commaSeparatedProperties);
|
||||
}
|
||||
|
||||
public void createFilesystem() throws AzureBlobFileSystemException {
|
||||
|
@ -214,13 +236,20 @@ public class AzureBlobFileSystemStore {
|
|||
client.deleteFilesystem();
|
||||
}
|
||||
|
||||
public OutputStream createFile(final Path path, final boolean overwrite) throws AzureBlobFileSystemException {
|
||||
LOG.debug("createFile filesystem: {} path: {} overwrite: {}",
|
||||
public OutputStream createFile(final Path path, final boolean overwrite, final FsPermission permission,
|
||||
final FsPermission umask) throws AzureBlobFileSystemException {
|
||||
boolean isNamespaceEnabled = getIsNamespaceEnabled();
|
||||
LOG.debug("createFile filesystem: {} path: {} overwrite: {} permission: {} umask: {} isNamespaceEnabled: {}",
|
||||
client.getFileSystem(),
|
||||
path,
|
||||
overwrite);
|
||||
overwrite,
|
||||
permission.toString(),
|
||||
umask.toString(),
|
||||
isNamespaceEnabled);
|
||||
|
||||
client.createPath(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), true, overwrite);
|
||||
client.createPath(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), true, overwrite,
|
||||
isNamespaceEnabled ? getOctalNotation(permission) : null,
|
||||
isNamespaceEnabled ? getOctalNotation(umask) : null);
|
||||
|
||||
final OutputStream outputStream;
|
||||
outputStream = new FSDataOutputStream(
|
||||
|
@ -229,16 +258,23 @@ public class AzureBlobFileSystemStore {
|
|||
return outputStream;
|
||||
}
|
||||
|
||||
public void createDirectory(final Path path) throws AzureBlobFileSystemException {
|
||||
LOG.debug("createDirectory filesystem: {} path: {}",
|
||||
public void createDirectory(final Path path, final FsPermission permission, final FsPermission umask)
|
||||
throws AzureBlobFileSystemException {
|
||||
boolean isNamespaceEnabled = getIsNamespaceEnabled();
|
||||
LOG.debug("createDirectory filesystem: {} path: {} permission: {} umask: {} isNamespaceEnabled: {}",
|
||||
client.getFileSystem(),
|
||||
path);
|
||||
path,
|
||||
permission,
|
||||
umask,
|
||||
isNamespaceEnabled);
|
||||
|
||||
client.createPath("/" + getRelativePath(path), false, true);
|
||||
client.createPath(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), false, true,
|
||||
isNamespaceEnabled ? getOctalNotation(permission) : null,
|
||||
isNamespaceEnabled ? getOctalNotation(umask) : null);
|
||||
}
|
||||
|
||||
public InputStream openFileForRead(final Path path, final FileSystem.Statistics statistics) throws AzureBlobFileSystemException {
|
||||
|
||||
public InputStream openFileForRead(final Path path, final FileSystem.Statistics statistics)
|
||||
throws AzureBlobFileSystemException {
|
||||
LOG.debug("openFileForRead filesystem: {} path: {}",
|
||||
client.getFileSystem(),
|
||||
path);
|
||||
|
@ -327,7 +363,6 @@ public class AzureBlobFileSystemStore {
|
|||
|
||||
public void delete(final Path path, final boolean recursive)
|
||||
throws AzureBlobFileSystemException {
|
||||
|
||||
LOG.debug("delete filesystem: {} path: {} recursive: {}",
|
||||
client.getFileSystem(),
|
||||
path,
|
||||
|
@ -351,19 +386,31 @@ public class AzureBlobFileSystemStore {
|
|||
}
|
||||
|
||||
public FileStatus getFileStatus(final Path path) throws IOException {
|
||||
|
||||
LOG.debug("getFileStatus filesystem: {} path: {}",
|
||||
boolean isNamespaceEnabled = getIsNamespaceEnabled();
|
||||
LOG.debug("getFileStatus filesystem: {} path: {} isNamespaceEnabled: {}",
|
||||
client.getFileSystem(),
|
||||
path);
|
||||
path,
|
||||
isNamespaceEnabled);
|
||||
|
||||
if (path.isRoot()) {
|
||||
AbfsRestOperation op = client.getFilesystemProperties();
|
||||
final AbfsRestOperation op = isNamespaceEnabled
|
||||
? client.getAclStatus(AbfsHttpConstants.FORWARD_SLASH + AbfsHttpConstants.ROOT_PATH)
|
||||
: client.getFilesystemProperties();
|
||||
|
||||
final long blockSize = abfsConfiguration.getAzureBlockSize();
|
||||
final String owner = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_OWNER);
|
||||
final String group = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_GROUP);
|
||||
final String permissions = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_PERMISSIONS);
|
||||
final String eTag = op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG);
|
||||
final String lastModified = op.getResult().getResponseHeader(HttpHeaderConfigurations.LAST_MODIFIED);
|
||||
final boolean hasAcl = AbfsPermission.isExtendedAcl(permissions);
|
||||
|
||||
return new VersionedFileStatus(
|
||||
userGroupInformation.getUserName(),
|
||||
userGroupInformation.getPrimaryGroupName(),
|
||||
owner == null ? userGroupInformation.getUserName() : owner,
|
||||
group == null ? userGroupInformation.getPrimaryGroupName() : group,
|
||||
permissions == null ? new AbfsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL)
|
||||
: AbfsPermission.valueOf(permissions),
|
||||
hasAcl,
|
||||
0,
|
||||
true,
|
||||
1,
|
||||
|
@ -375,14 +422,22 @@ public class AzureBlobFileSystemStore {
|
|||
AbfsRestOperation op = client.getPathProperties(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path));
|
||||
|
||||
final long blockSize = abfsConfiguration.getAzureBlockSize();
|
||||
final String eTag = op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG);
|
||||
final String lastModified = op.getResult().getResponseHeader(HttpHeaderConfigurations.LAST_MODIFIED);
|
||||
final String contentLength = op.getResult().getResponseHeader(HttpHeaderConfigurations.CONTENT_LENGTH);
|
||||
final String resourceType = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_RESOURCE_TYPE);
|
||||
final AbfsHttpOperation result = op.getResult();
|
||||
final String eTag = result.getResponseHeader(HttpHeaderConfigurations.ETAG);
|
||||
final String lastModified = result.getResponseHeader(HttpHeaderConfigurations.LAST_MODIFIED);
|
||||
final String contentLength = result.getResponseHeader(HttpHeaderConfigurations.CONTENT_LENGTH);
|
||||
final String resourceType = result.getResponseHeader(HttpHeaderConfigurations.X_MS_RESOURCE_TYPE);
|
||||
final String owner = result.getResponseHeader(HttpHeaderConfigurations.X_MS_OWNER);
|
||||
final String group = result.getResponseHeader(HttpHeaderConfigurations.X_MS_GROUP);
|
||||
final String permissions = result.getResponseHeader((HttpHeaderConfigurations.X_MS_PERMISSIONS));
|
||||
final boolean hasAcl = AbfsPermission.isExtendedAcl(permissions);
|
||||
|
||||
return new VersionedFileStatus(
|
||||
userGroupInformation.getUserName(),
|
||||
userGroupInformation.getPrimaryGroupName(),
|
||||
owner == null ? userGroupInformation.getUserName() : owner,
|
||||
group == null ? userGroupInformation.getPrimaryGroupName() : group,
|
||||
permissions == null ? new AbfsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL)
|
||||
: AbfsPermission.valueOf(permissions),
|
||||
hasAcl,
|
||||
parseContentLength(contentLength),
|
||||
parseIsDirectory(resourceType),
|
||||
1,
|
||||
|
@ -417,6 +472,13 @@ public class AzureBlobFileSystemStore {
|
|||
long blockSize = abfsConfiguration.getAzureBlockSize();
|
||||
|
||||
for (ListResultEntrySchema entry : retrievedSchema.paths()) {
|
||||
final String owner = entry.owner() == null ? userGroupInformation.getUserName() : entry.owner();
|
||||
final String group = entry.group() == null ? userGroupInformation.getPrimaryGroupName() : entry.group();
|
||||
final FsPermission fsPermission = entry.permissions() == null
|
||||
? new AbfsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL)
|
||||
: AbfsPermission.valueOf(entry.permissions());
|
||||
final boolean hasAcl = AbfsPermission.isExtendedAcl(entry.permissions());
|
||||
|
||||
long lastModifiedMillis = 0;
|
||||
long contentLength = entry.contentLength() == null ? 0 : entry.contentLength();
|
||||
boolean isDirectory = entry.isDirectory() == null ? false : entry.isDirectory();
|
||||
|
@ -429,8 +491,10 @@ public class AzureBlobFileSystemStore {
|
|||
|
||||
fileStatuses.add(
|
||||
new VersionedFileStatus(
|
||||
userGroupInformation.getUserName(),
|
||||
userGroupInformation.getPrimaryGroupName(),
|
||||
owner,
|
||||
group,
|
||||
fsPermission,
|
||||
hasAcl,
|
||||
contentLength,
|
||||
isDirectory,
|
||||
1,
|
||||
|
@ -445,6 +509,211 @@ public class AzureBlobFileSystemStore {
|
|||
return fileStatuses.toArray(new FileStatus[0]);
|
||||
}
|
||||
|
||||
public void setOwner(final Path path, final String owner, final String group) throws
|
||||
AzureBlobFileSystemException {
|
||||
if (!getIsNamespaceEnabled()) {
|
||||
throw new UnsupportedOperationException(
|
||||
"This operation is only valid for storage accounts with the hierarchical namespace enabled.");
|
||||
}
|
||||
|
||||
LOG.debug(
|
||||
"setOwner filesystem: {} path: {} owner: {} group: {}",
|
||||
client.getFileSystem(),
|
||||
path.toString(),
|
||||
owner,
|
||||
group);
|
||||
client.setOwner(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, true), owner, group);
|
||||
}
|
||||
|
||||
public void setPermission(final Path path, final FsPermission permission) throws
|
||||
AzureBlobFileSystemException {
|
||||
if (!getIsNamespaceEnabled()) {
|
||||
throw new UnsupportedOperationException(
|
||||
"This operation is only valid for storage accounts with the hierarchical namespace enabled.");
|
||||
}
|
||||
|
||||
LOG.debug(
|
||||
"setPermission filesystem: {} path: {} permission: {}",
|
||||
client.getFileSystem(),
|
||||
path.toString(),
|
||||
permission.toString());
|
||||
client.setPermission(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, true),
|
||||
String.format(AbfsHttpConstants.PERMISSION_FORMAT, permission.toOctal()));
|
||||
}
|
||||
|
||||
public void modifyAclEntries(final Path path, final List<AclEntry> aclSpec) throws
|
||||
AzureBlobFileSystemException {
|
||||
if (!getIsNamespaceEnabled()) {
|
||||
throw new UnsupportedOperationException(
|
||||
"This operation is only valid for storage accounts with the hierarchical namespace enabled.");
|
||||
}
|
||||
|
||||
LOG.debug(
|
||||
"modifyAclEntries filesystem: {} path: {} aclSpec: {}",
|
||||
client.getFileSystem(),
|
||||
path.toString(),
|
||||
AclEntry.aclSpecToString(aclSpec));
|
||||
|
||||
final Map<String, String> modifyAclEntries = AbfsAclHelper.deserializeAclSpec(AclEntry.aclSpecToString(aclSpec));
|
||||
|
||||
final AbfsRestOperation op = client.getAclStatus(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, true));
|
||||
final String eTag = op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG);
|
||||
|
||||
final Map<String, String> aclEntries = AbfsAclHelper.deserializeAclSpec(op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_ACL));
|
||||
|
||||
for (Map.Entry<String, String> modifyAclEntry : modifyAclEntries.entrySet()) {
|
||||
aclEntries.put(modifyAclEntry.getKey(), modifyAclEntry.getValue());
|
||||
}
|
||||
|
||||
if (!modifyAclEntries.containsKey(AbfsHttpConstants.ACCESS_MASK)) {
|
||||
aclEntries.remove(AbfsHttpConstants.ACCESS_MASK);
|
||||
}
|
||||
|
||||
if (!modifyAclEntries.containsKey(AbfsHttpConstants.DEFAULT_MASK)) {
|
||||
aclEntries.remove(AbfsHttpConstants.DEFAULT_MASK);
|
||||
}
|
||||
|
||||
client.setAcl(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, true),
|
||||
AbfsAclHelper.serializeAclSpec(aclEntries), eTag);
|
||||
}
|
||||
|
||||
public void removeAclEntries(final Path path, final List<AclEntry> aclSpec) throws AzureBlobFileSystemException {
|
||||
if (!getIsNamespaceEnabled()) {
|
||||
throw new UnsupportedOperationException(
|
||||
"This operation is only valid for storage accounts with the hierarchical namespace enabled.");
|
||||
}
|
||||
|
||||
LOG.debug(
|
||||
"removeAclEntries filesystem: {} path: {} aclSpec: {}",
|
||||
client.getFileSystem(),
|
||||
path.toString(),
|
||||
AclEntry.aclSpecToString(aclSpec));
|
||||
|
||||
final Map<String, String> removeAclEntries = AbfsAclHelper.deserializeAclSpec(AclEntry.aclSpecToString(aclSpec));
|
||||
final AbfsRestOperation op = client.getAclStatus(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, true));
|
||||
final String eTag = op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG);
|
||||
|
||||
final Map<String, String> aclEntries = AbfsAclHelper.deserializeAclSpec(op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_ACL));
|
||||
|
||||
AbfsAclHelper.removeAclEntriesInternal(aclEntries, removeAclEntries);
|
||||
|
||||
client.setAcl(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, true),
|
||||
AbfsAclHelper.serializeAclSpec(aclEntries), eTag);
|
||||
}
|
||||
|
||||
public void removeDefaultAcl(final Path path) throws AzureBlobFileSystemException {
|
||||
if (!getIsNamespaceEnabled()) {
|
||||
throw new UnsupportedOperationException(
|
||||
"This operation is only valid for storage accounts with the hierarchical namespace enabled.");
|
||||
}
|
||||
|
||||
LOG.debug(
|
||||
"removeDefaultAcl filesystem: {} path: {}",
|
||||
client.getFileSystem(),
|
||||
path.toString());
|
||||
|
||||
final AbfsRestOperation op = client.getAclStatus(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, true));
|
||||
final String eTag = op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG);
|
||||
final Map<String, String> aclEntries = AbfsAclHelper.deserializeAclSpec(op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_ACL));
|
||||
final Map<String, String> defaultAclEntries = new HashMap<>();
|
||||
|
||||
for (Map.Entry<String, String> aclEntry : aclEntries.entrySet()) {
|
||||
if (aclEntry.getKey().startsWith("default:")) {
|
||||
defaultAclEntries.put(aclEntry.getKey(), aclEntry.getValue());
|
||||
}
|
||||
}
|
||||
|
||||
for (Map.Entry<String, String> defaultAclEntry : defaultAclEntries.entrySet()) {
|
||||
aclEntries.remove(defaultAclEntry.getKey());
|
||||
}
|
||||
|
||||
client.setAcl(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, true),
|
||||
AbfsAclHelper.serializeAclSpec(aclEntries), eTag);
|
||||
}
|
||||
|
||||
public void removeAcl(final Path path) throws AzureBlobFileSystemException {
|
||||
if (!getIsNamespaceEnabled()) {
|
||||
throw new UnsupportedOperationException(
|
||||
"This operation is only valid for storage accounts with the hierarchical namespace enabled.");
|
||||
}
|
||||
|
||||
LOG.debug(
|
||||
"removeAcl filesystem: {} path: {}",
|
||||
client.getFileSystem(),
|
||||
path.toString());
|
||||
final AbfsRestOperation op = client.getAclStatus(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, true));
|
||||
final String eTag = op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG);
|
||||
|
||||
final Map<String, String> aclEntries = AbfsAclHelper.deserializeAclSpec(op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_ACL));
|
||||
final Map<String, String> newAclEntries = new HashMap<>();
|
||||
|
||||
newAclEntries.put(AbfsHttpConstants.ACCESS_USER, aclEntries.get(AbfsHttpConstants.ACCESS_USER));
|
||||
newAclEntries.put(AbfsHttpConstants.ACCESS_GROUP, aclEntries.get(AbfsHttpConstants.ACCESS_GROUP));
|
||||
newAclEntries.put(AbfsHttpConstants.ACCESS_OTHER, aclEntries.get(AbfsHttpConstants.ACCESS_OTHER));
|
||||
|
||||
client.setAcl(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, true),
|
||||
AbfsAclHelper.serializeAclSpec(newAclEntries), eTag);
|
||||
}
|
||||
|
||||
public void setAcl(final Path path, final List<AclEntry> aclSpec) throws AzureBlobFileSystemException {
|
||||
if (!getIsNamespaceEnabled()) {
|
||||
throw new UnsupportedOperationException(
|
||||
"This operation is only valid for storage accounts with the hierarchical namespace enabled.");
|
||||
}
|
||||
|
||||
LOG.debug(
|
||||
"setAcl filesystem: {} path: {} aclspec: {}",
|
||||
client.getFileSystem(),
|
||||
path.toString(),
|
||||
AclEntry.aclSpecToString(aclSpec));
|
||||
final Map<String, String> aclEntries = AbfsAclHelper.deserializeAclSpec(AclEntry.aclSpecToString(aclSpec));
|
||||
final AbfsRestOperation op = client.getAclStatus(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, true));
|
||||
final String eTag = op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG);
|
||||
|
||||
final Map<String, String> getAclEntries = AbfsAclHelper.deserializeAclSpec(op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_ACL));
|
||||
for (Map.Entry<String, String> ace : getAclEntries.entrySet()) {
|
||||
if (ace.getKey().startsWith("default:") && (ace.getKey() != AbfsHttpConstants.DEFAULT_MASK)
|
||||
&& !aclEntries.containsKey(ace.getKey())) {
|
||||
aclEntries.put(ace.getKey(), ace.getValue());
|
||||
}
|
||||
}
|
||||
|
||||
client.setAcl(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, true),
|
||||
AbfsAclHelper.serializeAclSpec(aclEntries), eTag);
|
||||
}
|
||||
|
||||
public AclStatus getAclStatus(final Path path) throws IOException {
|
||||
if (!getIsNamespaceEnabled()) {
|
||||
throw new UnsupportedOperationException(
|
||||
"This operation is only valid for storage accounts with the hierarchical namespace enabled.");
|
||||
}
|
||||
|
||||
LOG.debug(
|
||||
"getAclStatus filesystem: {} path: {}",
|
||||
client.getFileSystem(),
|
||||
path.toString());
|
||||
AbfsRestOperation op = client.getAclStatus(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, true));
|
||||
AbfsHttpOperation result = op.getResult();
|
||||
|
||||
final String owner = result.getResponseHeader(HttpHeaderConfigurations.X_MS_OWNER);
|
||||
final String group = result.getResponseHeader(HttpHeaderConfigurations.X_MS_GROUP);
|
||||
final String permissions = result.getResponseHeader(HttpHeaderConfigurations.X_MS_PERMISSIONS);
|
||||
final String aclSpecString = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_ACL);
|
||||
|
||||
final List<AclEntry> processedAclEntries = AclEntry.parseAclSpec(AbfsAclHelper.processAclString(aclSpecString), true);
|
||||
final FsPermission fsPermission = permissions == null ? new AbfsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL)
|
||||
: AbfsPermission.valueOf(permissions);
|
||||
|
||||
final AclStatus.Builder aclStatusBuilder = new AclStatus.Builder();
|
||||
aclStatusBuilder.owner(owner == null ? userGroupInformation.getUserName() : owner);
|
||||
aclStatusBuilder.group(group == null ? userGroupInformation.getPrimaryGroupName() : group);
|
||||
|
||||
aclStatusBuilder.setPermission(fsPermission);
|
||||
aclStatusBuilder.stickyBit(fsPermission.getStickyBit());
|
||||
aclStatusBuilder.addEntries(processedAclEntries);
|
||||
return aclStatusBuilder.build();
|
||||
}
|
||||
|
||||
public boolean isAtomicRenameKey(String key) {
|
||||
return isKeyForDirectorySet(key, azureAtomicRenameDirSet);
|
||||
}
|
||||
|
@ -507,19 +776,24 @@ public class AzureBlobFileSystemStore {
|
|||
this.client = new AbfsClient(baseUrl, creds, abfsConfiguration, new ExponentialRetryPolicy(), tokenProvider);
|
||||
}
|
||||
|
||||
private String getOctalNotation(FsPermission fsPermission) {
|
||||
Preconditions.checkNotNull(fsPermission, "fsPermission");
|
||||
return String.format(AbfsHttpConstants.PERMISSION_FORMAT, fsPermission.toOctal());
|
||||
}
|
||||
|
||||
private String getRelativePath(final Path path) {
|
||||
return getRelativePath(path, false);
|
||||
}
|
||||
|
||||
private String getRelativePath(final Path path, final boolean allowRootPath) {
|
||||
Preconditions.checkNotNull(path, "path");
|
||||
final String relativePath = path.toUri().getPath();
|
||||
|
||||
if (relativePath.isEmpty()) {
|
||||
return relativePath;
|
||||
if (relativePath.length() == 0 || (relativePath.length() == 1 && relativePath.charAt(0) == Path.SEPARATOR_CHAR)) {
|
||||
return allowRootPath ? AbfsHttpConstants.ROOT_PATH : AbfsHttpConstants.EMPTY_STRING;
|
||||
}
|
||||
|
||||
if (relativePath.charAt(0) == Path.SEPARATOR_CHAR) {
|
||||
if (relativePath.length() == 1) {
|
||||
return AbfsHttpConstants.EMPTY_STRING;
|
||||
}
|
||||
|
||||
return relativePath.substring(1);
|
||||
}
|
||||
|
||||
|
@ -644,15 +918,17 @@ public class AzureBlobFileSystemStore {
|
|||
private final String version;
|
||||
|
||||
VersionedFileStatus(
|
||||
final String owner, final String group,
|
||||
final String owner, final String group, final FsPermission fsPermission, final boolean hasAcl,
|
||||
final long length, final boolean isdir, final int blockReplication,
|
||||
final long blocksize, final long modificationTime, final Path path,
|
||||
String version) {
|
||||
super(length, isdir, blockReplication, blocksize, modificationTime, 0,
|
||||
new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL),
|
||||
fsPermission,
|
||||
owner,
|
||||
group,
|
||||
path);
|
||||
null,
|
||||
path,
|
||||
hasAcl, false, false);
|
||||
|
||||
this.version = version;
|
||||
}
|
||||
|
@ -717,5 +993,4 @@ public class AzureBlobFileSystemStore {
|
|||
AbfsClient getClient() {
|
||||
return this.client;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -34,6 +34,8 @@ public final class AbfsHttpConstants {
|
|||
public static final String APPEND_ACTION = "append";
|
||||
public static final String FLUSH_ACTION = "flush";
|
||||
public static final String SET_PROPERTIES_ACTION = "setProperties";
|
||||
public static final String SET_ACCESS_CONTROL = "setAccessControl";
|
||||
public static final String GET_ACCESS_CONTROL = "getAccessControl";
|
||||
public static final String DEFAULT_TIMEOUT = "90";
|
||||
|
||||
public static final String JAVA_VERSION = "java.version";
|
||||
|
@ -58,6 +60,7 @@ public final class AbfsHttpConstants {
|
|||
public static final String PLUS = "+";
|
||||
public static final String STAR = "*";
|
||||
public static final String COMMA = ",";
|
||||
public static final String COLON = ":";
|
||||
public static final String EQUAL = "=";
|
||||
public static final String QUESTION_MARK = "?";
|
||||
public static final String AND_MARK = "&";
|
||||
|
@ -72,5 +75,17 @@ public final class AbfsHttpConstants {
|
|||
public static final String APPLICATION_JSON = "application/json";
|
||||
public static final String APPLICATION_OCTET_STREAM = "application/octet-stream";
|
||||
|
||||
public static final String ROOT_PATH = "/";
|
||||
public static final String ACCESS_MASK = "mask:";
|
||||
public static final String ACCESS_USER = "user:";
|
||||
public static final String ACCESS_GROUP = "group:";
|
||||
public static final String ACCESS_OTHER = "other:";
|
||||
public static final String DEFAULT_MASK = "default:mask:";
|
||||
public static final String DEFAULT_USER = "default:user:";
|
||||
public static final String DEFAULT_GROUP = "default:group:";
|
||||
public static final String DEFAULT_OTHER = "default:other:";
|
||||
public static final String DEFAULT_SCOPE = "default:";
|
||||
public static final String PERMISSION_FORMAT = "%04d";
|
||||
|
||||
private AbfsHttpConstants() {}
|
||||
}
|
||||
|
|
|
@ -52,6 +52,12 @@ public final class HttpHeaderConfigurations {
|
|||
public static final String X_MS_PROPERTIES = "x-ms-properties";
|
||||
public static final String X_MS_RENAME_SOURCE = "x-ms-rename-source";
|
||||
public static final String LAST_MODIFIED = "Last-Modified";
|
||||
public static final String X_MS_OWNER = "x-ms-owner";
|
||||
public static final String X_MS_GROUP = "x-ms-group";
|
||||
public static final String X_MS_ACL = "x-ms-acl";
|
||||
public static final String X_MS_PERMISSIONS = "x-ms-permissions";
|
||||
public static final String X_MS_UMASK = "x-ms-umask";
|
||||
public static final String X_MS_NAMESPACE_ENABLED = "x-ms-namespace-enabled";
|
||||
|
||||
private HttpHeaderConfigurations() {}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,33 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.azurebfs.contracts.exceptions;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
|
||||
/**
|
||||
* Thrown when there is an attempt to perform an invalid operation on an ACL.
|
||||
*/
|
||||
@InterfaceAudience.Public
|
||||
@InterfaceStability.Evolving
|
||||
public final class InvalidAclOperationException extends AzureBlobFileSystemException {
|
||||
public InvalidAclOperationException(String message) {
|
||||
super(message);
|
||||
}
|
||||
}
|
|
@ -33,6 +33,7 @@ import org.apache.hadoop.classification.InterfaceStability;
|
|||
public enum AzureServiceErrorCode {
|
||||
FILE_SYSTEM_ALREADY_EXISTS("FilesystemAlreadyExists", HttpURLConnection.HTTP_CONFLICT, null),
|
||||
PATH_ALREADY_EXISTS("PathAlreadyExists", HttpURLConnection.HTTP_CONFLICT, null),
|
||||
INTERNAL_OPERATION_ABORT("InternalOperationAbortError", HttpURLConnection.HTTP_CONFLICT, null),
|
||||
PATH_CONFLICT("PathConflict", HttpURLConnection.HTTP_CONFLICT, null),
|
||||
FILE_SYSTEM_NOT_FOUND("FilesystemNotFound", HttpURLConnection.HTTP_NOT_FOUND, null),
|
||||
PATH_NOT_FOUND("PathNotFound", HttpURLConnection.HTTP_NOT_FOUND, null),
|
||||
|
|
|
@ -0,0 +1,202 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.azurebfs.services;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
|
||||
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
|
||||
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidAclOperationException;
|
||||
import org.apache.hadoop.fs.permission.FsAction;
|
||||
|
||||
/**
|
||||
* AbfsAclHelper provides convenience methods to implement modifyAclEntries / removeAclEntries / removeAcl / removeDefaultAcl
|
||||
* from setAcl and getAcl.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Evolving
|
||||
public final class AbfsAclHelper {
|
||||
|
||||
private AbfsAclHelper() {
|
||||
// not called
|
||||
}
|
||||
|
||||
public static Map<String, String> deserializeAclSpec(final String aclSpecString) {
|
||||
final Map<String, String> aclEntries = new HashMap<>();
|
||||
final String[] aclArray = aclSpecString.split(AbfsHttpConstants.COMMA);
|
||||
for (String acl : aclArray) {
|
||||
int idx = acl.lastIndexOf(AbfsHttpConstants.COLON);
|
||||
aclEntries.put(acl.substring(0, idx), acl.substring(idx + 1));
|
||||
}
|
||||
return aclEntries;
|
||||
}
|
||||
|
||||
public static String serializeAclSpec(final Map<String, String> aclEntries) {
|
||||
final StringBuilder sb = new StringBuilder();
|
||||
for (Map.Entry<String, String> aclEntry : aclEntries.entrySet()) {
|
||||
sb.append(aclEntry.getKey() + AbfsHttpConstants.COLON + aclEntry.getValue() + AbfsHttpConstants.COMMA);
|
||||
}
|
||||
if (sb.length() > 0) {
|
||||
sb.setLength(sb.length() - 1);
|
||||
}
|
||||
return sb.toString();
|
||||
}
|
||||
|
||||
public static String processAclString(final String aclSpecString) {
|
||||
final List<String> aclEntries = Arrays.asList(aclSpecString.split(AbfsHttpConstants.COMMA));
|
||||
final StringBuilder sb = new StringBuilder();
|
||||
|
||||
boolean containsMask = false;
|
||||
for (int i = aclEntries.size() - 1; i >= 0; i--) {
|
||||
String ace = aclEntries.get(i);
|
||||
if (ace.startsWith(AbfsHttpConstants.ACCESS_OTHER)|| ace.startsWith(AbfsHttpConstants.ACCESS_USER + AbfsHttpConstants.COLON)) {
|
||||
// skip
|
||||
} else if (ace.startsWith(AbfsHttpConstants.ACCESS_MASK)) {
|
||||
containsMask = true;
|
||||
// skip
|
||||
} else if (ace.startsWith(AbfsHttpConstants.ACCESS_GROUP + AbfsHttpConstants.COLON) && !containsMask) {
|
||||
// skip
|
||||
} else {
|
||||
sb.insert(0, ace + AbfsHttpConstants.COMMA);
|
||||
}
|
||||
}
|
||||
|
||||
return sb.length() == 0 ? AbfsHttpConstants.EMPTY_STRING : sb.substring(0, sb.length() - 1);
|
||||
}
|
||||
|
||||
public static void removeAclEntriesInternal(Map<String, String> aclEntries, Map<String, String> toRemoveEntries)
|
||||
throws AzureBlobFileSystemException {
|
||||
boolean accessAclTouched = false;
|
||||
boolean defaultAclTouched = false;
|
||||
|
||||
final Set<String> removeIndicationSet = new HashSet<>();
|
||||
|
||||
for (String entryKey : toRemoveEntries.keySet()) {
|
||||
final boolean isDefaultAcl = isDefaultAce(entryKey);
|
||||
if (removeNamedAceAndUpdateSet(entryKey, isDefaultAcl, removeIndicationSet, aclEntries)) {
|
||||
if (isDefaultAcl) {
|
||||
defaultAclTouched = true;
|
||||
} else {
|
||||
accessAclTouched = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (accessAclTouched) {
|
||||
if (removeIndicationSet.contains(AbfsHttpConstants.ACCESS_MASK)) {
|
||||
aclEntries.remove(AbfsHttpConstants.ACCESS_MASK);
|
||||
}
|
||||
recalculateMask(aclEntries, false);
|
||||
}
|
||||
if (defaultAclTouched) {
|
||||
if (removeIndicationSet.contains(AbfsHttpConstants.DEFAULT_MASK)) {
|
||||
aclEntries.remove(AbfsHttpConstants.DEFAULT_MASK);
|
||||
}
|
||||
if (removeIndicationSet.contains(AbfsHttpConstants.DEFAULT_USER)) {
|
||||
aclEntries.put(AbfsHttpConstants.DEFAULT_USER, aclEntries.get(AbfsHttpConstants.ACCESS_USER));
|
||||
}
|
||||
if (removeIndicationSet.contains(AbfsHttpConstants.DEFAULT_GROUP)) {
|
||||
aclEntries.put(AbfsHttpConstants.DEFAULT_GROUP, aclEntries.get(AbfsHttpConstants.ACCESS_GROUP));
|
||||
}
|
||||
if (removeIndicationSet.contains(AbfsHttpConstants.DEFAULT_OTHER)) {
|
||||
aclEntries.put(AbfsHttpConstants.DEFAULT_OTHER, aclEntries.get(AbfsHttpConstants.ACCESS_OTHER));
|
||||
}
|
||||
recalculateMask(aclEntries, true);
|
||||
}
|
||||
}
|
||||
|
||||
private static boolean removeNamedAceAndUpdateSet(String entry, boolean isDefaultAcl, Set<String> removeIndicationSet,
|
||||
Map<String, String> aclEntries)
|
||||
throws AzureBlobFileSystemException {
|
||||
final int startIndex = isDefaultAcl ? 1 : 0;
|
||||
final String[] entryParts = entry.split(AbfsHttpConstants.COLON);
|
||||
final String tag = isDefaultAcl ? AbfsHttpConstants.DEFAULT_SCOPE + entryParts[startIndex] + AbfsHttpConstants.COLON
|
||||
: entryParts[startIndex] + AbfsHttpConstants.COLON;
|
||||
|
||||
if ((entry.equals(AbfsHttpConstants.ACCESS_USER) || entry.equals(AbfsHttpConstants.ACCESS_GROUP)
|
||||
|| entry.equals(AbfsHttpConstants.ACCESS_OTHER))
|
||||
&& !isNamedAce(entry)) {
|
||||
throw new InvalidAclOperationException("Cannot remove user, group or other entry from access ACL.");
|
||||
}
|
||||
|
||||
boolean touched = false;
|
||||
if (!isNamedAce(entry)) {
|
||||
removeIndicationSet.add(tag); // this must not be a access user, group or other
|
||||
touched = true;
|
||||
} else {
|
||||
if (aclEntries.remove(entry) != null) {
|
||||
touched = true;
|
||||
}
|
||||
}
|
||||
return touched;
|
||||
}
|
||||
|
||||
private static void recalculateMask(Map<String, String> aclEntries, boolean isDefaultMask) {
|
||||
FsAction umask = FsAction.NONE;
|
||||
if (!isExtendAcl(aclEntries, isDefaultMask)) {
|
||||
return;
|
||||
}
|
||||
|
||||
for (Map.Entry<String, String> aclEntry : aclEntries.entrySet()) {
|
||||
if (isDefaultMask) {
|
||||
if ((isDefaultAce(aclEntry.getKey()) && isNamedAce(aclEntry.getKey()))
|
||||
|| aclEntry.getKey().equals(AbfsHttpConstants.DEFAULT_GROUP)) {
|
||||
umask = umask.or(FsAction.getFsAction(aclEntry.getValue()));
|
||||
}
|
||||
} else {
|
||||
if ((!isDefaultAce(aclEntry.getKey()) && isNamedAce(aclEntry.getKey()))
|
||||
|| aclEntry.getKey().equals(AbfsHttpConstants.ACCESS_GROUP)) {
|
||||
umask = umask.or(FsAction.getFsAction(aclEntry.getValue()));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
aclEntries.put(isDefaultMask ? AbfsHttpConstants.DEFAULT_MASK : AbfsHttpConstants.ACCESS_MASK, umask.SYMBOL);
|
||||
}
|
||||
|
||||
private static boolean isExtendAcl(Map<String, String> aclEntries, boolean checkDefault) {
|
||||
for (String entryKey : aclEntries.keySet()) {
|
||||
if (checkDefault && !(entryKey.equals(AbfsHttpConstants.DEFAULT_USER)
|
||||
|| entryKey.equals(AbfsHttpConstants.DEFAULT_GROUP)
|
||||
|| entryKey.equals(AbfsHttpConstants.DEFAULT_OTHER) || !isDefaultAce(entryKey))) {
|
||||
return true;
|
||||
}
|
||||
if (!checkDefault && !(entryKey.equals(AbfsHttpConstants.ACCESS_USER)
|
||||
|| entryKey.equals(AbfsHttpConstants.ACCESS_GROUP)
|
||||
|| entryKey.equals(AbfsHttpConstants.ACCESS_OTHER) || isDefaultAce(entryKey))) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
private static boolean isDefaultAce(String entry) {
|
||||
return entry.startsWith(AbfsHttpConstants.DEFAULT_SCOPE);
|
||||
}
|
||||
|
||||
private static boolean isNamedAce(String entry) {
|
||||
return entry.charAt(entry.length() - 1) != AbfsHttpConstants.COLON.charAt(0);
|
||||
}
|
||||
}
|
|
@ -29,6 +29,9 @@ import java.util.Locale;
|
|||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.hadoop.fs.azurebfs.utils.SSLSocketFactoryEx;
|
||||
import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
|
||||
import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
|
||||
import org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
@ -159,7 +162,8 @@ public class AbfsClient {
|
|||
|
||||
final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
|
||||
abfsUriQueryBuilder.addQuery(QUERY_PARAM_RESOURCE, FILESYSTEM);
|
||||
abfsUriQueryBuilder.addQuery(QUERY_PARAM_DIRECTORY, relativePath == null ? "" : relativePath);
|
||||
abfsUriQueryBuilder.addQuery(QUERY_PARAM_DIRECTORY, relativePath == null ? AbfsHttpConstants.EMPTY_STRING
|
||||
: relativePath);
|
||||
abfsUriQueryBuilder.addQuery(QUERY_PARAM_RECURSIVE, String.valueOf(recursive));
|
||||
abfsUriQueryBuilder.addQuery(QUERY_PARAM_CONTINUATION, continuation);
|
||||
abfsUriQueryBuilder.addQuery(QUERY_PARAM_MAXRESULTS, String.valueOf(listMaxResults));
|
||||
|
@ -206,11 +210,19 @@ public class AbfsClient {
|
|||
return op;
|
||||
}
|
||||
|
||||
public AbfsRestOperation createPath(final String path, final boolean isFile, final boolean overwrite)
|
||||
throws AzureBlobFileSystemException {
|
||||
public AbfsRestOperation createPath(final String path, final boolean isFile, final boolean overwrite,
|
||||
final String permission, final String umask) throws AzureBlobFileSystemException {
|
||||
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
|
||||
if (!overwrite) {
|
||||
requestHeaders.add(new AbfsHttpHeader(IF_NONE_MATCH, "*"));
|
||||
requestHeaders.add(new AbfsHttpHeader(IF_NONE_MATCH, AbfsHttpConstants.STAR));
|
||||
}
|
||||
|
||||
if (permission != null && !permission.isEmpty()) {
|
||||
requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_PERMISSIONS, permission));
|
||||
}
|
||||
|
||||
if (umask != null && !umask.isEmpty()) {
|
||||
requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_UMASK, umask));
|
||||
}
|
||||
|
||||
final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
|
||||
|
@ -269,7 +281,6 @@ public class AbfsClient {
|
|||
return op;
|
||||
}
|
||||
|
||||
|
||||
public AbfsRestOperation flush(final String path, final long position, boolean retainUncommittedData)
|
||||
throws AzureBlobFileSystemException {
|
||||
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
|
||||
|
@ -373,6 +384,104 @@ public class AbfsClient {
|
|||
return op;
|
||||
}
|
||||
|
||||
public AbfsRestOperation setOwner(final String path, final String owner, final String group)
|
||||
throws AzureBlobFileSystemException {
|
||||
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
|
||||
// JDK7 does not support PATCH, so to workaround the issue we will use
|
||||
// PUT and specify the real method in the X-Http-Method-Override header.
|
||||
requestHeaders.add(new AbfsHttpHeader(X_HTTP_METHOD_OVERRIDE,
|
||||
HTTP_METHOD_PATCH));
|
||||
|
||||
if (owner != null && !owner.isEmpty()) {
|
||||
requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_OWNER, owner));
|
||||
}
|
||||
if (group != null && !group.isEmpty()) {
|
||||
requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_GROUP, group));
|
||||
}
|
||||
|
||||
final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
|
||||
abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_ACTION, AbfsHttpConstants.SET_ACCESS_CONTROL);
|
||||
|
||||
final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
|
||||
final AbfsRestOperation op = new AbfsRestOperation(
|
||||
this,
|
||||
AbfsHttpConstants.HTTP_METHOD_PUT,
|
||||
url,
|
||||
requestHeaders);
|
||||
op.execute();
|
||||
return op;
|
||||
}
|
||||
|
||||
public AbfsRestOperation setPermission(final String path, final String permission)
|
||||
throws AzureBlobFileSystemException {
|
||||
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
|
||||
// JDK7 does not support PATCH, so to workaround the issue we will use
|
||||
// PUT and specify the real method in the X-Http-Method-Override header.
|
||||
requestHeaders.add(new AbfsHttpHeader(X_HTTP_METHOD_OVERRIDE,
|
||||
HTTP_METHOD_PATCH));
|
||||
|
||||
requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_PERMISSIONS, permission));
|
||||
|
||||
final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
|
||||
abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_ACTION, AbfsHttpConstants.SET_ACCESS_CONTROL);
|
||||
|
||||
final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
|
||||
final AbfsRestOperation op = new AbfsRestOperation(
|
||||
this,
|
||||
AbfsHttpConstants.HTTP_METHOD_PUT,
|
||||
url,
|
||||
requestHeaders);
|
||||
op.execute();
|
||||
return op;
|
||||
}
|
||||
|
||||
public AbfsRestOperation setAcl(final String path, final String aclSpecString) throws AzureBlobFileSystemException {
|
||||
return setAcl(path, aclSpecString, AbfsHttpConstants.EMPTY_STRING);
|
||||
}
|
||||
|
||||
public AbfsRestOperation setAcl(final String path, final String aclSpecString, final String eTag)
|
||||
throws AzureBlobFileSystemException {
|
||||
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
|
||||
// JDK7 does not support PATCH, so to workaround the issue we will use
|
||||
// PUT and specify the real method in the X-Http-Method-Override header.
|
||||
requestHeaders.add(new AbfsHttpHeader(X_HTTP_METHOD_OVERRIDE,
|
||||
HTTP_METHOD_PATCH));
|
||||
|
||||
requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_ACL, aclSpecString));
|
||||
|
||||
if (eTag != null && !eTag.isEmpty()) {
|
||||
requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.IF_MATCH, eTag));
|
||||
}
|
||||
|
||||
final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
|
||||
abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_ACTION, AbfsHttpConstants.SET_ACCESS_CONTROL);
|
||||
|
||||
final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
|
||||
final AbfsRestOperation op = new AbfsRestOperation(
|
||||
this,
|
||||
AbfsHttpConstants.HTTP_METHOD_PUT,
|
||||
url,
|
||||
requestHeaders);
|
||||
op.execute();
|
||||
return op;
|
||||
}
|
||||
|
||||
public AbfsRestOperation getAclStatus(final String path) throws AzureBlobFileSystemException {
|
||||
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
|
||||
|
||||
final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
|
||||
abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_ACTION, AbfsHttpConstants.GET_ACCESS_CONTROL);
|
||||
|
||||
final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
|
||||
final AbfsRestOperation op = new AbfsRestOperation(
|
||||
this,
|
||||
AbfsHttpConstants.HTTP_METHOD_HEAD,
|
||||
url,
|
||||
requestHeaders);
|
||||
op.execute();
|
||||
return op;
|
||||
}
|
||||
|
||||
private URL createRequestUrl(final String query) throws AzureBlobFileSystemException {
|
||||
return createRequestUrl(EMPTY_STRING, query);
|
||||
}
|
||||
|
|
|
@ -0,0 +1,114 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.azurebfs.services;
|
||||
|
||||
import org.apache.hadoop.fs.permission.FsAction;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
|
||||
/**
|
||||
* The AbfsPermission for AbfsClient.
|
||||
*/
|
||||
public class AbfsPermission extends FsPermission {
|
||||
private static final int STICKY_BIT_OCTAL_VALUE = 01000;
|
||||
private final boolean aclBit;
|
||||
|
||||
public AbfsPermission(Short aShort, boolean aclBitStatus) {
|
||||
super(aShort);
|
||||
this.aclBit = aclBitStatus;
|
||||
}
|
||||
|
||||
public AbfsPermission(FsAction u, FsAction g, FsAction o) {
|
||||
super(u, g, o, false);
|
||||
this.aclBit = false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns true if there is also an ACL (access control list).
|
||||
*
|
||||
* @return boolean true if there is also an ACL (access control list).
|
||||
* @deprecated Get acl bit from the {@link org.apache.hadoop.fs.FileStatus}
|
||||
* object.
|
||||
*/
|
||||
public boolean getAclBit() {
|
||||
return aclBit;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
if (obj instanceof FsPermission) {
|
||||
FsPermission that = (FsPermission) obj;
|
||||
return this.getUserAction() == that.getUserAction()
|
||||
&& this.getGroupAction() == that.getGroupAction()
|
||||
&& this.getOtherAction() == that.getOtherAction()
|
||||
&& this.getStickyBit() == that.getStickyBit();
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a AbfsPermission from a abfs symbolic permission string
|
||||
* @param abfsSymbolicPermission e.g. "rw-rw-rw-+" / "rw-rw-rw-"
|
||||
* @return a permission object for the provided string representation
|
||||
*/
|
||||
public static AbfsPermission valueOf(final String abfsSymbolicPermission) {
|
||||
if (abfsSymbolicPermission == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
final boolean isExtendedAcl = abfsSymbolicPermission.charAt(abfsSymbolicPermission.length() - 1) == '+';
|
||||
|
||||
final String abfsRawSymbolicPermission = isExtendedAcl ? abfsSymbolicPermission.substring(0, abfsSymbolicPermission.length() - 1)
|
||||
: abfsSymbolicPermission;
|
||||
|
||||
int n = 0;
|
||||
for (int i = 0; i < abfsRawSymbolicPermission.length(); i++) {
|
||||
n = n << 1;
|
||||
char c = abfsRawSymbolicPermission.charAt(i);
|
||||
n += (c == '-' || c == 'T' || c == 'S') ? 0: 1;
|
||||
}
|
||||
|
||||
// Add sticky bit value if set
|
||||
if (abfsRawSymbolicPermission.charAt(abfsRawSymbolicPermission.length() - 1) == 't'
|
||||
|| abfsRawSymbolicPermission.charAt(abfsRawSymbolicPermission.length() - 1) == 'T') {
|
||||
n += STICKY_BIT_OCTAL_VALUE;
|
||||
}
|
||||
|
||||
return new AbfsPermission((short) n, isExtendedAcl);
|
||||
}
|
||||
|
||||
/**
|
||||
* Check whether abfs symbolic permission string is a extended Acl
|
||||
* @param abfsSymbolicPermission e.g. "rw-rw-rw-+" / "rw-rw-rw-"
|
||||
* @return true if the permission string indicates the existence of an
|
||||
* extended ACL; otherwise false.
|
||||
*/
|
||||
public static boolean isExtendedAcl(final String abfsSymbolicPermission) {
|
||||
if (abfsSymbolicPermission == null) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return abfsSymbolicPermission.charAt(abfsSymbolicPermission.length() - 1) == '+';
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return toShort();
|
||||
}
|
||||
}
|
|
@ -43,6 +43,8 @@ public class ITestAzureBlobFileSystemBackCompat extends
|
|||
@Test
|
||||
public void testBlobBackCompat() throws Exception {
|
||||
final AzureBlobFileSystem fs = this.getFileSystem();
|
||||
// test only valid for non-namespace enabled account
|
||||
Assume.assumeFalse(fs.getIsNamespaceEnabeld());
|
||||
String storageConnectionString = getBlobConnectionString();
|
||||
CloudStorageAccount storageAccount = CloudStorageAccount.parse(storageConnectionString);
|
||||
CloudBlobClient blobClient = storageAccount.createCloudBlobClient();
|
||||
|
|
|
@ -20,11 +20,12 @@ package org.apache.hadoop.fs.azurebfs;
|
|||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||
import org.apache.hadoop.fs.azurebfs.services.AuthType;
|
||||
import org.junit.Test;
|
||||
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.permission.FsAction;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
|
||||
/**
|
||||
|
@ -32,9 +33,17 @@ import org.apache.hadoop.fs.permission.FsPermission;
|
|||
*/
|
||||
public class ITestAzureBlobFileSystemFileStatus extends
|
||||
AbstractAbfsIntegrationTest {
|
||||
private static final String DEFAULT_FILE_PERMISSION_VALUE = "640";
|
||||
private static final String DEFAULT_DIR_PERMISSION_VALUE = "750";
|
||||
private static final String DEFAULT_UMASK_VALUE = "027";
|
||||
|
||||
private static final Path TEST_FILE = new Path("testFile");
|
||||
private static final Path TEST_FOLDER = new Path("testDir");
|
||||
|
||||
public ITestAzureBlobFileSystemFileStatus() {
|
||||
super();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEnsureStatusWorksForRoot() throws Exception {
|
||||
final AzureBlobFileSystem fs = this.getFileSystem();
|
||||
|
@ -48,20 +57,32 @@ public class ITestAzureBlobFileSystemFileStatus extends
|
|||
public void testFileStatusPermissionsAndOwnerAndGroup() throws Exception {
|
||||
final AzureBlobFileSystem fs = this.getFileSystem();
|
||||
touch(TEST_FILE);
|
||||
validateStatus(fs, TEST_FILE);
|
||||
validateStatus(fs, TEST_FILE, false);
|
||||
}
|
||||
|
||||
private FileStatus validateStatus(final AzureBlobFileSystem fs, final Path name)
|
||||
private FileStatus validateStatus(final AzureBlobFileSystem fs, final Path name, final boolean isDir)
|
||||
throws IOException {
|
||||
FileStatus fileStatus = fs.getFileStatus(name);
|
||||
fs.getConf().set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, DEFAULT_UMASK_VALUE);
|
||||
|
||||
String errorInStatus = "error in " + fileStatus + " from " + fs;
|
||||
assertEquals(errorInStatus + ": permission",
|
||||
new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL),
|
||||
fileStatus.getPermission());
|
||||
assertEquals(errorInStatus + ": owner",
|
||||
fs.getOwnerUser(), fileStatus.getOwner());
|
||||
assertEquals(errorInStatus + ": group",
|
||||
fs.getOwnerUserPrimaryGroup(), fileStatus.getGroup());
|
||||
|
||||
// When running with Oauth, the owner and group info retrieved from server will be digit ids.
|
||||
if (this.getAuthType() != AuthType.OAuth && !fs.isSecure()) {
|
||||
assertEquals(errorInStatus + ": owner",
|
||||
fs.getOwnerUser(), fileStatus.getOwner());
|
||||
assertEquals(errorInStatus + ": group",
|
||||
fs.getOwnerUserPrimaryGroup(), fileStatus.getGroup());
|
||||
} else {
|
||||
if (isDir) {
|
||||
assertEquals(errorInStatus + ": permission",
|
||||
new FsPermission(DEFAULT_DIR_PERMISSION_VALUE), fileStatus.getPermission());
|
||||
} else {
|
||||
assertEquals(errorInStatus + ": permission",
|
||||
new FsPermission(DEFAULT_FILE_PERMISSION_VALUE), fileStatus.getPermission());
|
||||
}
|
||||
}
|
||||
|
||||
return fileStatus;
|
||||
}
|
||||
|
||||
|
@ -70,7 +91,7 @@ public class ITestAzureBlobFileSystemFileStatus extends
|
|||
final AzureBlobFileSystem fs = this.getFileSystem();
|
||||
fs.mkdirs(TEST_FOLDER);
|
||||
|
||||
validateStatus(fs, TEST_FOLDER);
|
||||
validateStatus(fs, TEST_FOLDER, true);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -218,6 +218,9 @@ public class ITestAzureBlobFileSystemFlush extends AbstractAbfsScaleTest {
|
|||
String wasbUrl = testAccount.getFileSystem().getName();
|
||||
String abfsUrl = wasbUrlToAbfsUrl(wasbUrl);
|
||||
final AzureBlobFileSystem fs = this.getFileSystem(abfsUrl);
|
||||
// test only valid for non-namespace enabled account
|
||||
Assume.assumeFalse(fs.getIsNamespaceEnabeld());
|
||||
|
||||
byte[] buffer = getRandomBytesArray();
|
||||
CloudBlockBlob blob = testAccount.getBlobReference(TEST_FILE_PATH.toString().substring(1));
|
||||
try (FSDataOutputStream stream = getStreamAfterWrite(fs, TEST_FILE_PATH, buffer, true)) {
|
||||
|
@ -238,6 +241,9 @@ public class ITestAzureBlobFileSystemFlush extends AbstractAbfsScaleTest {
|
|||
String wasbUrl = testAccount.getFileSystem().getName();
|
||||
String abfsUrl = wasbUrlToAbfsUrl(wasbUrl);
|
||||
final AzureBlobFileSystem fs = this.getFileSystem(abfsUrl);
|
||||
// test only valid for non-namespace enabled account
|
||||
Assume.assumeFalse(fs.getIsNamespaceEnabeld());
|
||||
|
||||
byte[] buffer = getRandomBytesArray();
|
||||
CloudBlockBlob blob = testAccount.getBlobReference(TEST_FILE_PATH.toString().substring(1));
|
||||
try (FSDataOutputStream stream = getStreamAfterWrite(fs, TEST_FILE_PATH, buffer, false)) {
|
||||
|
|
|
@ -0,0 +1,109 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.azurebfs;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.UUID;
|
||||
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||
import org.apache.hadoop.fs.azurebfs.services.AuthType;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Assume;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.permission.FsAction;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.fs.azurebfs.utils.Parallelized;
|
||||
|
||||
/**
|
||||
* Test permission operations.
|
||||
*/
|
||||
@RunWith(Parallelized.class)
|
||||
public class ITestAzureBlobFileSystemPermission extends AbstractAbfsIntegrationTest{
|
||||
|
||||
private static Path testRoot = new Path("/test");
|
||||
private static final String DEFAULT_UMASK_VALUE = "027";
|
||||
private static final FsPermission DEFAULT_UMASK_PERMISSION = new FsPermission(DEFAULT_UMASK_VALUE);
|
||||
private static final int KILOBYTE = 1024;
|
||||
private FsPermission permission;
|
||||
|
||||
private Path path;
|
||||
|
||||
public ITestAzureBlobFileSystemPermission(FsPermission testPermission) throws Exception {
|
||||
super();
|
||||
permission = testPermission;
|
||||
|
||||
Assume.assumeTrue(this.getAuthType() == AuthType.OAuth);
|
||||
}
|
||||
|
||||
@Parameterized.Parameters(name = "{0}")
|
||||
public static Collection abfsCreateNonRecursiveTestData()
|
||||
throws Exception {
|
||||
/*
|
||||
Test Data
|
||||
File/Folder name, User permission, Group permission, Other Permission,
|
||||
Parent already exist
|
||||
shouldCreateSucceed, expectedExceptionIfFileCreateFails
|
||||
*/
|
||||
final Collection<Object[]> datas = new ArrayList<>();
|
||||
for (FsAction g : FsAction.values()) {
|
||||
for (FsAction o : FsAction.values()) {
|
||||
datas.add(new Object[] {new FsPermission(FsAction.ALL, g, o)});
|
||||
}
|
||||
}
|
||||
return datas;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFilePermission() throws Exception {
|
||||
|
||||
final AzureBlobFileSystem fs = this.getFileSystem();
|
||||
fs.getConf().set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, DEFAULT_UMASK_VALUE);
|
||||
path = new Path(testRoot, UUID.randomUUID().toString());
|
||||
|
||||
fs.mkdirs(path.getParent(),
|
||||
new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE));
|
||||
fs.removeDefaultAcl(path.getParent());
|
||||
|
||||
fs.create(path, permission, true, KILOBYTE, (short) 1, KILOBYTE - 1, null);
|
||||
FileStatus status = fs.getFileStatus(path);
|
||||
Assert.assertEquals(permission.applyUMask(DEFAULT_UMASK_PERMISSION), status.getPermission());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFolderPermission() throws Exception {
|
||||
final AzureBlobFileSystem fs = this.getFileSystem();
|
||||
fs.getConf().set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "027");
|
||||
|
||||
path = new Path(testRoot, UUID.randomUUID().toString());
|
||||
|
||||
fs.mkdirs(path.getParent(),
|
||||
new FsPermission(FsAction.ALL, FsAction.WRITE, FsAction.NONE));
|
||||
fs.removeDefaultAcl(path.getParent());
|
||||
|
||||
fs.mkdirs(path, permission);
|
||||
FileStatus status = fs.getFileStatus(path);
|
||||
Assert.assertEquals(permission.applyUMask(DEFAULT_UMASK_PERMISSION), status.getPermission());
|
||||
}
|
||||
}
|
|
@ -17,7 +17,6 @@
|
|||
*/
|
||||
package org.apache.hadoop.fs.azurebfs;
|
||||
|
||||
|
||||
import java.io.EOFException;
|
||||
import java.io.IOException;
|
||||
import java.util.Random;
|
||||
|
@ -524,6 +523,9 @@ public class ITestAzureBlobFileSystemRandomRead extends
|
|||
}
|
||||
|
||||
private void createTestFile() throws Exception {
|
||||
final AzureBlobFileSystem abFs = this.getFileSystem();
|
||||
// test only valid for non-namespace enabled account
|
||||
Assume.assumeFalse(abFs.getIsNamespaceEnabeld());
|
||||
FileSystem fs = this.getWasbFileSystem();
|
||||
|
||||
if (fs.exists(TEST_FILE_PATH)) {
|
||||
|
|
|
@ -25,6 +25,7 @@ import java.util.concurrent.ExecutorService;
|
|||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.Future;
|
||||
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
|
@ -133,4 +134,17 @@ public class ITestAzureBlobFileSystemRename extends
|
|||
new Path(fs.getUri().toString() + "/s"),
|
||||
false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPosixRenameDirectory() throws Exception {
|
||||
final AzureBlobFileSystem fs = this.getFileSystem();
|
||||
fs.mkdirs(new Path("testDir2/test1/test2/test3"));
|
||||
fs.mkdirs(new Path("testDir2/test4"));
|
||||
Assert.assertTrue(fs.rename(new Path("testDir2/test1/test2/test3"), new Path("testDir2/test4")));
|
||||
assertTrue(fs.exists(new Path("testDir2")));
|
||||
assertTrue(fs.exists(new Path("testDir2/test1/test2")));
|
||||
assertTrue(fs.exists(new Path("testDir2/test4")));
|
||||
assertTrue(fs.exists(new Path("testDir2/test4/test3")));
|
||||
assertFalse(fs.exists(new Path("testDir2/test1/test2/test3")));
|
||||
}
|
||||
}
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -58,6 +58,9 @@ public class ITestWasbAbfsCompatibility extends AbstractAbfsIntegrationTest {
|
|||
public void testListFileStatus() throws Exception {
|
||||
// crate file using abfs
|
||||
AzureBlobFileSystem fs = getFileSystem();
|
||||
// test only valid for non-namespace enabled account
|
||||
Assume.assumeFalse(fs.getIsNamespaceEnabeld());
|
||||
|
||||
NativeAzureFileSystem wasb = getWasbFileSystem();
|
||||
|
||||
Path path1 = new Path("/testfiles/~12/!008/3/abFsTestfile");
|
||||
|
@ -89,6 +92,9 @@ public class ITestWasbAbfsCompatibility extends AbstractAbfsIntegrationTest {
|
|||
boolean[] readFileWithAbfs = new boolean[]{false, true, true, false};
|
||||
|
||||
AzureBlobFileSystem abfs = getFileSystem();
|
||||
// test only valid for non-namespace enabled account
|
||||
Assume.assumeFalse(abfs.getIsNamespaceEnabeld());
|
||||
|
||||
NativeAzureFileSystem wasb = getWasbFileSystem();
|
||||
|
||||
for (int i = 0; i< 4; i++) {
|
||||
|
@ -125,6 +131,9 @@ public class ITestWasbAbfsCompatibility extends AbstractAbfsIntegrationTest {
|
|||
boolean[] readDirWithAbfs = new boolean[]{false, true, true, false};
|
||||
|
||||
AzureBlobFileSystem abfs = getFileSystem();
|
||||
// test only valid for non-namespace enabled account
|
||||
Assume.assumeFalse(abfs.getIsNamespaceEnabeld());
|
||||
|
||||
NativeAzureFileSystem wasb = getWasbFileSystem();
|
||||
|
||||
for (int i = 0; i < 4; i++) {
|
||||
|
@ -156,6 +165,9 @@ public class ITestWasbAbfsCompatibility extends AbstractAbfsIntegrationTest {
|
|||
public void testSetWorkingDirectory() throws Exception {
|
||||
//create folders
|
||||
AzureBlobFileSystem abfs = getFileSystem();
|
||||
// test only valid for non-namespace enabled account
|
||||
Assume.assumeFalse(abfs.getIsNamespaceEnabeld());
|
||||
|
||||
NativeAzureFileSystem wasb = getWasbFileSystem();
|
||||
|
||||
Path d1d4 = new Path("/d1/d2/d3/d4");
|
||||
|
|
|
@ -0,0 +1,119 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.fs.azurebfs.utils;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.permission.AclEntry;
|
||||
import org.apache.hadoop.fs.permission.AclEntryScope;
|
||||
import org.apache.hadoop.fs.permission.AclEntryType;
|
||||
import org.apache.hadoop.fs.permission.FsAction;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
/**
|
||||
* Helper methods useful for writing ACL tests.
|
||||
*/
|
||||
public final class AclTestHelpers {
|
||||
|
||||
/**
|
||||
* Create a new AclEntry with scope, type and permission (no name).
|
||||
*
|
||||
* @param scope AclEntryScope scope of the ACL entry
|
||||
* @param type AclEntryType ACL entry type
|
||||
* @param permission FsAction set of permissions in the ACL entry
|
||||
* @return AclEntry new AclEntry
|
||||
*/
|
||||
public static AclEntry aclEntry(AclEntryScope scope, AclEntryType type,
|
||||
FsAction permission) {
|
||||
return new AclEntry.Builder()
|
||||
.setScope(scope)
|
||||
.setType(type)
|
||||
.setPermission(permission)
|
||||
.build();
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a new AclEntry with scope, type, name and permission.
|
||||
*
|
||||
* @param scope AclEntryScope scope of the ACL entry
|
||||
* @param type AclEntryType ACL entry type
|
||||
* @param name String optional ACL entry name
|
||||
* @param permission FsAction set of permissions in the ACL entry
|
||||
* @return AclEntry new AclEntry
|
||||
*/
|
||||
public static AclEntry aclEntry(AclEntryScope scope, AclEntryType type,
|
||||
String name, FsAction permission) {
|
||||
return new AclEntry.Builder()
|
||||
.setScope(scope)
|
||||
.setType(type)
|
||||
.setName(name)
|
||||
.setPermission(permission)
|
||||
.build();
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a new AclEntry with scope, type and name (no permission).
|
||||
*
|
||||
* @param scope AclEntryScope scope of the ACL entry
|
||||
* @param type AclEntryType ACL entry type
|
||||
* @param name String optional ACL entry name
|
||||
* @return AclEntry new AclEntry
|
||||
*/
|
||||
public static AclEntry aclEntry(AclEntryScope scope, AclEntryType type,
|
||||
String name) {
|
||||
return new AclEntry.Builder()
|
||||
.setScope(scope)
|
||||
.setType(type)
|
||||
.setName(name)
|
||||
.build();
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a new AclEntry with scope and type (no name or permission).
|
||||
*
|
||||
* @param scope AclEntryScope scope of the ACL entry
|
||||
* @param type AclEntryType ACL entry type
|
||||
* @return AclEntry new AclEntry
|
||||
*/
|
||||
public static AclEntry aclEntry(AclEntryScope scope, AclEntryType type) {
|
||||
return new AclEntry.Builder()
|
||||
.setScope(scope)
|
||||
.setType(type)
|
||||
.build();
|
||||
}
|
||||
|
||||
/**
|
||||
* Asserts the value of the FsPermission bits on the inode of a specific path.
|
||||
*
|
||||
* @param fs FileSystem to use for check
|
||||
* @param pathToCheck Path inode to check
|
||||
* @param perm short expected permission bits
|
||||
* @throws IOException thrown if there is an I/O error
|
||||
*/
|
||||
public static void assertPermission(FileSystem fs, Path pathToCheck,
|
||||
short perm) throws IOException {
|
||||
assertEquals(perm, fs.getFileStatus(pathToCheck).getPermission().toShort());
|
||||
}
|
||||
|
||||
private AclTestHelpers() {
|
||||
// Not called.
|
||||
}
|
||||
}
|
|
@ -0,0 +1,60 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.azurebfs.utils;
|
||||
|
||||
import org.junit.runners.Parameterized;
|
||||
import org.junit.runners.model.RunnerScheduler;
|
||||
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* Provided for convenience to execute parametrized test cases concurrently.
|
||||
*/
|
||||
public class Parallelized extends Parameterized {
|
||||
|
||||
public Parallelized(Class classObj) throws Throwable {
|
||||
super(classObj);
|
||||
setScheduler(new ThreadPoolScheduler());
|
||||
}
|
||||
|
||||
private static class ThreadPoolScheduler implements RunnerScheduler {
|
||||
private ExecutorService executor;
|
||||
|
||||
ThreadPoolScheduler() {
|
||||
int numThreads = 10;
|
||||
executor = Executors.newFixedThreadPool(numThreads);
|
||||
}
|
||||
|
||||
public void finished() {
|
||||
executor.shutdown();
|
||||
try {
|
||||
executor.awaitTermination(10, TimeUnit.MINUTES);
|
||||
} catch (InterruptedException exc) {
|
||||
throw new RuntimeException(exc);
|
||||
}
|
||||
}
|
||||
|
||||
public void schedule(Runnable childStatement) {
|
||||
executor.submit(childStatement);
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue