From 80c8b77cecc4bc79384439d77c162be4a52d1c32 Mon Sep 17 00:00:00 2001 From: Da Zhou Date: Thu, 7 Feb 2019 21:58:21 +0000 Subject: [PATCH] HADOOP-15954. ABFS: Enable owner and group conversion for MSI and login user using OAuth. Contributed by Da Zhou and Junhua Gu. --- .../hadoop/fs/azurebfs/AbfsConfiguration.java | 10 + .../fs/azurebfs/AzureBlobFileSystem.java | 37 +-- .../fs/azurebfs/AzureBlobFileSystemStore.java | 179 ++++++----- .../azurebfs/constants/ConfigurationKeys.java | 23 +- .../azurebfs/oauth2/IdentityTransformer.java | 278 ++++++++++++++++ .../src/site/markdown/testing_azure.md | 55 ++++ .../ITestAbfsIdentityTransformer.java | 301 ++++++++++++++++++ 7 files changed, 773 insertions(+), 110 deletions(-) create mode 100644 hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/IdentityTransformer.java create mode 100644 hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsIdentityTransformer.java diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java index b9bc7f20204..67055c54fdd 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java @@ -217,6 +217,16 @@ public class AbfsConfiguration{ return rawConfig.get(accountConf(key), rawConfig.get(key)); } + /** + * Returns the account-specific value if it exists, then looks for an + * account-agnostic value. + * @param key Account-agnostic configuration key + * @return value if one exists, else the default value + */ + public String getString(String key, String defaultValue) { + return rawConfig.get(accountConf(key), rawConfig.get(key, defaultValue)); + } + /** * Returns the account-specific value if it exists, then looks for an * account-agnostic value, and finally tries the default value. diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java index 4c24ac8fe33..e321e9e88e4 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java @@ -83,9 +83,6 @@ public class AzureBlobFileSystem extends FileSystem { public static final Logger LOG = LoggerFactory.getLogger(AzureBlobFileSystem.class); private URI uri; private Path workingDir; - private UserGroupInformation userGroupInformation; - private String user; - private String primaryUserGroup; private AzureBlobFileSystemStore abfsStore; private boolean isClosed; @@ -103,9 +100,7 @@ public class AzureBlobFileSystem extends FileSystem { LOG.debug("Initializing AzureBlobFileSystem for {}", uri); this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority()); - this.userGroupInformation = UserGroupInformation.getCurrentUser(); - this.user = userGroupInformation.getUserName(); - this.abfsStore = new AzureBlobFileSystemStore(uri, this.isSecureScheme(), configuration, userGroupInformation); + this.abfsStore = new AzureBlobFileSystemStore(uri, this.isSecureScheme(), configuration); final AbfsConfiguration abfsConfiguration = abfsStore.getAbfsConfiguration(); this.setWorkingDirectory(this.getHomeDirectory()); @@ -120,18 +115,6 @@ public class AzureBlobFileSystem extends FileSystem { } } - if (!abfsConfiguration.getSkipUserGroupMetadataDuringInitialization()) { - try { - this.primaryUserGroup = userGroupInformation.getPrimaryGroupName(); - } catch (IOException ex) { - LOG.error("Failed to get primary group for {}, using user name as primary group name", user); - this.primaryUserGroup = this.user; - } - } else { - //Provide a default group name - this.primaryUserGroup = this.user; - } - if (UserGroupInformation.isSecurityEnabled()) { this.delegationTokenEnabled = abfsConfiguration.isDelegationTokenManagerEnabled(); @@ -153,8 +136,8 @@ public class AzureBlobFileSystem extends FileSystem { final StringBuilder sb = new StringBuilder( "AzureBlobFileSystem{"); sb.append("uri=").append(uri); - sb.append(", user='").append(user).append('\''); - sb.append(", primaryUserGroup='").append(primaryUserGroup).append('\''); + sb.append(", user='").append(abfsStore.getUser()).append('\''); + sb.append(", primaryUserGroup='").append(abfsStore.getPrimaryGroup()).append('\''); sb.append('}'); return sb.toString(); } @@ -503,7 +486,7 @@ public class AzureBlobFileSystem extends FileSystem { public Path getHomeDirectory() { return makeQualified(new Path( FileSystemConfigurations.USER_HOME_DIRECTORY_PREFIX - + "/" + this.userGroupInformation.getShortUserName())); + + "/" + abfsStore.getUser())); } /** @@ -554,12 +537,20 @@ public class AzureBlobFileSystem extends FileSystem { super.finalize(); } + /** + * Get the username of the FS. + * @return the short name of the user who instantiated the FS + */ public String getOwnerUser() { - return user; + return abfsStore.getUser(); } + /** + * Get the group name of the owner of the FS. + * @return primary group name + */ public String getOwnerUserPrimaryGroup() { - return primaryUserGroup; + return abfsStore.getPrimaryGroup(); } private boolean deleteRoot() throws IOException { diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java index 5c28bd47456..c2739e91039 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java @@ -67,6 +67,7 @@ import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode; import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultEntrySchema; import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultSchema; import org.apache.hadoop.fs.azurebfs.oauth2.AccessTokenProvider; +import org.apache.hadoop.fs.azurebfs.oauth2.IdentityTransformer; import org.apache.hadoop.fs.azurebfs.services.AbfsAclHelper; import org.apache.hadoop.fs.azurebfs.services.AbfsClient; import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation; @@ -88,9 +89,7 @@ import org.apache.http.client.utils.URIBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.SUPER_USER; import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_ABFS_ENDPOINT; - /** * Provides the bridging logic between Hadoop's abstract filesystem and Azure Storage. */ @@ -101,7 +100,6 @@ public class AzureBlobFileSystemStore { private AbfsClient client; private URI uri; - private final UserGroupInformation userGroupInformation; private String userName; private String primaryUserGroup; private static final String DATE_TIME_PATTERN = "E, dd MMM yyyy HH:mm:ss 'GMT'"; @@ -113,11 +111,12 @@ public class AzureBlobFileSystemStore { private boolean isNamespaceEnabledSet; private boolean isNamespaceEnabled; private final AuthType authType; + private final UserGroupInformation userGroupInformation; + private final IdentityTransformer identityTransformer; - public AzureBlobFileSystemStore(URI uri, boolean isSecureScheme, Configuration configuration, UserGroupInformation userGroupInformation) - throws AzureBlobFileSystemException, IOException { + public AzureBlobFileSystemStore(URI uri, boolean isSecureScheme, Configuration configuration) + throws IOException { this.uri = uri; - String[] authorityParts = authorityParts(uri); final String fileSystemName = authorityParts[0]; final String accountName = authorityParts[1]; @@ -127,10 +126,8 @@ public class AzureBlobFileSystemStore { } catch (IllegalAccessException exception) { throw new FileSystemOperationUnhandledException(exception); } - - this.userGroupInformation = userGroupInformation; + this.userGroupInformation = UserGroupInformation.getCurrentUser(); this.userName = userGroupInformation.getShortUserName(); - if (!abfsConfiguration.getSkipUserGroupMetadataDuringInitialization()) { try { this.primaryUserGroup = userGroupInformation.getPrimaryGroupName(); @@ -145,12 +142,25 @@ public class AzureBlobFileSystemStore { this.azureAtomicRenameDirSet = new HashSet<>(Arrays.asList( abfsConfiguration.getAzureAtomicRenameDirs().split(AbfsHttpConstants.COMMA))); - this.authType = abfsConfiguration.getAuthType(accountName); boolean usingOauth = (authType == AuthType.OAuth); boolean useHttps = (usingOauth || abfsConfiguration.isHttpsAlwaysUsed()) ? true : isSecureScheme; initializeClient(uri, fileSystemName, accountName, useHttps); + this.identityTransformer = new IdentityTransformer(abfsConfiguration.getRawConfiguration()); + } + /** + * @return local user name. + * */ + public String getUser() { + return this.userName; + } + + /** + * @return primary group that user belongs to. + * */ + public String getPrimaryGroup() { + return this.primaryUserGroup; } private String[] authorityParts(URI uri) throws InvalidUriAuthorityException, InvalidUriException { @@ -452,60 +462,54 @@ public class AzureBlobFileSystemStore { path, isNamespaceEnabled); + final AbfsRestOperation op; if (path.isRoot()) { - final AbfsRestOperation op = isNamespaceEnabled - ? client.getAclStatus(AbfsHttpConstants.FORWARD_SLASH + AbfsHttpConstants.ROOT_PATH) - : client.getFilesystemProperties(); - - final long blockSize = abfsConfiguration.getAzureBlockSize(); - final String owner = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_OWNER); - final String group = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_GROUP); - final String permissions = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_PERMISSIONS); - final String eTag = op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG); - final String lastModified = op.getResult().getResponseHeader(HttpHeaderConfigurations.LAST_MODIFIED); - final boolean hasAcl = AbfsPermission.isExtendedAcl(permissions); - - return new VersionedFileStatus( - isSuperUserOrEmpty(owner) ? userName : owner, - isSuperUserOrEmpty(group) ? primaryUserGroup : group, - permissions == null ? new AbfsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL) - : AbfsPermission.valueOf(permissions), - hasAcl, - 0, - true, - 1, - blockSize, - parseLastModifiedTime(lastModified), - path, - eTag); + op = isNamespaceEnabled + ? client.getAclStatus(AbfsHttpConstants.FORWARD_SLASH + AbfsHttpConstants.ROOT_PATH) + : client.getFilesystemProperties(); } else { - AbfsRestOperation op = client.getPathProperties(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), isNamespaceEnabled); - - final long blockSize = abfsConfiguration.getAzureBlockSize(); - final AbfsHttpOperation result = op.getResult(); - final String eTag = result.getResponseHeader(HttpHeaderConfigurations.ETAG); - final String lastModified = result.getResponseHeader(HttpHeaderConfigurations.LAST_MODIFIED); - final String contentLength = result.getResponseHeader(HttpHeaderConfigurations.CONTENT_LENGTH); - final String resourceType = result.getResponseHeader(HttpHeaderConfigurations.X_MS_RESOURCE_TYPE); - final String owner = result.getResponseHeader(HttpHeaderConfigurations.X_MS_OWNER); - final String group = result.getResponseHeader(HttpHeaderConfigurations.X_MS_GROUP); - final String permissions = result.getResponseHeader((HttpHeaderConfigurations.X_MS_PERMISSIONS)); - final boolean hasAcl = AbfsPermission.isExtendedAcl(permissions); - - return new VersionedFileStatus( - isSuperUserOrEmpty(owner) ? userName : owner, - isSuperUserOrEmpty(group) ? primaryUserGroup : group, - permissions == null ? new AbfsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL) - : AbfsPermission.valueOf(permissions), - hasAcl, - parseContentLength(contentLength), - parseIsDirectory(resourceType), - 1, - blockSize, - parseLastModifiedTime(lastModified), - path, - eTag); + op = client.getPathProperties(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), isNamespaceEnabled); } + + final long blockSize = abfsConfiguration.getAzureBlockSize(); + final AbfsHttpOperation result = op.getResult(); + + final String eTag = result.getResponseHeader(HttpHeaderConfigurations.ETAG); + final String lastModified = result.getResponseHeader(HttpHeaderConfigurations.LAST_MODIFIED); + final String permissions = result.getResponseHeader((HttpHeaderConfigurations.X_MS_PERMISSIONS)); + final boolean hasAcl = AbfsPermission.isExtendedAcl(permissions); + final long contentLength; + final boolean resourceIsDir; + + if (path.isRoot()) { + contentLength = 0; + resourceIsDir = true; + } else { + contentLength = parseContentLength(result.getResponseHeader(HttpHeaderConfigurations.CONTENT_LENGTH)); + resourceIsDir = parseIsDirectory(result.getResponseHeader(HttpHeaderConfigurations.X_MS_RESOURCE_TYPE)); + } + + final String transformedOwner = identityTransformer.transformIdentityForGetRequest( + result.getResponseHeader(HttpHeaderConfigurations.X_MS_OWNER), + userName); + + final String transformedGroup = identityTransformer.transformIdentityForGetRequest( + result.getResponseHeader(HttpHeaderConfigurations.X_MS_GROUP), + primaryUserGroup); + + return new VersionedFileStatus( + transformedOwner, + transformedGroup, + permissions == null ? new AbfsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL) + : AbfsPermission.valueOf(permissions), + hasAcl, + contentLength, + resourceIsDir, + 1, + blockSize, + parseLastModifiedTime(lastModified), + path, + eTag); } public FileStatus[] listStatus(final Path path) throws IOException { @@ -532,8 +536,8 @@ public class AzureBlobFileSystemStore { long blockSize = abfsConfiguration.getAzureBlockSize(); for (ListResultEntrySchema entry : retrievedSchema.paths()) { - final String owner = isSuperUserOrEmpty(entry.owner()) ? userName : entry.owner(); - final String group = isSuperUserOrEmpty(entry.group()) ? primaryUserGroup : entry.group(); + final String owner = identityTransformer.transformIdentityForGetRequest(entry.owner(), userName); + final String group = identityTransformer.transformIdentityForGetRequest(entry.group(), primaryUserGroup); final FsPermission fsPermission = entry.permissions() == null ? new AbfsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL) : AbfsPermission.valueOf(entry.permissions()); @@ -566,7 +570,7 @@ public class AzureBlobFileSystemStore { } while (continuation != null && !continuation.isEmpty()); - return fileStatuses.toArray(new FileStatus[0]); + return fileStatuses.toArray(new FileStatus[fileStatuses.size()]); } public void setOwner(final Path path, final String owner, final String group) throws @@ -576,20 +580,17 @@ public class AzureBlobFileSystemStore { "This operation is only valid for storage accounts with the hierarchical namespace enabled."); } - String effectiveOwner = owner; - String effectiveGroup = group; - if (authType == AuthType.SharedKey && owner.equals(userName)) { - effectiveOwner = SUPER_USER; - effectiveGroup = SUPER_USER; - } - LOG.debug( "setOwner filesystem: {} path: {} owner: {} group: {}", client.getFileSystem(), path.toString(), - effectiveOwner, - effectiveGroup); - client.setOwner(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, true), effectiveOwner, effectiveGroup); + owner, + group); + + final String transformedOwner = identityTransformer.transformUserOrGroupForSetRequest(owner); + final String transformedGroup = identityTransformer.transformUserOrGroupForSetRequest(group); + + client.setOwner(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, true), transformedOwner, transformedGroup); } public void setPermission(final Path path, final FsPermission permission) throws @@ -620,7 +621,9 @@ public class AzureBlobFileSystemStore { client.getFileSystem(), path.toString(), AclEntry.aclSpecToString(aclSpec)); - final Map modifyAclEntries = AbfsAclHelper.deserializeAclSpec(AclEntry.aclSpecToString(aclSpec)); + + final List transformedAclEntries = identityTransformer.transformAclEntriesForSetRequest(aclSpec); + final Map modifyAclEntries = AbfsAclHelper.deserializeAclSpec(AclEntry.aclSpecToString(transformedAclEntries)); boolean useUpn = AbfsAclHelper.isUpnFormatAclEntries(modifyAclEntries); final AbfsRestOperation op = client.getAclStatus(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, true), useUpn); @@ -645,7 +648,9 @@ public class AzureBlobFileSystemStore { client.getFileSystem(), path.toString(), AclEntry.aclSpecToString(aclSpec)); - final Map removeAclEntries = AbfsAclHelper.deserializeAclSpec(AclEntry.aclSpecToString(aclSpec)); + + final List transformedAclEntries = identityTransformer.transformAclEntriesForSetRequest(aclSpec); + final Map removeAclEntries = AbfsAclHelper.deserializeAclSpec(AclEntry.aclSpecToString(transformedAclEntries)); boolean isUpnFormat = AbfsAclHelper.isUpnFormatAclEntries(removeAclEntries); final AbfsRestOperation op = client.getAclStatus(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, true), isUpnFormat); @@ -722,7 +727,9 @@ public class AzureBlobFileSystemStore { client.getFileSystem(), path.toString(), AclEntry.aclSpecToString(aclSpec)); - final Map aclEntries = AbfsAclHelper.deserializeAclSpec(AclEntry.aclSpecToString(aclSpec)); + + final List transformedAclEntries = identityTransformer.transformAclEntriesForSetRequest(aclSpec); + final Map aclEntries = AbfsAclHelper.deserializeAclSpec(AclEntry.aclSpecToString(transformedAclEntries)); final boolean isUpnFormat = AbfsAclHelper.isUpnFormatAclEntries(aclEntries); final AbfsRestOperation op = client.getAclStatus(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, true), isUpnFormat); @@ -749,8 +756,13 @@ public class AzureBlobFileSystemStore { AbfsRestOperation op = client.getAclStatus(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, true)); AbfsHttpOperation result = op.getResult(); - final String owner = result.getResponseHeader(HttpHeaderConfigurations.X_MS_OWNER); - final String group = result.getResponseHeader(HttpHeaderConfigurations.X_MS_GROUP); + final String transformedOwner = identityTransformer.transformIdentityForGetRequest( + result.getResponseHeader(HttpHeaderConfigurations.X_MS_OWNER), + userName); + final String transformedGroup = identityTransformer.transformIdentityForGetRequest( + result.getResponseHeader(HttpHeaderConfigurations.X_MS_GROUP), + primaryUserGroup); + final String permissions = result.getResponseHeader(HttpHeaderConfigurations.X_MS_PERMISSIONS); final String aclSpecString = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_ACL); @@ -759,8 +771,8 @@ public class AzureBlobFileSystemStore { : AbfsPermission.valueOf(permissions); final AclStatus.Builder aclStatusBuilder = new AclStatus.Builder(); - aclStatusBuilder.owner(isSuperUserOrEmpty(owner)? userName : owner); - aclStatusBuilder.group(isSuperUserOrEmpty(group) ? primaryUserGroup : group); + aclStatusBuilder.owner(transformedOwner); + aclStatusBuilder.group(transformedGroup); aclStatusBuilder.setPermission(fsPermission); aclStatusBuilder.stickyBit(fsPermission.getStickyBit()); @@ -944,11 +956,6 @@ public class AzureBlobFileSystemStore { return false; } - private boolean isSuperUserOrEmpty(final String name) { - return name == null || name.equals(SUPER_USER); - } - - private static class VersionedFileStatus extends FileStatus { private final String version; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java index d079d94bc3d..8cd86bf929e 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java @@ -55,7 +55,28 @@ public final class ConfigurationKeys { public static final String FS_AZURE_USER_AGENT_PREFIX_KEY = "fs.azure.user.agent.prefix"; public static final String FS_AZURE_SSL_CHANNEL_MODE_KEY = "fs.azure.ssl.channel.mode"; public static final String FS_AZURE_USE_UPN = "fs.azure.use.upn"; - + /** User principal names (UPNs) have the format “{alias}@{domain}”. If true, + * only {alias} is included when a UPN would otherwise appear in the output + * of APIs like getFileStatus, getOwner, getAclStatus, etc. Default is false. **/ + public static final String FS_AZURE_FILE_OWNER_ENABLE_SHORTNAME = "fs.azure.identity.transformer.enable.short.name"; + /** If the domain name is specified and “fs.azure.identity.transformer.enable.short.name” + * is true, then the {alias} part of a UPN can be specified as input to APIs like setOwner and + * setAcl and it will be transformed to a UPN by appending @ and the domain specified by + * this configuration property. **/ + public static final String FS_AZURE_FILE_OWNER_DOMAINNAME = "fs.azure.identity.transformer.domain.name"; + /** An Azure Active Directory object ID (oid) used as the replacement for names contained in the + * list specified by “fs.azure.identity.transformer.service.principal.substitution.list. + * Notice that instead of setting oid, you can also set $superuser.**/ + public static final String FS_AZURE_OVERRIDE_OWNER_SP = "fs.azure.identity.transformer.service.principal.id"; + /** A comma separated list of names to be replaced with the service principal ID specified by + * “fs.default.identity.transformer.service.principal.id”. This substitution occurs + * when setOwner, setAcl, modifyAclEntries, or removeAclEntries are invoked with identities + * contained in the substitution list. Notice that when in non-secure cluster, asterisk symbol "*" + * can be used to match all user/group. **/ + public static final String FS_AZURE_OVERRIDE_OWNER_SP_LIST = "fs.azure.identity.transformer.service.principal.substitution.list"; + /** By default this is set as false, so “$superuser” is replaced with the current user when it appears as the owner + * or owning group of a file or directory. To disable it, set it as true. **/ + public static final String FS_AZURE_SKIP_SUPER_USER_REPLACEMENT = "fs.azure.identity.transformer.skip.superuser.replacement"; public static final String AZURE_KEY_ACCOUNT_KEYPROVIDER = "fs.azure.account.keyprovider"; public static final String AZURE_KEY_ACCOUNT_SHELLKEYPROVIDER_SCRIPT = "fs.azure.shellkeyprovider.script"; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/IdentityTransformer.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/IdentityTransformer.java new file mode 100644 index 00000000000..62b54d1c431 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/IdentityTransformer.java @@ -0,0 +1,278 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs.azurebfs.oauth2; + +import java.io.IOException; +import java.util.List; +import java.util.Locale; + +import com.google.common.base.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.permission.AclEntry; +import org.apache.hadoop.fs.permission.AclEntryType; +import org.apache.hadoop.security.UserGroupInformation; + +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.AT; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.STAR; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.SUPER_USER; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_SKIP_SUPER_USER_REPLACEMENT; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_FILE_OWNER_DOMAINNAME; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_FILE_OWNER_ENABLE_SHORTNAME; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_OVERRIDE_OWNER_SP; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_OVERRIDE_OWNER_SP_LIST; + +/** + * Perform transformation for Azure Active Directory identities used in owner, group and acls. + */ +public class IdentityTransformer { + private static final Logger LOG = LoggerFactory.getLogger(IdentityTransformer.class); + + private boolean isSecure; + private String servicePrincipalId; + private String serviceWhiteList; + private String domainName; + private boolean enableShortName; + private boolean skipUserIdentityReplacement; + private boolean skipSuperUserReplacement; + private boolean domainIsSet; + private static final String UUID_PATTERN = "^[0-9a-f]{8}-[0-9a-f]{4}-[1-5][0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$"; + + public IdentityTransformer(Configuration configuration) throws IOException { + Preconditions.checkNotNull(configuration, "configuration"); + this.isSecure = UserGroupInformation.getCurrentUser().isSecurityEnabled(); + this.servicePrincipalId = configuration.get(FS_AZURE_OVERRIDE_OWNER_SP, ""); + this.serviceWhiteList = configuration.get(FS_AZURE_OVERRIDE_OWNER_SP_LIST, ""); + this.domainName = configuration.get(FS_AZURE_FILE_OWNER_DOMAINNAME, ""); + this.enableShortName = configuration.getBoolean(FS_AZURE_FILE_OWNER_ENABLE_SHORTNAME, false); + + // - "servicePrincipalId" and "serviceWhiteList" are required for + // transformation between localUserOrGroup and principalId,$superuser + // - "enableShortName" is required for transformation between shortName and fullyQualifiedName. + this.skipUserIdentityReplacement = servicePrincipalId.isEmpty() && serviceWhiteList.isEmpty() && !enableShortName; + this.skipSuperUserReplacement = configuration.getBoolean(FS_AZURE_SKIP_SUPER_USER_REPLACEMENT, false); + + if (enableShortName){ + // need to check the domain setting only when short name is enabled. + // if shortName is not enabled, transformer won't transform a shortName to + // a fully qualified name. + this.domainIsSet = !domainName.isEmpty(); + } + } + + /** + * Perform identity transformation for the Get request results in AzureBlobFileSystemStore: + * getFileStatus(), listStatus(), getAclStatus(). + * Input originalUserOrGroup can be one of the following: + * 1. $superuser: + * by default it will be transformed to local user/group, this can be disabled by setting + * "fs.azure.identity.transformer.skip.superuser.replacement" to true. + * + * 2. User principal id: + * can be transformed to localUserOrGroup, if this principal id matches the principal id set in + * "fs.azure.identity.transformer.service.principal.id" and localUserOrGroup is stated in + * "fs.azure.identity.transformer.service.principal.substitution.list" + * + * 3. User principal name (UPN): + * can be transformed to a short name(localUserOrGroup) if "fs.azure.identity.transformer.enable.short.name" + * is enabled. + * + * @param originalUserOrGroup the original user or group in the get request results: FileStatus, AclStatus. + * @param localUserOrGroup the local user or group, should be parsed from UserGroupInformation. + * @return owner or group after transformation. + * */ + public String transformIdentityForGetRequest(String originalUserOrGroup, String localUserOrGroup) { + if (originalUserOrGroup == null) { + originalUserOrGroup = localUserOrGroup; + // localUserOrGroup might be a full name, so continue the transformation. + } + // case 1: it is $superuser and replace $superuser config is enabled + if (!skipSuperUserReplacement && SUPER_USER.equals(originalUserOrGroup)) { + return localUserOrGroup; + } + + if (skipUserIdentityReplacement) { + return originalUserOrGroup; + } + + // case 2: original owner is principalId set in config, and localUser + // is a daemon service specified in substitution list, + // To avoid ownership check failure in job task, replace it + // to local daemon user/group + if (originalUserOrGroup.equals(servicePrincipalId) && isInSubstitutionList(localUserOrGroup)) { + return localUserOrGroup; + } + + // case 3: If original owner is a fully qualified name, and + // short name is enabled, replace with shortName. + if (shouldUseShortUserName(originalUserOrGroup)) { + return getShortName(originalUserOrGroup); + } + + return originalUserOrGroup; + } + + /** + * Perform Identity transformation when setting owner on a path. + * There are four possible input: + * 1.short name; 2.$superuser; 3.Fully qualified name; 4. principal id. + * + * short name could be transformed to: + * - A service principal id or $superuser, if short name belongs a daemon service + * stated in substitution list AND "fs.azure.identity.transformer.service.principal.id" + * is set with $superuser or a principal id. + * - Fully qualified name, if "fs.azure.identity.transformer.domain.name" is set in configuration. + * + * $superuser, fully qualified name and principalId should not be transformed. + * + * @param userOrGroup the user or group to be set as owner. + * @return user or group after transformation. + * */ + public String transformUserOrGroupForSetRequest(String userOrGroup) { + if (userOrGroup == null || userOrGroup.isEmpty() || skipUserIdentityReplacement) { + return userOrGroup; + } + + // case 1: when the owner to be set is stated in substitution list. + if (isInSubstitutionList(userOrGroup)) { + return servicePrincipalId; + } + + // case 2: when the owner is a short name of the user principal name(UPN). + if (shouldUseFullyQualifiedUserName(userOrGroup)) { + return getFullyQualifiedName(userOrGroup); + } + + return userOrGroup; + } + + /** + * Perform Identity transformation when calling setAcl(),removeAclEntries() and modifyAclEntries() + * If the AclEntry type is a user or group, and its name is one of the following: + * 1.short name; 2.$superuser; 3.Fully qualified name; 4. principal id. + * Short name could be transformed to: + * - A service principal id or $superuser, if short name belongs a daemon service + * stated in substitution list AND "fs.azure.identity.transformer.service.principal.id" + * is set with $superuser or a principal id. + * - A fully qualified name, if the AclEntry type is User AND if "fs.azure.identity.transformer.domain.name" + * is set in configuration. This is to make the behavior consistent with HDI. + * + * $superuser, fully qualified name and principal id should not be transformed. + * + * @param aclEntries list of AclEntry + * @return list of AclEntry after the identity transformation. + * */ + public List transformAclEntriesForSetRequest(final List aclEntries) { + if (skipUserIdentityReplacement) { + return aclEntries; + } + + for (int i = 0; i < aclEntries.size(); i++) { + AclEntry aclEntry = aclEntries.get(i); + String name = aclEntry.getName(); + String transformedName = name; + if (name == null || name.isEmpty() || aclEntry.getType().equals(AclEntryType.OTHER) || aclEntry.getType().equals(AclEntryType.MASK)) { + continue; + } + + // case 1: when the user or group name to be set is stated in substitution list. + if (isInSubstitutionList(name)) { + transformedName = servicePrincipalId; + } else if (aclEntry.getType().equals(AclEntryType.USER) // case 2: when the owner is a short name + && shouldUseFullyQualifiedUserName(name)) { // of the user principal name (UPN). + // Notice: for group type ACL entry, if name is shortName. + // It won't be converted to Full Name. This is + // to make the behavior consistent with HDI. + transformedName = getFullyQualifiedName(name); + } + + // Avoid unnecessary new AclEntry allocation + if (transformedName.equals(name)) { + continue; + } + + AclEntry.Builder aclEntryBuilder = new AclEntry.Builder(); + aclEntryBuilder.setType(aclEntry.getType()); + aclEntryBuilder.setName(transformedName); + aclEntryBuilder.setScope(aclEntry.getScope()); + aclEntryBuilder.setPermission(aclEntry.getPermission()); + + // Replace the original AclEntry + aclEntries.set(i, aclEntryBuilder.build()); + } + return aclEntries; + } + + /** + * Internal method to identify if owner name returned by the ADLS backend is short name or not. + * If name contains "@", this code assumes that whatever comes after '@' is domain name and ignores it. + * @param owner + * @return + */ + private boolean isShortUserName(String owner) { + return (owner != null) && !owner.contains(AT); + } + + private boolean shouldUseShortUserName(String owner){ + return enableShortName && !isShortUserName(owner); + } + + private String getShortName(String userName) { + if (userName == null) { + return null; + } + + if (isShortUserName(userName)) { + return userName; + } + + String userNameBeforeAt = userName.substring(0, userName.indexOf(AT)); + if (isSecure) { + //In secure clusters we apply auth to local rules to lowercase all short localUser names (notice /L at the end), + //E.G. : RULE:[1:$1@$0](.*@FOO.ONMICROSOFT.COM)s/@.*/// Ideally we should use the HadoopKerberosName class to get + // new HadoopKerberosName(arg).getShortName. However, + //1. ADLS can report the Realm in lower case while returning file owner names( ie. : Some.User@realm.onmicrosoft.com) + //2. The RULE specification does not allow specifying character classes to do case insensitive matches + //Due to this, we end up using a forced lowercase version of the manually shortened name + return userNameBeforeAt.toLowerCase(Locale.ENGLISH); + } + return userNameBeforeAt; + } + + private String getFullyQualifiedName(String name){ + if (domainIsSet && (name != null) && !name.contains(AT)){ + return name + AT + domainName; + } + return name; + } + + private boolean shouldUseFullyQualifiedUserName(String owner){ + return domainIsSet && !SUPER_USER.equals(owner) && !isUuid(owner) && enableShortName && isShortUserName(owner); + } + + private boolean isInSubstitutionList(String localUserName) { + return serviceWhiteList.contains(STAR) || serviceWhiteList.contains(localUserName); + } + + private boolean isUuid(String input) { + if (input == null) return false; + return input.matches(UUID_PATTERN); + } +} diff --git a/hadoop-tools/hadoop-azure/src/site/markdown/testing_azure.md b/hadoop-tools/hadoop-azure/src/site/markdown/testing_azure.md index c2afe74e22e..40a372db1e3 100644 --- a/hadoop-tools/hadoop-azure/src/site/markdown/testing_azure.md +++ b/hadoop-tools/hadoop-azure/src/site/markdown/testing_azure.md @@ -733,6 +733,61 @@ hierarchical namespace enabled, and set the following configuration settings: AAD client id. --> + + + ``` If running tests against an endpoint that uses the URL format diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsIdentityTransformer.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsIdentityTransformer.java new file mode 100644 index 00000000000..41f680e4c14 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsIdentityTransformer.java @@ -0,0 +1,301 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs; + +import java.io.IOException; +import java.util.List; +import java.util.UUID; + +import com.google.common.collect.Lists; +import org.apache.hadoop.fs.azurebfs.oauth2.IdentityTransformer; +import org.apache.hadoop.fs.permission.AclEntry; +import org.junit.Test; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; + +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.SUPER_USER; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_FILE_OWNER_DOMAINNAME; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_FILE_OWNER_ENABLE_SHORTNAME; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_OVERRIDE_OWNER_SP; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_OVERRIDE_OWNER_SP_LIST; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_SKIP_SUPER_USER_REPLACEMENT; +import static org.apache.hadoop.fs.azurebfs.utils.AclTestHelpers.aclEntry; +import static org.apache.hadoop.fs.permission.AclEntryScope.ACCESS; +import static org.apache.hadoop.fs.permission.AclEntryScope.DEFAULT; +import static org.apache.hadoop.fs.permission.AclEntryType.GROUP; +import static org.apache.hadoop.fs.permission.AclEntryType.MASK; +import static org.apache.hadoop.fs.permission.AclEntryType.OTHER; +import static org.apache.hadoop.fs.permission.AclEntryType.USER; +import static org.apache.hadoop.fs.permission.FsAction.ALL; + +/** + * Test IdentityTransformer. + */ +//@RunWith(Parameterized.class) +public class ITestAbfsIdentityTransformer extends AbstractAbfsScaleTest{ + private final UserGroupInformation userGroupInfo; + private final String localUser; + private final String localGroup; + private static final String DAEMON = "daemon"; + private static final String ASTERISK = "*"; + private static final String SHORT_NAME = "abc"; + private static final String DOMAIN = "domain.com"; + private static final String FULLY_QUALIFIED_NAME = "abc@domain.com"; + private static final String SERVICE_PRINCIPAL_ID = UUID.randomUUID().toString(); + + public ITestAbfsIdentityTransformer() throws Exception { + super(); + userGroupInfo = UserGroupInformation.getCurrentUser(); + localUser = userGroupInfo.getShortUserName(); + localGroup = userGroupInfo.getPrimaryGroupName(); + } + + @Test + public void testDaemonServiceSettingIdentity() throws IOException { + Configuration config = this.getRawConfiguration(); + resetIdentityConfig(config); + // Default config + IdentityTransformer identityTransformer = getTransformerWithDefaultIdentityConfig(config); + assertEquals("Identity should not change for default config", + DAEMON, identityTransformer.transformUserOrGroupForSetRequest(DAEMON)); + + // Add service principal id + config.set(FS_AZURE_OVERRIDE_OWNER_SP, SERVICE_PRINCIPAL_ID); + + // case 1: substitution list doesn't contain daemon + config.set(FS_AZURE_OVERRIDE_OWNER_SP_LIST, "a,b,c,d"); + identityTransformer = getTransformerWithCustomizedIdentityConfig(config); + assertEquals("Identity should not change when substitution list doesn't contain daemon", + DAEMON, identityTransformer.transformUserOrGroupForSetRequest(DAEMON)); + + // case 2: substitution list contains daemon name + config.set(FS_AZURE_OVERRIDE_OWNER_SP_LIST, DAEMON + ",a,b,c,d"); + identityTransformer = getTransformerWithCustomizedIdentityConfig(config); + assertEquals("Identity should be replaced to servicePrincipalId", + SERVICE_PRINCIPAL_ID, identityTransformer.transformUserOrGroupForSetRequest(DAEMON)); + + // case 3: substitution list is * + config.set(FS_AZURE_OVERRIDE_OWNER_SP_LIST, ASTERISK); + identityTransformer = getTransformerWithCustomizedIdentityConfig(config); + assertEquals("Identity should be replaced to servicePrincipalId", + SERVICE_PRINCIPAL_ID, identityTransformer.transformUserOrGroupForSetRequest(DAEMON)); + } + + @Test + public void testFullyQualifiedNameSettingIdentity() throws IOException { + Configuration config = this.getRawConfiguration(); + // Default config + IdentityTransformer identityTransformer = getTransformerWithDefaultIdentityConfig(config); + assertEquals("short name should not be converted to full name by default", + SHORT_NAME, identityTransformer.transformUserOrGroupForSetRequest(SHORT_NAME)); + + resetIdentityConfig(config); + + // Add config to get fully qualified username + config.setBoolean(FS_AZURE_FILE_OWNER_ENABLE_SHORTNAME, true); + config.set(FS_AZURE_FILE_OWNER_DOMAINNAME, DOMAIN); + identityTransformer = getTransformerWithCustomizedIdentityConfig(config); + assertEquals("short name should be converted to full name", + FULLY_QUALIFIED_NAME, identityTransformer.transformUserOrGroupForSetRequest(SHORT_NAME)); + } + + @Test + public void testNoOpForSettingOidAsIdentity() throws IOException { + Configuration config = this.getRawConfiguration(); + resetIdentityConfig(config); + + config.setBoolean(FS_AZURE_FILE_OWNER_ENABLE_SHORTNAME, true); + config.set(FS_AZURE_FILE_OWNER_DOMAINNAME, DOMAIN); + config.set(FS_AZURE_OVERRIDE_OWNER_SP, UUID.randomUUID().toString()); + config.set(FS_AZURE_OVERRIDE_OWNER_SP_LIST, "a,b,c,d"); + + IdentityTransformer identityTransformer = getTransformerWithCustomizedIdentityConfig(config); + final String principalId = UUID.randomUUID().toString(); + assertEquals("Identity should not be changed when owner is already a principal id ", + principalId, identityTransformer.transformUserOrGroupForSetRequest(principalId)); + } + + @Test + public void testNoOpWhenSettingSuperUserAsdentity() throws IOException { + Configuration config = this.getRawConfiguration(); + resetIdentityConfig(config); + + config.setBoolean(FS_AZURE_FILE_OWNER_ENABLE_SHORTNAME, true); + config.set(FS_AZURE_FILE_OWNER_DOMAINNAME, DOMAIN); + // Default config + IdentityTransformer identityTransformer = getTransformerWithDefaultIdentityConfig(config); + assertEquals("Identity should not be changed because it is not in substitution list", + SUPER_USER, identityTransformer.transformUserOrGroupForSetRequest(SUPER_USER)); + } + + @Test + public void testIdentityReplacementForSuperUserGetRequest() throws IOException { + Configuration config = this.getRawConfiguration(); + resetIdentityConfig(config); + + // with default config, identityTransformer should do $superUser replacement + IdentityTransformer identityTransformer = getTransformerWithDefaultIdentityConfig(config); + assertEquals("$superuser should be replaced with local user by default", + localUser, identityTransformer.transformIdentityForGetRequest(SUPER_USER, localUser)); + + // Disable $supeuser replacement + config.setBoolean(FS_AZURE_SKIP_SUPER_USER_REPLACEMENT, true); + identityTransformer = getTransformerWithCustomizedIdentityConfig(config); + assertEquals("$superuser should not be replaced", + SUPER_USER, identityTransformer.transformIdentityForGetRequest(SUPER_USER, localUser)); + } + + @Test + public void testIdentityReplacementForDaemonServiceGetRequest() throws IOException { + Configuration config = this.getRawConfiguration(); + resetIdentityConfig(config); + + // Default config + IdentityTransformer identityTransformer = getTransformerWithDefaultIdentityConfig(config); + assertEquals("By default servicePrincipalId should not be converted for GetFileStatus(), listFileStatus(), getAcl()", + SERVICE_PRINCIPAL_ID, identityTransformer.transformIdentityForGetRequest(SERVICE_PRINCIPAL_ID, localUser)); + + resetIdentityConfig(config); + // 1. substitution list doesn't contain currentUser + config.set(FS_AZURE_OVERRIDE_OWNER_SP_LIST, "a,b,c,d"); + identityTransformer = getTransformerWithCustomizedIdentityConfig(config); + assertEquals("servicePrincipalId should not be replaced if local daemon user is not in substitution list", + SERVICE_PRINCIPAL_ID, identityTransformer.transformIdentityForGetRequest(SERVICE_PRINCIPAL_ID, localUser)); + + resetIdentityConfig(config); + // 2. substitution list contains currentUser(daemon name) but the service principal id in config doesn't match + config.set(FS_AZURE_OVERRIDE_OWNER_SP_LIST, localUser + ",a,b,c,d"); + config.set(FS_AZURE_OVERRIDE_OWNER_SP, UUID.randomUUID().toString()); + identityTransformer = getTransformerWithCustomizedIdentityConfig(config); + assertEquals("servicePrincipalId should not be replaced if it is not equal to the SPN set in config", + SERVICE_PRINCIPAL_ID, identityTransformer.transformIdentityForGetRequest(SERVICE_PRINCIPAL_ID, localUser)); + + resetIdentityConfig(config); + // 3. substitution list contains currentUser(daemon name) and the service principal id in config matches + config.set(FS_AZURE_OVERRIDE_OWNER_SP_LIST, localUser + ",a,b,c,d"); + config.set(FS_AZURE_OVERRIDE_OWNER_SP, SERVICE_PRINCIPAL_ID); + identityTransformer = getTransformerWithCustomizedIdentityConfig(config); + assertEquals("servicePrincipalId should be transformed to local use", + localUser, identityTransformer.transformIdentityForGetRequest(SERVICE_PRINCIPAL_ID, localUser)); + + resetIdentityConfig(config); + // 4. substitution is "*" but the service principal id in config doesn't match the input + config.set(FS_AZURE_OVERRIDE_OWNER_SP_LIST, ASTERISK); + config.set(FS_AZURE_OVERRIDE_OWNER_SP, UUID.randomUUID().toString()); + identityTransformer = getTransformerWithCustomizedIdentityConfig(config); + assertEquals("servicePrincipalId should not be replaced if it is not equal to the SPN set in config", + SERVICE_PRINCIPAL_ID, identityTransformer.transformIdentityForGetRequest(SERVICE_PRINCIPAL_ID, localUser)); + + resetIdentityConfig(config); + // 5. substitution is "*" and the service principal id in config match the input + config.set(FS_AZURE_OVERRIDE_OWNER_SP_LIST, ASTERISK); + config.set(FS_AZURE_OVERRIDE_OWNER_SP, SERVICE_PRINCIPAL_ID); + identityTransformer = getTransformerWithCustomizedIdentityConfig(config); + assertEquals("servicePrincipalId should be transformed to local user", + localUser, identityTransformer.transformIdentityForGetRequest(SERVICE_PRINCIPAL_ID, localUser)); + } + + @Test + public void testIdentityReplacementForKinitUserGetRequest() throws IOException { + Configuration config = this.getRawConfiguration(); + resetIdentityConfig(config); + + // Default config + IdentityTransformer identityTransformer = getTransformerWithDefaultIdentityConfig(config); + assertEquals("full name should not be transformed if shortname is not enabled", + FULLY_QUALIFIED_NAME, identityTransformer.transformIdentityForGetRequest(FULLY_QUALIFIED_NAME, localUser)); + + // add config to get short name + config.setBoolean(FS_AZURE_FILE_OWNER_ENABLE_SHORTNAME, true); + identityTransformer = getTransformerWithCustomizedIdentityConfig(config); + assertEquals("should convert the full name to shortname ", + SHORT_NAME, identityTransformer.transformIdentityForGetRequest(FULLY_QUALIFIED_NAME, localUser)); + } + + @Test + public void transformAclEntriesForSetRequest() throws IOException { + Configuration config = this.getRawConfiguration(); + resetIdentityConfig(config); + + List aclEntriesToBeTransformed = Lists.newArrayList( + aclEntry(ACCESS, USER, DAEMON, ALL), + aclEntry(ACCESS, USER, FULLY_QUALIFIED_NAME,ALL), + aclEntry(DEFAULT, USER, SUPER_USER, ALL), + aclEntry(DEFAULT, USER, SERVICE_PRINCIPAL_ID, ALL), + aclEntry(DEFAULT, USER, SHORT_NAME, ALL), + aclEntry(DEFAULT, GROUP, DAEMON, ALL), + aclEntry(DEFAULT, GROUP, SHORT_NAME, ALL), // Notice: for group type ACL entry, if name is shortName, + aclEntry(DEFAULT, OTHER, ALL), // It won't be converted to Full Name. This is + aclEntry(DEFAULT, MASK, ALL) // to make the behavior consistent with HDI. + ); + + // Default config should not change the identities + IdentityTransformer identityTransformer = getTransformerWithDefaultIdentityConfig(config); + checkAclEntriesList(aclEntriesToBeTransformed, identityTransformer.transformAclEntriesForSetRequest(aclEntriesToBeTransformed)); + + resetIdentityConfig(config); + // With config + config.set(FS_AZURE_OVERRIDE_OWNER_SP_LIST, DAEMON + ",a,b,c,d"); + config.setBoolean(FS_AZURE_FILE_OWNER_ENABLE_SHORTNAME, true); + config.set(FS_AZURE_FILE_OWNER_DOMAINNAME, DOMAIN); + config.set(FS_AZURE_OVERRIDE_OWNER_SP, SERVICE_PRINCIPAL_ID); + identityTransformer = getTransformerWithCustomizedIdentityConfig(config); + + // expected acl entries + List expectedAclEntries = Lists.newArrayList( + aclEntry(ACCESS, USER, SERVICE_PRINCIPAL_ID, ALL), + aclEntry(ACCESS, USER, FULLY_QUALIFIED_NAME, ALL), + aclEntry(DEFAULT, USER, SUPER_USER, ALL), + aclEntry(DEFAULT, USER, SERVICE_PRINCIPAL_ID, ALL), + aclEntry(DEFAULT, USER, FULLY_QUALIFIED_NAME, ALL), + aclEntry(DEFAULT, GROUP, SERVICE_PRINCIPAL_ID, ALL), + aclEntry(DEFAULT, GROUP, SHORT_NAME, ALL), + aclEntry(DEFAULT, OTHER, ALL), + aclEntry(DEFAULT, MASK, ALL) + ); + + checkAclEntriesList(identityTransformer.transformAclEntriesForSetRequest(aclEntriesToBeTransformed), expectedAclEntries); + + } + + private void resetIdentityConfig(Configuration config) { + config.unset(FS_AZURE_FILE_OWNER_ENABLE_SHORTNAME); + config.unset(FS_AZURE_FILE_OWNER_DOMAINNAME); + config.unset(FS_AZURE_OVERRIDE_OWNER_SP); + config.unset(FS_AZURE_OVERRIDE_OWNER_SP_LIST); + config.unset(FS_AZURE_SKIP_SUPER_USER_REPLACEMENT); + } + + private IdentityTransformer getTransformerWithDefaultIdentityConfig(Configuration config) throws IOException { + resetIdentityConfig(config); + return new IdentityTransformer(config); + } + + private IdentityTransformer getTransformerWithCustomizedIdentityConfig(Configuration config) throws IOException { + return new IdentityTransformer(config); + } + + private void checkAclEntriesList(List aclEntries, List expected) { + assertTrue("list size not equals", aclEntries.size() == expected.size()); + for (int i = 0; i < aclEntries.size(); i++) { + assertEquals("Identity doesn't match", expected.get(i).getName(), aclEntries.get(i).getName()); + } + } +}