HADOOP-16068. ABFS Authentication and Delegation Token plugins to optionally be bound to specific URI of the store.

Contributed by Steve Loughran.
This commit is contained in:
Steve Loughran 2019-02-28 14:22:08 +00:00
parent 84c4966a5a
commit 65f60e56b0
No known key found for this signature in database
GPG Key ID: D22CF846DBB162A0
37 changed files with 3237 additions and 92 deletions

View File

@ -262,7 +262,22 @@
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-minikdc</artifactId>
<scope>test</scope>
</dependency>
<!-- Used to create SSL certs for a secure Keystore -->
<dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcprov-jdk15on</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcpkix-jdk15on</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<profiles>

View File

@ -514,7 +514,7 @@ public AccessTokenProvider getTokenProvider() throws TokenAccessProviderExceptio
} catch(IllegalArgumentException e) {
throw e;
} catch (Exception e) {
throw new TokenAccessProviderException("Unable to load custom token provider class.", e);
throw new TokenAccessProviderException("Unable to load custom token provider class: " + e, e);
}
} else {

View File

@ -70,6 +70,7 @@
import org.apache.hadoop.fs.permission.AclStatus;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Progressable;
@ -121,6 +122,8 @@ public void initialize(URI uri, Configuration configuration)
if (this.delegationTokenEnabled) {
LOG.debug("Initializing DelegationTokenManager for {}", uri);
this.delegationTokenManager = abfsConfiguration.getDelegationTokenManager();
delegationTokenManager.bind(getUri(), configuration);
LOG.debug("Created DelegationTokenManager {}", delegationTokenManager);
}
}
@ -419,9 +422,10 @@ public synchronized void close() throws IOException {
if (isClosed) {
return;
}
// does all the delete-on-exit calls, and may be slow.
super.close();
LOG.debug("AzureBlobFileSystem.close");
IOUtils.cleanupWithLogger(LOG, abfsStore, delegationTokenManager);
this.isClosed = true;
}
@ -1023,6 +1027,20 @@ public synchronized Token<?> getDelegationToken(final String renewer) throws IOE
: super.getDelegationToken(renewer);
}
/**
* If Delegation tokens are enabled, the canonical service name of
* this filesystem is the filesystem URI.
* @return either the filesystem URI as a string, or null.
*/
@Override
public String getCanonicalServiceName() {
String name = null;
if (delegationTokenManager != null) {
name = delegationTokenManager.getCanonicalServiceName();
}
return name != null ? name : super.getCanonicalServiceName();
}
@VisibleForTesting
FileSystem.Statistics getFsStatistics() {
return this.statistics;
@ -1053,6 +1071,15 @@ AbfsClient getAbfsClient() {
return abfsStore.getClient();
}
/**
* Get any Delegation Token manager created by the filesystem.
* @return the DT manager or null.
*/
@VisibleForTesting
AbfsDelegationTokenManager getDelegationTokenManager() {
return delegationTokenManager;
}
@VisibleForTesting
boolean getIsNamespaceEnabled() throws AzureBlobFileSystemException {
return abfsStore.getIsNamespaceEnabled();

View File

@ -17,6 +17,7 @@
*/
package org.apache.hadoop.fs.azurebfs;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
@ -66,6 +67,7 @@
import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode;
import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultEntrySchema;
import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultSchema;
import org.apache.hadoop.fs.azurebfs.extensions.ExtensionHelper;
import org.apache.hadoop.fs.azurebfs.oauth2.AccessTokenProvider;
import org.apache.hadoop.fs.azurebfs.oauth2.IdentityTransformer;
import org.apache.hadoop.fs.azurebfs.services.AbfsAclHelper;
@ -84,6 +86,7 @@
import org.apache.hadoop.fs.permission.AclStatus;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.http.client.utils.URIBuilder;
import org.slf4j.Logger;
@ -95,7 +98,7 @@
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class AzureBlobFileSystemStore {
public class AzureBlobFileSystemStore implements Closeable {
private static final Logger LOG = LoggerFactory.getLogger(AzureBlobFileSystemStore.class);
private AbfsClient client;
@ -163,6 +166,11 @@ public String getPrimaryGroup() {
return this.primaryUserGroup;
}
@Override
public void close() throws IOException {
IOUtils.cleanupWithLogger(LOG, client);
}
private String[] authorityParts(URI uri) throws InvalidUriAuthorityException, InvalidUriException {
final String authority = uri.getRawAuthority();
if (null == authority) {
@ -788,7 +796,8 @@ public boolean isAtomicRenameKey(String key) {
return isKeyForDirectorySet(key, azureAtomicRenameDirSet);
}
private void initializeClient(URI uri, String fileSystemName, String accountName, boolean isSecure) throws AzureBlobFileSystemException {
private void initializeClient(URI uri, String fileSystemName, String accountName, boolean isSecure)
throws IOException {
if (this.client != null) {
return;
}
@ -817,6 +826,8 @@ private void initializeClient(URI uri, String fileSystemName, String accountName
abfsConfiguration.getStorageAccountKey());
} else {
tokenProvider = abfsConfiguration.getTokenProvider();
ExtensionHelper.bind(tokenProvider, uri,
abfsConfiguration.getRawConfiguration());
}
this.client = new AbfsClient(baseUrl, creds, abfsConfiguration, new ExponentialRetryPolicy(), tokenProvider);

View File

@ -53,7 +53,7 @@ public AbfsRestOperationException(
final String errorMessage,
final Exception innerException,
final AbfsHttpOperation abfsHttpOperation) {
super(formatMessage(abfsHttpOperation));
super(formatMessage(abfsHttpOperation), innerException);
this.statusCode = statusCode;
this.errorCode = AzureServiceErrorCode.getAzureServiceCode(this.statusCode, errorCode);
@ -61,7 +61,7 @@ public AbfsRestOperationException(
}
public AbfsRestOperationException(final HttpException innerException) {
super(innerException.getMessage());
super(innerException.getMessage(), innerException);
this.statusCode = innerException.getHttpErrorCode();
this.errorCode = AzureServiceErrorCode.UNKNOWN;
@ -100,4 +100,4 @@ private static String formatMessage(final AbfsHttpOperation abfsHttpOperation) {
// Remove break line to ensure the request id and timestamp can be shown in console.
abfsHttpOperation.getStorageErrorMessage().replaceAll("\\n", " "));
}
}
}

View File

@ -32,5 +32,6 @@ public TokenAccessProviderException(String message) {
public TokenAccessProviderException(String message, Throwable cause) {
super(message);
initCause(cause);
}
}
}

View File

@ -0,0 +1,97 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs.extensions;
import java.io.Closeable;
import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
/**
* An optional extension for custom extensions, so as to support
* tighter integration.
*
* This interface can be implemented by either of a
* {@link CustomDelegationTokenManager} or a {@link CustomTokenProviderAdaptee}.
*
* In both cases, extra lifecycle operation will be invoked.
*
* <ol>
* <li>{@link #bind(URI, Configuration)} will
* be invoked after {@code initialize()}</li>
* <li>{@link Closeable#close()} will be invoked
* when the Filesystem instance is closed.</li>
* </ol>
*
* The {@link #getCanonicalServiceName()} will be invoked on a Custom
* DT provider when the filesystem is asked for a Canonical Service Name.
*
* The {@link #getUserAgentSuffix()} is invoked on a CustomTokenProviderAdaptee
* as the filesystem is initialized; the User Agent Suffix which it returns
* is included in the UA header used for the ABFS Client -and so logged
* in the ABFS access logs.
*
* This allows for token providers to to provide extra information
* about the caller for use in auditing requests.
*/
@InterfaceAudience.LimitedPrivate("authorization-subsystems")
@InterfaceStability.Unstable
public interface BoundDTExtension extends Closeable {
/**
* Bind the extension to the specific instance of ABFS.
* This happens during the ABFS's own initialization logic; it is unlikely
* to be completely instantiated at this point.
* Therefore, while a reference may be cached, implementations MUST NOT
* invoke methods on it.
* @param fsURI URI of the filesystem.
* @param conf configuration of this extension.
* @throws IOException failure during binding.
*/
void bind(URI fsURI, Configuration conf) throws IOException;
/**
* Get the canonical service name, which will be
* returned by {@code FileSystem.getCanonicalServiceName()} and so used to
* map the issued DT in credentials, including credential files collected
* for job submission.
*
* If null is returned: fall back to the default filesystem logic.
*
* Only invoked on {@link CustomDelegationTokenManager} instances.
* @return the service name to be returned by the filesystem.
*/
default String getCanonicalServiceName() {
return null;
}
/**
* Get a suffix for the UserAgent suffix of HTTP requests, which
* can be used to identify the principal making ABFS requests.
* @return an empty string, or a key=value string to be added to the UA
* header.
*/
default String getUserAgentSuffix() {
return "";
}
}

View File

@ -29,6 +29,13 @@
/**
* Interface for Managing the Delegation tokens.
*
* Implementations which also implement BoundDTExtension will have
* the its {@code bind()} called
* after {@code initialize)} and before any calls to
* {@link #getDelegationToken(String)}.
* It will not be bound during token renew or cancel operations: there is
* no Filesystem to bind to in those operations.
*/
@InterfaceAudience.LimitedPrivate("authorization-subsystems")
@InterfaceStability.Unstable

View File

@ -0,0 +1,114 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs.extensions;
import java.io.IOException;
import java.net.URI;
import java.util.Optional;
import java.util.function.Function;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IOUtils;
/**
* Classes to help with use of extensions, expecially those
* implementing @{@link BoundDTExtension}.
*/
@InterfaceAudience.LimitedPrivate("authorization-subsystems")
@InterfaceStability.Unstable
public final class ExtensionHelper {
private ExtensionHelper() {
}
/**
* If the passed in extension class implements {@link BoundDTExtension}
* then it will have its {@link BoundDTExtension#bind(URI, Configuration)}
* method called.
* @param extension extension to examine and maybe invoke
* @param uri URI of the filesystem.
* @param conf configuration of this extension.
* @throws IOException failure during binding.
*/
public static void bind(Object extension, URI uri, Configuration conf)
throws IOException {
if (extension instanceof BoundDTExtension) {
((BoundDTExtension) extension).bind(uri, conf);
}
}
/**
* Close an extension if it is closeable.
* Any error raised is caught and logged.
* @param extension extension instance.
*/
public static void close(Object extension) {
ifBoundDTExtension(extension,
v -> {
IOUtils.closeStreams(v);
return null;
});
}
/**
* Invoke {@link BoundDTExtension#getUserAgentSuffix()} or
* return the default value.
* @param extension extension to invoke
* @param def default if the class is of the wrong type.
* @return a user agent suffix
*/
public static String getUserAgentSuffix(Object extension, String def) {
return ifBoundDTExtension(extension, BoundDTExtension::getUserAgentSuffix)
.orElse(def);
}
/**
* Invoke {@link BoundDTExtension#getCanonicalServiceName()} or
* return the default value.
* @param extension extension to invoke
* @param def default if the class is of the wrong type.
* @return a canonical service name.
*/
public static String getCanonicalServiceName(Object extension, String def) {
return ifBoundDTExtension(extension, BoundDTExtension::getCanonicalServiceName)
.orElse(def);
}
/**
* Invoke an operation on an object if it implements the BoundDTExtension
* interface; returns an optional value.
* @param extension the extension to invoke.
* @param fn function to apply
* @param <V> return type of te function.
* @return an optional value which, if not empty, contains the return value
* of the invoked function. If empty: the object was not of a compatible
* type.
*/
public static <V> Optional<V> ifBoundDTExtension(Object extension,
Function<? super BoundDTExtension, ? extends V> fn) {
if (extension instanceof BoundDTExtension) {
return Optional.of((BoundDTExtension) extension).map(fn);
} else {
return Optional.empty();
}
}
}

View File

@ -36,6 +36,7 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.azurebfs.services.AbfsIoUtils;
import org.apache.hadoop.fs.azurebfs.services.ExponentialRetryPolicy;
/**
@ -165,8 +166,14 @@ public static AzureADToken getTokenUsingRefreshToken(String clientId,
* failed to get the Azure Active Directory token.
*/
public static class HttpException extends IOException {
private int httpErrorCode;
private String requestId;
private final int httpErrorCode;
private final String requestId;
private final String url;
private final String contentType;
private final String body;
/**
* Gets Http error status code.
@ -184,11 +191,63 @@ public String getRequestId() {
return this.requestId;
}
HttpException(int httpErrorCode, String requestId, String message) {
HttpException(
final int httpErrorCode,
final String requestId,
final String message,
final String url,
final String contentType,
final String body) {
super(message);
this.httpErrorCode = httpErrorCode;
this.requestId = requestId;
this.url = url;
this.contentType = contentType;
this.body = body;
}
public String getUrl() {
return url;
}
public String getContentType() {
return contentType;
}
public String getBody() {
return body;
}
@Override
public String getMessage() {
final StringBuilder sb = new StringBuilder();
sb.append("HTTP Error ");
sb.append(httpErrorCode);
sb.append("; url='").append(url).append('\'');
sb.append(' ');
sb.append(super.getMessage());
sb.append("; requestId='").append(requestId).append('\'');
sb.append("; contentType='").append(contentType).append('\'');
sb.append("; response '").append(body).append('\'');
return sb.toString();
}
}
/**
* An unexpected HTTP response was raised, such as text coming back
* from what should be an OAuth endpoint.
*/
public static class UnexpectedResponseException extends HttpException {
public UnexpectedResponseException(final int httpErrorCode,
final String requestId,
final String message,
final String url,
final String contentType,
final String body) {
super(httpErrorCode, requestId, message, url, contentType, body);
}
}
private static AzureADToken getTokenCall(String authEndpoint, String body,
@ -236,6 +295,8 @@ private static AzureADToken getTokenSingleCall(
}
try {
LOG.debug("Requesting an OAuth token by {} to {}",
httpMethod, authEndpoint);
URL url = new URL(urlString);
conn = (HttpURLConnection) url.openConnection();
conn.setRequestMethod(httpMethod);
@ -248,13 +309,18 @@ private static AzureADToken getTokenSingleCall(
}
}
conn.setRequestProperty("Connection", "close");
AbfsIoUtils.dumpHeadersToDebugLog("Request Headers",
conn.getRequestProperties());
if (httpMethod.equals("POST")) {
conn.setDoOutput(true);
conn.getOutputStream().write(payload.getBytes("UTF-8"));
}
int httpResponseCode = conn.getResponseCode();
LOG.debug("Response {}", httpResponseCode);
AbfsIoUtils.dumpHeadersToDebugLog("Response Headers",
conn.getHeaderFields());
String requestId = conn.getHeaderField("x-ms-request-id");
String responseContentType = conn.getHeaderField("Content-Type");
long responseContentLength = conn.getHeaderFieldLong("Content-Length", 0);
@ -265,23 +331,49 @@ private static AzureADToken getTokenSingleCall(
InputStream httpResponseStream = conn.getInputStream();
token = parseTokenFromStream(httpResponseStream);
} else {
String responseBody = consumeInputStream(conn.getErrorStream(), 1024);
InputStream stream = conn.getErrorStream();
if (stream == null) {
// no error stream, try the original input stream
stream = conn.getInputStream();
}
String responseBody = consumeInputStream(stream, 1024);
String proxies = "none";
String httpProxy = System.getProperty("http.proxy");
String httpsProxy = System.getProperty("https.proxy");
if (httpProxy != null || httpsProxy != null) {
proxies = "http:" + httpProxy + "; https:" + httpsProxy;
}
String logMessage =
"AADToken: HTTP connection failed for getting token from AzureAD. Http response: "
+ httpResponseCode + " " + conn.getResponseMessage()
+ "\nContent-Type: " + responseContentType
+ " Content-Length: " + responseContentLength
+ " Request ID: " + requestId.toString()
String operation = "AADToken: HTTP connection to " + authEndpoint
+ " failed for getting token from AzureAD.";
String logMessage = operation
+ " HTTP response: " + httpResponseCode
+ " " + conn.getResponseMessage()
+ " Proxies: " + proxies
+ "\nFirst 1K of Body: " + responseBody;
+ (responseBody.isEmpty()
? ""
: ("\nFirst 1K of Body: " + responseBody));
LOG.debug(logMessage);
throw new HttpException(httpResponseCode, requestId, logMessage);
if (httpResponseCode == HttpURLConnection.HTTP_OK) {
// 200 is returned by some of the sign-on pages, but can also
// come from proxies, utterly wrong URLs, etc.
throw new UnexpectedResponseException(httpResponseCode,
requestId,
operation
+ " Unexpected response."
+ " Check configuration, URLs and proxy settings."
+ " proxies=" + proxies,
authEndpoint,
responseContentType,
responseBody);
} else {
// general HTTP error
throw new HttpException(httpResponseCode,
requestId,
operation,
authEndpoint,
responseContentType,
responseBody);
}
}
} finally {
if (conn != null) {
@ -330,6 +422,10 @@ private static AzureADToken parseTokenFromStream(InputStream httpResponseStream)
}
private static String consumeInputStream(InputStream inStream, int length) throws IOException {
if (inStream == null) {
// the HTTP request returned an empty body
return "";
}
byte[] b = new byte[length];
int totalBytesRead = 0;
int bytesRead = 0;

View File

@ -20,18 +20,23 @@
import java.io.IOException;
import java.net.URI;
import com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.azurebfs.extensions.BoundDTExtension;
import org.apache.hadoop.fs.azurebfs.extensions.CustomTokenProviderAdaptee;
import org.apache.hadoop.fs.azurebfs.extensions.ExtensionHelper;
/**
* Provides tokens based on custom implementation, following the Adapter Design
* Pattern.
*/
public final class CustomTokenProviderAdapter extends AccessTokenProvider {
public final class CustomTokenProviderAdapter extends AccessTokenProvider
implements BoundDTExtension {
private CustomTokenProviderAdaptee adaptee;
private static final Logger LOG = LoggerFactory.getLogger(AccessTokenProvider.class);
@ -55,4 +60,40 @@ protected AzureADToken refreshToken() throws IOException {
return azureADToken;
}
}
/**
* Bind to the filesystem by passing the binding call on
* to any custom token provider adaptee which implements
* {@link BoundDTExtension}.
* No-op if they don't.
* @param fsURI URI of the filesystem.
* @param conf configuration of this extension.
* @throws IOException failure.
*/
@Override
public void bind(final URI fsURI,
final Configuration conf)
throws IOException {
ExtensionHelper.bind(adaptee, fsURI, conf);
}
@Override
public void close() {
ExtensionHelper.close(adaptee);
}
/**
* Get a suffix for the UserAgent suffix of HTTP requests, which
* can be used to identify the principal making ABFS requests.
*
* If the adaptee is a BoundDTExtension, it is queried for a UA Suffix;
* otherwise "" is returned.
*
* @return an empty string, or a key=value string to be added to the UA
* header.
*/
public String getUserAgentSuffix() {
String suffix = ExtensionHelper.getUserAgentSuffix(adaptee, "");
return suffix != null ? suffix : "";
}
}

View File

@ -24,8 +24,17 @@
/**
* Delegation token Identifier for ABFS delegation tokens.
* The token kind from {@link #getKind()} is {@link #TOKEN_KIND}, always.
*
* Subclasses have to very careful when looking up tokens (which will of
* course be registered in the credentials as of this kind), in case the
* incoming credentials are actually of a different subtype.
*/
public class AbfsDelegationTokenIdentifier extends DelegationTokenIdentifier {
/**
* The token kind of these tokens: ""ABFS delegation".
*/
public static final Text TOKEN_KIND = new Text("ABFS delegation");
public AbfsDelegationTokenIdentifier(){
@ -41,6 +50,13 @@ public AbfsDelegationTokenIdentifier(Text kind, Text owner, Text renewer,
super(kind, owner, renewer, realUser);
}
/**
* Get the token kind.
* Returns {@link #TOKEN_KIND} always.
* If a subclass does not want its renew/cancel process to be managed
* by {@link AbfsDelegationTokenManager}, this must be overridden.
* @return the kind of the token.
*/
@Override
public Text getKind() {
return TOKEN_KIND;

View File

@ -19,28 +19,45 @@
package org.apache.hadoop.fs.azurebfs.security;
import java.io.Closeable;
import java.io.IOException;
import java.net.URI;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys;
import org.apache.hadoop.fs.azurebfs.extensions.BoundDTExtension;
import org.apache.hadoop.fs.azurebfs.extensions.CustomDelegationTokenManager;
import org.apache.hadoop.fs.azurebfs.extensions.ExtensionHelper;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.token.delegation.web.DelegationTokenIdentifier;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.ReflectionUtils;
/**
* Class for delegation token Manager.
*
* Instantiates the class declared in
* {@link ConfigurationKeys#FS_AZURE_DELEGATION_TOKEN_PROVIDER_TYPE} and
* issues tokens from it.
*/
public class AbfsDelegationTokenManager {
public class AbfsDelegationTokenManager implements BoundDTExtension {
private CustomDelegationTokenManager tokenManager;
private static final Logger LOG =
LoggerFactory.getLogger(AbfsDelegationTokenManager.class);
/**
* Create the custom delegation token manager and call its
* {@link CustomDelegationTokenManager#initialize(Configuration)} method.
* @param conf configuration
* @throws IOException failure during initialization.
* @throws RuntimeException classloading problems.
*/
public AbfsDelegationTokenManager(final Configuration conf) throws IOException {
Preconditions.checkNotNull(conf, "conf");
@ -54,23 +71,75 @@ public AbfsDelegationTokenManager(final Configuration conf) throws IOException {
"The value for \"fs.azure.delegation.token.provider.type\" is not defined.");
}
CustomDelegationTokenManager customTokenMgr = (CustomDelegationTokenManager) ReflectionUtils
CustomDelegationTokenManager customTokenMgr = ReflectionUtils
.newInstance(customDelegationTokenMgrClass, conf);
if (customTokenMgr == null) {
throw new IllegalArgumentException(String.format("Failed to initialize %s.", customDelegationTokenMgrClass));
}
Preconditions.checkArgument(customTokenMgr != null,
"Failed to initialize %s.", customDelegationTokenMgrClass);
customTokenMgr.initialize(conf);
tokenManager = customTokenMgr;
}
/**
* Bind to a filesystem instance by passing the binding information down
* to any token manager which implements {@link BoundDTExtension}.
*
* This is not invoked before renew or cancel operations, but is guaranteed
* to be invoked before calls to {@link #getDelegationToken(String)}.
* @param fsURI URI of the filesystem.
* @param conf configuration of this extension.
* @throws IOException bind failure.
*/
@Override
public void bind(final URI fsURI, final Configuration conf)
throws IOException {
Preconditions.checkNotNull(fsURI, "Np Filesystem URI");
ExtensionHelper.bind(tokenManager, fsURI, conf);
}
/**
* Query the token manager for the service name; if it does not implement
* the extension interface, null is returned.
* @return the canonical service name.
*/
@Override
public String getCanonicalServiceName() {
return ExtensionHelper.getCanonicalServiceName(tokenManager, null);
}
/**
* Close.
* If the token manager is closeable, it has its {@link Closeable#close()}
* method (quietly) invoked.
*/
@Override
public void close() {
if (tokenManager instanceof Closeable) {
IOUtils.cleanupWithLogger(LOG, (Closeable) tokenManager);
}
}
/**
* Get a delegation token by invoking
* {@link CustomDelegationTokenManager#getDelegationToken(String)}.
* If the token returned already has a Kind; that is used.
* If not, then the token kind is set to
* {@link AbfsDelegationTokenIdentifier#TOKEN_KIND}, which implicitly
* resets any token renewer class.
* @param renewer the principal permitted to renew the token.
* @return a token for the filesystem.
* @throws IOException failure.
*/
public Token<DelegationTokenIdentifier> getDelegationToken(
String renewer) throws IOException {
LOG.debug("Requesting Delegation token for {}", renewer);
Token<DelegationTokenIdentifier> token = tokenManager.getDelegationToken(renewer);
token.setKind(AbfsDelegationTokenIdentifier.TOKEN_KIND);
if (token.getKind() == null) {
// if a token type is not set, use the default.
// note: this also sets the renewer to null.
token.setKind(AbfsDelegationTokenIdentifier.TOKEN_KIND);
}
return token;
}
@ -85,4 +154,18 @@ public void cancelDelegationToken(Token<?> token)
tokenManager.cancelDelegationToken(token);
}
@VisibleForTesting
public CustomDelegationTokenManager getTokenManager() {
return tokenManager;
}
@Override
public String toString() {
final StringBuilder sb = new StringBuilder(
"AbfsDelegationTokenManager{");
sb.append("tokenManager=").append(tokenManager);
sb.append('}');
return sb.toString();
}
}

View File

@ -0,0 +1,87 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs.security;
import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.DtFetcher;
import org.apache.hadoop.security.token.Token;
/**
* A DT fetcher for Abfs.
* This is a copy-and-paste of
* {@code org.apache.hadoop.hdfs.HdfsDtFetcher}.
*
* It is needed for the `hadoop dtutil` command.
*/
public class AbfsDtFetcher implements DtFetcher {
private static final String FETCH_FAILED =
"Filesystem not generating Delegation Tokens";
/**
* Returns the service name for the scheme..
*/
public Text getServiceName() {
return new Text(getScheme());
}
/**
* Get the scheme for this specific fetcher.
* @return a scheme.
*/
protected String getScheme() {
return FileSystemUriSchemes.ABFS_SCHEME;
}
public boolean isTokenRequired() {
return UserGroupInformation.isSecurityEnabled();
}
/**
* Returns Token object via FileSystem, null if bad argument.
* @param conf - a Configuration object used with FileSystem.get()
* @param creds - a Credentials object to which token(s) will be added
* @param renewer - the renewer to send with the token request
* @param url - the URL to which the request is sent
* @return a Token, or null if fetch fails.
*/
public Token<?> addDelegationTokens(Configuration conf,
Credentials creds,
String renewer,
String url) throws Exception {
if (!url.startsWith(getServiceName().toString())) {
url = getServiceName().toString() + "://" + url;
}
FileSystem fs = FileSystem.get(URI.create(url), conf);
Token<?> token = fs.getDelegationToken(renewer);
if (token == null) {
throw new IOException(FETCH_FAILED + ": " + url);
}
creds.addToken(token.getService(), token);
return token;
}
}

View File

@ -30,6 +30,8 @@
/**
* Token Renewer for renewing ABFS delegation tokens with remote service.
*
* Handles tokens of kind {@link AbfsDelegationTokenIdentifier#TOKEN_KIND}.
*/
public class AbfsTokenRenewer extends TokenRenewer {
public static final Logger LOG =
@ -93,4 +95,4 @@ private AbfsDelegationTokenManager getInstance(Configuration conf)
throws IOException {
return new AbfsDelegationTokenManager(conf);
}
}
}

View File

@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs.security;
import org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes;
/**
* The DT Fetcher for abfss.
*/
public class AbfssDtFetcher extends AbfsDtFetcher {
/**
* Get the scheme for this specific fetcher.
* @return a scheme.
*/
protected String getScheme() {
return FileSystemUriSchemes.ABFS_SECURE_SCHEME;
}
}

View File

@ -16,7 +16,7 @@
* limitations under the License.
*/
@InterfaceAudience.Private
@InterfaceAudience.LimitedPrivate("authorization-subsystems")
@InterfaceStability.Unstable
package org.apache.hadoop.fs.azurebfs.security;
import org.apache.hadoop.classification.InterfaceAudience;

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.fs.azurebfs.services;
import java.io.Closeable;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.MalformedURLException;
@ -37,8 +38,10 @@
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidUriException;
import org.apache.hadoop.fs.azurebfs.extensions.ExtensionHelper;
import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
import org.apache.hadoop.fs.azurebfs.oauth2.AccessTokenProvider;
import org.apache.hadoop.io.IOUtils;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.*;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes.HTTPS_SCHEME;
@ -48,7 +51,7 @@
/**
* AbfsClient.
*/
public class AbfsClient {
public class AbfsClient implements Closeable {
public static final Logger LOG = LoggerFactory.getLogger(AbfsClient.class);
private final URL baseUrl;
private final SharedKeyCredentials sharedKeyCredentials;
@ -87,6 +90,13 @@ public AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCredent
this.tokenProvider = tokenProvider;
}
@Override
public void close() throws IOException {
if (tokenProvider instanceof Closeable) {
IOUtils.cleanupWithLogger(LOG, (Closeable) tokenProvider);
}
}
public String getFileSystem() {
return filesystem;
}
@ -571,6 +581,11 @@ String initializeUserAgent(final AbfsConfiguration abfsConfiguration,
sb.append("; ");
sb.append(sslProviderName);
}
String tokenProviderField =
ExtensionHelper.getUserAgentSuffix(tokenProvider, "");
if (!tokenProviderField.isEmpty()) {
sb.append("; ").append(tokenProviderField);
}
sb.append(")");
final String userAgentComment = sb.toString();
String customUserAgentId = abfsConfiguration.getCustomUserAgentPrefix();
@ -578,7 +593,7 @@ String initializeUserAgent(final AbfsConfiguration abfsConfiguration,
return String.format(Locale.ROOT, CLIENT_VERSION + " %s %s",
userAgentComment, customUserAgentId);
}
return String.format(CLIENT_VERSION + " %s", userAgentComment);
return String.format(Locale.ROOT, CLIENT_VERSION + " %s", userAgentComment);
}
@VisibleForTesting

View File

@ -268,6 +268,9 @@ public void processResponse(final byte[] buffer, final int offset, final int len
if (this.requestId == null) {
this.requestId = AbfsHttpConstants.EMPTY_STRING;
}
// dump the headers
AbfsIoUtils.dumpHeadersToDebugLog("Response Headers",
connection.getHeaderFields());
if (AbfsHttpConstants.HTTP_METHOD_HEAD.equals(this.method)) {
// If it is HEAD, and it is ERROR

View File

@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs.services;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.util.StringUtils;
/**
* Utility classes to work with the remote store.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public final class AbfsIoUtils {
private static final Logger LOG = LoggerFactory.getLogger(AbfsIoUtils.class);
private AbfsIoUtils() {
}
/**
* Dump the headers of a request/response to the log at DEBUG level.
* @param origin header origin for log
* @param headers map of headers.
*/
public static void dumpHeadersToDebugLog(final String origin,
final Map<String, List<String>> headers) {
if (LOG.isDebugEnabled()) {
LOG.debug("{}", origin);
for (Map.Entry<String, List<String>> entry : headers.entrySet()) {
String key = entry.getKey();
if (key == null) {
key = "HTTP Response";
}
String values = StringUtils.join(";", entry.getValue());
if (key.contains("Cookie")) {
values = "*cookie info*";
}
LOG.debug(" {}={}",
key,
values);
}
}
}
}

View File

@ -149,15 +149,19 @@ private boolean executeHttpOperation(final int retryCount) throws AzureBlobFileS
// sign the HTTP request
if (client.getAccessToken() == null) {
LOG.debug("Signing request with shared key");
// sign the HTTP request
client.getSharedKeyCredentials().signRequest(
httpOperation.getConnection(),
hasRequestBody ? bufferLength : 0);
} else {
LOG.debug("Authenticating request with OAuth2 access token");
httpOperation.getConnection().setRequestProperty(HttpHeaderConfigurations.AUTHORIZATION,
client.getAccessToken());
}
// dump the headers
AbfsIoUtils.dumpHeadersToDebugLog("Request Headers",
httpOperation.getConnection().getRequestProperties());
AbfsClientThrottlingIntercept.sendingRequest(operationType);
if (hasRequestBody) {

View File

@ -39,6 +39,9 @@
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
import org.apache.hadoop.fs.azurebfs.utils.Base64;
@ -48,6 +51,9 @@
* account.
*/
public class SharedKeyCredentials {
private static final Logger LOG = LoggerFactory.getLogger(
SharedKeyCredentials.class);
private static final int EXPECTED_BLOB_QUEUE_CANONICALIZED_STRING_LENGTH = 300;
private static final Pattern CRLF = Pattern.compile("\r\n", Pattern.LITERAL);
private static final String HMAC_SHA256 = "HmacSHA256";
@ -76,14 +82,19 @@ public SharedKeyCredentials(final String accountName,
public void signRequest(HttpURLConnection connection, final long contentLength) throws UnsupportedEncodingException {
connection.setRequestProperty(HttpHeaderConfigurations.X_MS_DATE, getGMTTime());
String gmtTime = getGMTTime();
connection.setRequestProperty(HttpHeaderConfigurations.X_MS_DATE, gmtTime);
final String stringToSign = canonicalize(connection, accountName, contentLength);
final String computedBase64Signature = computeHmac256(stringToSign);
String signature = String.format("%s %s:%s", "SharedKey", accountName,
computedBase64Signature);
connection.setRequestProperty(HttpHeaderConfigurations.AUTHORIZATION,
String.format("%s %s:%s", "SharedKey", accountName, computedBase64Signature));
signature);
LOG.debug("Signing request with timestamp of {} and signature {}",
gmtTime, signature);
}
private String computeHmac256(final String stringToSign) {

View File

@ -0,0 +1,20 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
org.apache.hadoop.fs.azurebfs.security.AbfsDtFetcher
org.apache.hadoop.fs.azurebfs.security.AbfssDtFetcher

View File

@ -16,67 +16,780 @@
<!-- MACRO{toc|fromDepth=1|toDepth=3} -->
## Introduction
## <a name="introduction"></a> Introduction
The `hadoop-azure` module provides support for the Azure Data Lake Storage Gen2
storage layer through the "abfs" connector
To make it part of Apache Hadoop's default classpath, simply make sure that
`HADOOP_OPTIONAL_TOOLS` in `hadoop-env.sh` has `hadoop-azure` in the list.
To make it part of Apache Hadoop's default classpath, make sure that
`HADOOP_OPTIONAL_TOOLS` environment variable has `hadoop-azure` in the list,
*on every machine in the cluster*
## Features
```bash
export HADOOP_OPTIONAL_TOOLS=hadoop-azure
```
* Read and write data stored in an Azure Blob Storage account.
You can set this locally in your `.profile`/`.bashrc`, but note it won't
propagate to jobs running in-cluster.
## <a name="features"></a> Features of the ABFS connector.
* Supports reading and writing data stored in an Azure Blob Storage account.
* *Fully Consistent* view of the storage across all clients.
* Can read data written through the wasb: connector.
* Present a hierarchical file system view by implementing the standard Hadoop
* Can read data written through the `wasb:` connector.
* Presents a hierarchical file system view by implementing the standard Hadoop
[`FileSystem`](../api/org/apache/hadoop/fs/FileSystem.html) interface.
* Supports configuration of multiple Azure Blob Storage accounts.
* Can act as a source or destination of data in Hadoop MapReduce, Apache Hive, Apache Spark
* Tested at scale on both Linux and Windows.
* Can act as a source or destination of data in Hadoop MapReduce, Apache Hive, Apache Spark.
* Tested at scale on both Linux and Windows by Microsoft themselves.
* Can be used as a replacement for HDFS on Hadoop clusters deployed in Azure infrastructure.
## Limitations
* File last access time is not tracked.
## Technical notes
### Security
### Consistency and Concurrency
*TODO*: complete/review
The abfs client has a fully consistent view of the store, which has complete Create Read Update and Delete consistency for data and metadata.
(Compare and contrast with S3 which only offers Create consistency; S3Guard adds CRUD to metadata, but not the underlying data).
### Performance
*TODO*: check these.
* File Rename: `O(1)`.
* Directory Rename: `O(files)`.
* Directory Delete: `O(files)`.
## Configuring ABFS
Any configuration can be specified generally (or as the default when accessing all accounts) or can be tied to s a specific account.
For example, an OAuth identity can be configured for use regardless of which account is accessed with the property
"fs.azure.account.oauth2.client.id"
or you can configure an identity to be used only for a specific storage account with
"fs.azure.account.oauth2.client.id.\<account\_name\>.dfs.core.windows.net".
Note that it doesn't make sense to do this with some properties, like shared keys that are inherently account-specific.
## Testing ABFS
See the relevant section in [Testing Azure](testing_azure.html).
## References
For details on ABFS, consult the following documents:
* [A closer look at Azure Data Lake Storage Gen2](https://azure.microsoft.com/en-gb/blog/a-closer-look-at-azure-data-lake-storage-gen2/);
MSDN Article from June 28, 2018.
* [Storage Tiers](https://docs.microsoft.com/en-us/azure/storage/blobs/storage-blob-storage-tiers)
## Getting started
### Concepts
The Azure Storage data model presents 3 core concepts:
* **Storage Account**: All access is done through a storage account.
* **Container**: A container is a grouping of multiple blobs. A storage account
may have multiple containers. In Hadoop, an entire file system hierarchy is
stored in a single container.
* **Blob**: A file of any type and size stored with the existing wasb connector
The ABFS connector connects to classic containers, or those created
with Hierarchical Namespaces.
## <a name="namespaces"></a> Hierarchical Namespaces (and WASB Compatibility)
A key aspect of ADLS Gen 2 is its support for
[hierachical namespaces](https://docs.microsoft.com/en-us/azure/storage/blobs/data-lake-storage-namespace)
These are effectively directories and offer high performance rename and delete operations
—something which makes a significant improvement in performance in query engines
writing data to, including MapReduce, Spark, Hive, as well as DistCp.
This feature is only available if the container was created with "namespace"
support.
You enable namespace support when creating a new Storage Account,
by checking the "Hierarchical Namespace" option in the Portal UI, or, when
creating through the command line, using the option `--hierarchical-namespace true`
_You cannot enable Hierarchical Namespaces on an existing storage account_
Containers in a storage account with Hierarchical Namespaces are
not (currently) readable through the `wasb:` connector.
Some of the `az storage` command line commands fail too, for example:
```bash
$ az storage container list --account-name abfswales1
Blob API is not yet supported for hierarchical namespace accounts. ErrorCode: BlobApiNotYetSupportedForHierarchicalNamespaceAccounts
```
### <a name="creating"></a> Creating an Azure Storage Account
The best documentation on getting started with Azure Datalake Gen2 with the
abfs connector is [Using Azure Data Lake Storage Gen2 with Azure HDInsight clusters](https://docs.microsoft.com/en-us/azure/storage/blobs/data-lake-storage-use-hdi-cluster)
It includes instructions to create it from [the Azure command line tool](https://docs.microsoft.com/en-us/cli/azure/install-azure-cli?view=azure-cli-latest),
which can be installed on Windows, MacOS (via Homebrew) and Linux (apt or yum).
The [az storage](https://docs.microsoft.com/en-us/cli/azure/storage?view=azure-cli-latest) subcommand
handles all storage commands, [`az storage account create`](https://docs.microsoft.com/en-us/cli/azure/storage/account?view=azure-cli-latest#az-storage-account-create)
does the creation.
Until the ADLS gen2 API support is finalized, you need to add an extension
to the ADLS command.
```bash
az extension add --name storage-preview
```
Check that all is well by verifying that the usage command includes `--hierarchical-namespace`:
```
$ az storage account
usage: az storage account create [-h] [--verbose] [--debug]
[--output {json,jsonc,table,tsv,yaml,none}]
[--query JMESPATH] --resource-group
RESOURCE_GROUP_NAME --name ACCOUNT_NAME
[--sku {Standard_LRS,Standard_GRS,Standard_RAGRS,Standard_ZRS,Premium_LRS,Premium_ZRS}]
[--location LOCATION]
[--kind {Storage,StorageV2,BlobStorage,FileStorage,BlockBlobStorage}]
[--tags [TAGS [TAGS ...]]]
[--custom-domain CUSTOM_DOMAIN]
[--encryption-services {blob,file,table,queue} [{blob,file,table,queue} ...]]
[--access-tier {Hot,Cool}]
[--https-only [{true,false}]]
[--file-aad [{true,false}]]
[--hierarchical-namespace [{true,false}]]
[--bypass {None,Logging,Metrics,AzureServices} [{None,Logging,Metrics,AzureServices} ...]]
[--default-action {Allow,Deny}]
[--assign-identity]
[--subscription _SUBSCRIPTION]
```
You can list locations from `az account list-locations`, which lists the
name to refer to in the `--location` argument:
```
$ az account list-locations -o table
DisplayName Latitude Longitude Name
------------------- ---------- ----------- ------------------
East Asia 22.267 114.188 eastasia
Southeast Asia 1.283 103.833 southeastasia
Central US 41.5908 -93.6208 centralus
East US 37.3719 -79.8164 eastus
East US 2 36.6681 -78.3889 eastus2
West US 37.783 -122.417 westus
North Central US 41.8819 -87.6278 northcentralus
South Central US 29.4167 -98.5 southcentralus
North Europe 53.3478 -6.2597 northeurope
West Europe 52.3667 4.9 westeurope
Japan West 34.6939 135.5022 japanwest
Japan East 35.68 139.77 japaneast
Brazil South -23.55 -46.633 brazilsouth
Australia East -33.86 151.2094 australiaeast
Australia Southeast -37.8136 144.9631 australiasoutheast
South India 12.9822 80.1636 southindia
Central India 18.5822 73.9197 centralindia
West India 19.088 72.868 westindia
Canada Central 43.653 -79.383 canadacentral
Canada East 46.817 -71.217 canadaeast
UK South 50.941 -0.799 uksouth
UK West 53.427 -3.084 ukwest
West Central US 40.890 -110.234 westcentralus
West US 2 47.233 -119.852 westus2
Korea Central 37.5665 126.9780 koreacentral
Korea South 35.1796 129.0756 koreasouth
France Central 46.3772 2.3730 francecentral
France South 43.8345 2.1972 francesouth
Australia Central -35.3075 149.1244 australiacentral
Australia Central 2 -35.3075 149.1244 australiacentral2
```
Once a location has been chosen, create the account
```bash
az storage account create --verbose \
--name abfswales1 \
--resource-group devteam2 \
--kind StorageV2 \
--hierarchical-namespace true \
--location ukwest \
--sku Standard_LRS \
--https-only true \
--encryption-services blob \
--access-tier Hot \
--tags owner=engineering \
--assign-identity \
--output jsonc
```
The output of the command is a JSON file, whose `primaryEndpoints` command
includes the name of the store endpoint:
```json
{
"primaryEndpoints": {
"blob": "https://abfswales1.blob.core.windows.net/",
"dfs": "https://abfswales1.dfs.core.windows.net/",
"file": "https://abfswales1.file.core.windows.net/",
"queue": "https://abfswales1.queue.core.windows.net/",
"table": "https://abfswales1.table.core.windows.net/",
"web": "https://abfswales1.z35.web.core.windows.net/"
}
}
```
The `abfswales1.dfs.core.windows.net` account is the name by which the
storage account will be referred to.
Now ask for the connection string to the store, which contains the account key
```bash
az storage account show-connection-string --name abfswales1
{
"connectionString": "DefaultEndpointsProtocol=https;EndpointSuffix=core.windows.net;AccountName=abfswales1;AccountKey=ZGlkIHlvdSByZWFsbHkgdGhpbmsgSSB3YXMgZ29pbmcgdG8gcHV0IGEga2V5IGluIGhlcmU/IA=="
}
```
You then need to add the access key to your `core-site.xml`, JCEKs file or
use your cluster management tool to set it the option `fs.azure.account.key.STORAGE-ACCOUNT`
to this value.
```XML
<property>
<name>fs.azure.account.key.abfswales1.dfs.core.windows.net</name>
<value>ZGlkIHlvdSByZWFsbHkgdGhpbmsgSSB3YXMgZ29pbmcgdG8gcHV0IGEga2V5IGluIGhlcmU/IA==</value>
</property>
```
#### Creation through the Azure Portal
Creation through the portal is covered in [Quickstart: Create an Azure Data Lake Storage Gen2 storage account](https://docs.microsoft.com/en-us/azure/storage/blobs/data-lake-storage-quickstart-create-account)
Key Steps
1. Create a new Storage Account in a location which suits you.
1. "Basics" Tab: select "StorageV2".
1. "Advanced" Tab: enable "Hierarchical Namespace".
You have now created your storage account. Next, get the key for authentication
for using the default "Shared Key" authentication.
1. Go to the Azure Portal.
1. Select "Storage Accounts"
1. Select the newly created storage account.
1. In the list of settings, locate "Access Keys" and select that.
1. Copy one of the access keys to the clipboard, add to the XML option,
set in cluster management tools, Hadoop JCEKS file or KMS store.
### <a name="new_container"></a> Creating a new container
An Azure storage account can have multiple containers, each with the container
name as the userinfo field of the URI used to reference it.
For example, the container "container1" in the storage account just created
will have the URL `abfs://container1@abfswales1.dfs.core.windows.net/`
You can create a new container through the ABFS connector, by setting the option
`fs.azure.createRemoteFileSystemDuringInitialization` to `true`.
If the container does not exist, an attempt to list it with `hadoop fs -ls`
will fail
```
$ hadoop fs -ls abfs://container1@abfswales1.dfs.core.windows.net/
ls: `abfs://container1@abfswales1.dfs.core.windows.net/': No such file or directory
```
Enable remote FS creation and the second attempt succeeds, creating the container as it does so:
```
$ hadoop fs -D fs.azure.createRemoteFileSystemDuringInitialization=true \
-ls abfs://container1@abfswales1.dfs.core.windows.net/
```
This is useful for creating accounts on the command line, especially before
the `az storage` command supports hierarchical namespaces completely.
### Listing and examining containers of a Storage Account.
You can use the [Azure Storage Explorer](https://azure.microsoft.com/en-us/features/storage-explorer/)
## <a name="configuring"></a> Configuring ABFS
Any configuration can be specified generally (or as the default when accessing all accounts)
or can be tied to a specific account.
For example, an OAuth identity can be configured for use regardless of which
account is accessed with the property `fs.azure.account.oauth2.client.id`
or you can configure an identity to be used only for a specific storage account with
`fs.azure.account.oauth2.client.id.<account_name>.dfs.core.windows.net`.
This is shown in the Authentication section.
## <a name="authentication"></a> Authentication
Authentication for ABFS is ultimately granted by [Azure Active Directory](https://docs.microsoft.com/en-us/azure/active-directory/develop/authentication-scenarios).
The concepts covered there are beyond the scope of this document to cover;
developers are expected to have read and understood the concepts therein
to take advantage of the different authentication mechanisms.
What is covered here, briefly, is how to configure the ABFS client to authenticate
in different deployment situations.
The ABFS client can be deployed in different ways, with its authentication needs
driven by them.
1. With the storage account's authentication secret in the configuration:
"Shared Key".
1. Using OAuth 2.0 tokens of one form or another.
1. Deployed in-Azure with the Azure VMs providing OAuth 2.0 tokens to the application,
"Managed Instance".
What can be changed is what secrets/credentials are used to authenticate the caller.
The authentication mechanism is set in `fs.azure.account.auth.type` (or the account specific variant),
and, for the various OAuth options `fs.azure.account.oauth.provider.type`
All secrets can be stored in JCEKS files. These are encrypted and password
protected —use them or a compatible Hadoop Key Management Store wherever
possible
### <a name="shared-key-auth"></a> Default: Shared Key
This is the simplest authentication mechanism of account + password.
The account name is inferred from the URL;
the password, "key", retrieved from the XML/JCECKs configuration files.
```xml
<property>
<name>fs.azure.account.auth.type.abfswales1.dfs.core.windows.net</name>
<value>SharedKey</value>
<description>
</description>
</property>
<property>
<name>fs.azure.account.key.abfswales1.dfs.core.windows.net</name>
<value>ZGlkIHlvdSByZWFsbHkgdGhpbmsgSSB3YXMgZ29pbmcgdG8gcHV0IGEga2V5IGluIGhlcmU/IA==</value>
<description>
The secret password. Never share these.
</description>
</property>
```
*Note*: The source of the account key can be changed through a custom key provider;
one exists to execute a shell script to retrieve it.
### <a name="oauth-client-credentials"></a> OAuth 2.0 Client Credentials
OAuth 2.0 credentials of (client id, client secret, endpoint) are provided in the configuration/JCEKS file.
The specifics of this process is covered
in [hadoop-azure-datalake](../hadoop-azure-datalake/index.html#Configuring_Credentials_and_FileSystem);
the key names are slightly different here.
```xml
<property>
<name>fs.azure.account.auth.type</name>
<value>OAuth</value>
<description>
Use OAuth authentication
</description>
</property>
<property>
<name>fs.azure.account.oauth.provider.type</name>
<value>org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider</value>
<description>
Use client credentials
</description>
</property>
<property>
<name>fs.azure.account.oauth2.client.endpoint</name>
<value></value>
<description>
URL of OAuth endpoint
</description>
</property>
<property>
<name>fs.azure.account.oauth2.client.id</name>
<value></value>
<description>
Client ID
</description>
</property>
<property>
<name>fs.azure.account.oauth2.client.secret</name>
<value></value>
<description>
Secret
</description>
</property>
```
### <a name="oauth-user-and-passwd"></a> OAuth 2.0: Username and Password
An OAuth 2.0 endpoint, username and password are provided in the configuration/JCEKS file.
```xml
<property>
<name>fs.azure.account.auth.type</name>
<value>OAuth</value>
<description>
Use OAuth authentication
</description>
</property>
<property>
<name>fs.azure.account.oauth.provider.type</name>
<value>org.apache.hadoop.fs.azurebfs.oauth2.UserPasswordTokenProvider</value>
<description>
Use user and password
</description>
</property>
<property>
<name>fs.azure.account.oauth2.client.endpoint</name>
<value></value>
<description>
URL of OAuth 2.0 endpoint
</description>
</property>
<property>
<name>fs.azure.account.oauth2.user.name</name>
<value></value>
<description>
username
</description>
</property>
<property>
<name>fs.azure.account.oauth2.user.password</name>
<value></value>
<description>
password for account
</description>
</property>
```
### <a name="oauth-refresh-token"></a> OAuth 2.0: Refresh Token
With an existing Oauth 2.0 token, make a request of the Active Directory endpoint
`https://login.microsoftonline.com/Common/oauth2/token` for this token to be refreshed.
```xml
<property>
<name>fs.azure.account.auth.type</name>
<value>OAuth</value>
<description>
Use OAuth 2.0 authentication
</description>
</property>
<property>
<name>fs.azure.account.oauth.provider.type</name>
<value>org.apache.hadoop.fs.azurebfs.oauth2.RefreshTokenBasedTokenProvider</value>
<description>
Use the Refresh Token Provider
</description>
</property>
<property>
<name>fs.azure.account.oauth2.refresh.token</name>
<value></value>
<description>
Refresh token
</description>
</property>
<property>
<name>fs.azure.account.oauth2.client.id</name>
<value></value>
<description>
Optional Client ID
</description>
</property>
```
### <a name="managed-identity"></a> Azure Managed Identity
[Azure Managed Identities](https://docs.microsoft.com/en-us/azure/active-directory/managed-identities-azure-resources/overview), formerly "Managed Service Identities".
OAuth 2.0 tokens are issued by a special endpoint only accessible
from the executing VM (`http://169.254.169.254/metadata/identity/oauth2/token`).
The issued credentials can be used to authenticate.
The Azure Portal/CLI is used to create the service identity.
```xml
<property>
<name>fs.azure.account.auth.type</name>
<value>OAuth</value>
<description>
Use OAuth authentication
</description>
</property>
<property>
<name>fs.azure.account.oauth.provider.type</name>
<value>org.apache.hadoop.fs.azurebfs.oauth2.MsiTokenProvider</value>
<description>
Use MSI for issuing OAuth tokens
</description>
</property>
<property>
<name>fs.azure.account.oauth2.msi.tenant</name>
<value></value>
<description>
Optional MSI Tenant ID
</description>
</property>
<property>
<name>fs.azure.account.oauth2.client.id</name>
<value></value>
<description>
Optional Client ID
</description>
</property>
```
### Custom OAuth 2.0 Token Provider
A Custom OAuth 2.0 token provider supplies the ABFS connector with an OAuth 2.0
token when its `getAccessToken()` method is invoked.
```xml
<property>
<name>fs.azure.account.auth.type</name>
<value>Custom</value>
<description>
Custom Authentication
</description>
</property>
<property>
<name>fs.azure.account.oauth.provider.type</name>
<value></value>
<description>
classname of Custom Authentication Provider
</description>
</property>
```
The declared class must implement `org.apache.hadoop.fs.azurebfs.extensions.CustomTokenProviderAdaptee`
and optionally `org.apache.hadoop.fs.azurebfs.extensions.BoundDTExtension`.
## <a name="technical"></a> Technical notes
### <a name="proxy"></a> Proxy setup
The connector uses the JVM proxy settings to control its proxy setup.
See The [Oracle Java documentation](https://docs.oracle.com/javase/8/docs/technotes/guides/net/proxies.html) for the options to set.
As the connector uses HTTPS by default, the `https.proxyHost` and `https.proxyPort`
options are those which must be configured.
In MapReduce jobs, including distcp, the proxy options must be set in both the
`mapreduce.map.java.opts` and `mapreduce.reduce.java.opts`.
```bash
# this variable is only here to avoid typing the same values twice.
# It's name is not important.
export DISTCP_PROXY_OPTS="-Dhttps.proxyHost=web-proxy.example.com -Dhttps.proxyPort=80"
hadoop distcp \
-D mapreduce.map.java.opts="$DISTCP_PROXY_OPTS" \
-D mapreduce.reduce.java.opts="$DISTCP_PROXY_OPTS" \
-update -skipcrccheck -numListstatusThreads 40 \
hdfs://namenode:8020/users/alice abfs://backups@account.dfs.core.windows.net/users/alice
```
Without these settings, even though access to ADLS may work from the command line,
`distcp` access can fail with network errors.
### <a name="security"></a> Security
As with other object stores, login secrets are valuable pieces of information.
Organizations should have a process for safely sharing them.
### <a name="limitations"></a> Limitations of the ABFS connector
* File last access time is not tracked.
* Extended attributes are not supported.
* File Checksums are not supported.
* The `Syncable` interfaces `hsync()` and `hflush()` operations are supported if
`fs.azure.enable.flush` is set to true (default=true). With the Wasb connector,
this limited the number of times either call could be made to 50,000
[HADOOP-15478](https://issues.apache.org/jira/browse/HADOOP-15478).
If abfs has the a similar limit, then excessive use of sync/flush may
cause problems.
### <a name="consistency"></a> Consistency and Concurrency
As with all Azure storage services, the Azure Datalake Gen 2 store offers
a fully consistent view of the store, with complete
Create, Read, Update, and Delete consistency for data and metadata.
(Compare and contrast with S3 which only offers Create consistency;
S3Guard adds CRUD to metadata, but not the underlying data).
### <a name="performance"></a> Performance and Scalability
For containers with hierarchical namespaces,
the scalability numbers are, in Big-O-notation, as follows:
| Operation | Scalability |
|-----------|-------------|
| File Rename | `O(1)` |
| File Delete | `O(1)` |
| Directory Rename:| `O(1)` |
| Directory Delete | `O(1)` |
For non-namespace stores, the scalability becomes:
| Operation | Scalability |
|-----------|-------------|
| File Rename | `O(1)` |
| File Delete | `O(1)` |
| Directory Rename:| `O(files)` |
| Directory Delete | `O(files)` |
That is: the more files there are, the slower directory operations get.
Further reading: [Azure Storage Scalability Targets](https://docs.microsoft.com/en-us/azure/storage/common/storage-scalability-targets?toc=%2fazure%2fstorage%2fqueues%2ftoc.json)
### <a name="extensibility"></a> Extensibility
The ABFS connector supports a number of limited-private/unstable extension
points for third-parties to integrate their authentication and authorization
services into the ABFS client.
* `CustomDelegationTokenManager` : adds ability to issue Hadoop Delegation Tokens.
* `AbfsAuthorizer` permits client-side authorization of file operations.
* `CustomTokenProviderAdaptee`: allows for custom provision of
Azure OAuth tokens.
* `KeyProvider`.
Consult the source in `org.apache.hadoop.fs.azurebfs.extensions`
and all associated tests to see how to make use of these extension points.
_Warning_ These extension points are unstable.
## <a href="options"></a> Other configuration options
Consult the javadocs for `org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys`,
`org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations` and
`org.apache.hadoop.fs.azurebfs.AbfsConfiguration` for the full list
of configuration options and their default values.
## <a name="troubleshooting"></a> Troubleshooting
The problems associated with the connector usually come down to, in order
1. Classpath.
1. Network setup (proxy etc.).
1. Authentication and Authorization.
1. Anything else.
If you log `org.apache.hadoop.fs.azurebfs.services` at `DEBUG` then you will
see more details about any request which is failing.
One useful tool for debugging connectivity is the [cloudstore storediag utility](https://github.com/steveloughran/cloudstore/releases).
This validates the classpath, the settings, then tries to work with the filesystem.
```bash
bin/hadoop jar cloudstore-0.1-SNAPSHOT.jar storediag abfs://container@account.dfs.core.windows.net/
```
1. If the `storediag` command cannot work with an abfs store, nothing else is likely to.
1. If the `storediag` store does successfully work, that does not guarantee that the classpath
or configuration on the rest of the cluster is also going to work, especially
in distributed applications. But it is at least a start.
### `ClassNotFoundException: org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem`
The `hadoop-azure` JAR is not on the classpah.
```
java.lang.RuntimeException: java.lang.ClassNotFoundException:
Class org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem not found
at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2625)
at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3290)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3322)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:136)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3373)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3341)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:491)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:361)
Caused by: java.lang.ClassNotFoundException:
Class org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem not found
at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2529)
at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2623)
... 16 more
```
Tip: if this is happening on the command line, you can turn on debug logging
of the hadoop scripts:
```bash
export HADOOP_SHELL_SCRIPT_DEBUG=true
```
If this is happening on an application running within the cluster, it means
the cluster (somehow) needs to be configured so that the `hadoop-azure`
module and dependencies are on the classpath of deployed applications.
### `ClassNotFoundException: com.microsoft.azure.storage.StorageErrorCode`
The `azure-storage` JAR is not on the classpath.
### `Server failed to authenticate the request`
The request wasn't authenticated while using the default shared-key
authentication mechanism.
```
Operation failed: "Server failed to authenticate the request.
Make sure the value of Authorization header is formed correctly including the signature.",
403, HEAD, https://account.dfs.core.windows.net/container2?resource=filesystem&timeout=90
at org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation.execute(AbfsRestOperation.java:135)
at org.apache.hadoop.fs.azurebfs.services.AbfsClient.getFilesystemProperties(AbfsClient.java:209)
at org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.getFilesystemProperties(AzureBlobFileSystemStore.java:259)
at org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem.fileSystemExists(AzureBlobFileSystem.java:859)
at org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem.initialize(AzureBlobFileSystem.java:110)
```
Causes include:
* Your credentials are incorrect.
* Your shared secret has expired. in Azure, this happens automatically
* Your shared secret has been revoked.
* host/VM clock drift means that your client's clock is out of sync with the
Azure servers —the call is being rejected as it is either out of date (considered a replay)
or from the future. Fix: Check your clocks, etc.
### `Configuration property _something_.dfs.core.windows.net not found`
There's no `fs.azure.account.key.` entry in your cluster configuration declaring the
access key for the specific account, or you are using the wrong URL
```
$ hadoop fs -ls abfs://container@abfswales2.dfs.core.windows.net/
ls: Configuration property abfswales2.dfs.core.windows.net not found.
```
* Make sure that the URL is correct
* Add the missing account key.
### `No such file or directory when trying to list a container`
There is no container of the given name. Either it has been mistyped
or the container needs to be created.
```
$ hadoop fs -ls abfs://container@abfswales1.dfs.core.windows.net/
ls: `abfs://container@abfswales1.dfs.core.windows.net/': No such file or directory
```
* Make sure that the URL is correct
* Create the container if needed
### "HTTP connection to https://login.microsoftonline.com/_something_ failed for getting token from AzureAD. Http response: 200 OK"
+ it has a content-type `text/html`, `text/plain`, `application/xml`
The OAuth authentication page didn't fail with an HTTP error code, but it didn't return JSON either
```
$ bin/hadoop fs -ls abfs://container@abfswales1.dfs.core.windows.net/
...
ls: HTTP Error 200;
url='https://login.microsoftonline.com/02a07549-0a5f-4c91-9d76-53d172a638a2/oauth2/authorize'
AADToken: HTTP connection to
https://login.microsoftonline.com/02a07549-0a5f-4c91-9d76-53d172a638a2/oauth2/authorize
failed for getting token from AzureAD.
Unexpected response.
Check configuration, URLs and proxy settings.
proxies=none;
requestId='dd9d526c-8b3d-4b3f-a193-0cf021938600';
contentType='text/html; charset=utf-8';
```
Likely causes are configuration and networking:
1. Authentication is failing, the caller is being served up the Azure Active Directory
signon page for humans, even though it is a machine calling.
1. The URL is wrong —it is pointing at a web page unrelated to OAuth2.0
1. There's a proxy server in the way trying to return helpful instructions.
## <a name="testing"></a> Testing ABFS
See the relevant section in [Testing Azure](testing_azure.html).

View File

@ -16,17 +16,26 @@
<!-- MACRO{toc|fromDepth=1|toDepth=3} -->
See also:
* [ABFS](./abfs.html)
* [Testing](./testing_azure.html)
## Introduction
The hadoop-azure module provides support for integration with
The `hadoop-azure` module provides support for integration with
[Azure Blob Storage](http://azure.microsoft.com/en-us/documentation/services/storage/).
The built jar file, named hadoop-azure.jar, also declares transitive dependencies
The built jar file, named `hadoop-azure.jar`, also declares transitive dependencies
on the additional artifacts it requires, notably the
[Azure Storage SDK for Java](https://github.com/Azure/azure-storage-java).
To make it part of Apache Hadoop's default classpath, simply make sure that
HADOOP_OPTIONAL_TOOLS in hadoop-env.sh has 'hadoop-azure' in the list.
`HADOOP_OPTIONAL_TOOLS`in `hadoop-env.sh` has `'hadoop-azure` in the list.
Example:
```bash
export HADOOP_OPTIONAL_TOOLS="hadoop-azure,hadoop-azure-datalake"
```
## Features
* Read and write data stored in an Azure Blob Storage account.

View File

@ -34,6 +34,7 @@
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azurebfs.security.AbfsDelegationTokenManager;
import org.apache.hadoop.fs.azurebfs.services.AuthType;
import org.apache.hadoop.fs.azure.AzureNativeFileSystemStore;
import org.apache.hadoop.fs.azure.NativeAzureFileSystem;
@ -342,4 +343,13 @@ protected Path path(String filepath) throws IOException {
new Path(getTestPath(), filepath));
}
/**
* Get any Delegation Token manager created by the filesystem.
* @return the DT manager or null.
* @throws IOException failure
*/
protected AbfsDelegationTokenManager getDelegationTokenManager()
throws IOException {
return getFileSystem().getDelegationTokenManager();
}
}

View File

@ -62,6 +62,7 @@ public class ITestAbfsIdentityTransformer extends AbstractAbfsScaleTest{
public ITestAbfsIdentityTransformer() throws Exception {
super();
UserGroupInformation.reset();
userGroupInfo = UserGroupInformation.getCurrentUser();
localUser = userGroupInfo.getShortUserName();
localGroup = userGroupInfo.getPrimaryGroupName();

View File

@ -0,0 +1,282 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs.extensions;
import java.io.IOException;
import java.net.URI;
import java.nio.charset.Charset;
import com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.delegation.web.DelegationTokenIdentifier;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_DELEGATION_TOKEN_PROVIDER_TYPE;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENABLE_DELEGATION_TOKEN;
import static org.apache.hadoop.fs.azurebfs.extensions.KerberizedAbfsCluster.newURI;
import static org.apache.hadoop.fs.azurebfs.extensions.StubAbfsTokenIdentifier.decodeIdentifier;
/**
* This is a Stub DT manager for testing, one which
* implements the the {@link CustomDelegationTokenManager} API, but
* not the extended one.
*
* Member variables are updated as operations are performed, so
* test cases can make assertions about the state of the plugin.
*/
public class ClassicDelegationTokenManager
implements CustomDelegationTokenManager {
private static final Logger LOG = LoggerFactory.getLogger(
ClassicDelegationTokenManager.class);
/**
* Classname.
*/
public static final String NAME
= "org.apache.hadoop.fs.azurebfs.extensions.ClassicDelegationTokenManager";
/**
* If this the DT is unbound, this is used for the service kind.
*/
public static final String UNSET = "abfs://user@unset.dfs.core.windows.net/";
/**
* The URI used when creating a token for an unset binding.
*/
public static final URI UNSET_URI = newURI(UNSET);
private URI fsURI;
private boolean initialized;
private boolean closed;
private int renewals;
private int cancellations;
private int issued;
private Text kind;
private UserGroupInformation owner;
private String canonicalServiceName;
/**
* Instantiate.
*/
public ClassicDelegationTokenManager() {
}
@Override
public void initialize(final Configuration configuration) throws IOException {
initialized = true;
owner = UserGroupInformation.getCurrentUser();
LOG.info("Creating Stub DT manager for {}", owner.getUserName());
}
public void close() {
closed = true;
}
@Override
public Token<DelegationTokenIdentifier> getDelegationToken(final String renewer)
throws IOException {
// guarantees issued
issued++;
URI uri = fsURI != null ? fsURI : UNSET_URI;
Text renewerT = new Text(renewer != null ? renewer : "");
Token t = createToken(issued, uri, new Text(owner.getUserName()),
renewerT);
if (kind != null) {
t.setKind(kind);
}
t.setService(createServiceText());
LOG.info("Created token {}", t);
return t;
}
public Text createServiceText() {
return new Text(fsURI != null ? fsURI.toString() : UNSET);
}
/**
* Create a token.
*
* @param sequenceNumber sequence number.
* @param uri FS URI
* @param owner FS owner
* @param renewer renewer
* @return a token.
*/
public static Token<DelegationTokenIdentifier> createToken(
final int sequenceNumber,
final URI uri,
final Text owner,
final Text renewer) {
StubAbfsTokenIdentifier id
= new StubAbfsTokenIdentifier(uri, owner, renewer);
id.setSequenceNumber(sequenceNumber);
Token<DelegationTokenIdentifier> token = new Token(
id,
new TokenSecretManager());
return token;
}
@Override
public long renewDelegationToken(final Token<?> token) throws IOException {
renewals++;
decodeIdentifier(token);
return 0;
}
@Override
public void cancelDelegationToken(final Token<?> token) throws IOException {
cancellations++;
decodeIdentifier(token);
}
protected void innerBind(final URI uri, final Configuration conf)
throws IOException {
Preconditions.checkState(initialized, "Not initialized");
Preconditions.checkState(fsURI == null, "already bound");
fsURI = uri;
canonicalServiceName = uri.toString();
LOG.info("Bound to {}", fsURI);
}
public String getCanonicalServiceName() {
return canonicalServiceName;
}
public void setCanonicalServiceName(final String canonicalServiceName) {
this.canonicalServiceName = canonicalServiceName;
}
public URI getFsURI() {
return fsURI;
}
public boolean isInitialized() {
return initialized;
}
public boolean isBound() {
return fsURI != null;
}
public boolean isClosed() {
return closed;
}
public int getRenewals() {
return renewals;
}
public int getCancellations() {
return cancellations;
}
public int getIssued() {
return issued;
}
public Text getKind() {
return kind;
}
public void setKind(final Text kind) {
this.kind = kind;
}
@Override
public String toString() {
final StringBuilder sb = new StringBuilder(
"StubDelegationTokenManager{");
sb.append("fsURI=").append(fsURI);
sb.append(", initialized=").append(initialized);
sb.append(", closed=").append(closed);
sb.append(", renewals=").append(renewals);
sb.append(", cancellations=").append(cancellations);
sb.append(", issued=").append(issued);
sb.append('}');
return sb.toString();
}
/**
* Patch a configuration to declare this the DT provider for a filesystem
* built off the given configuration.
* The ABFS Filesystem still needs to come up with security enabled.
* @param conf configuration.
* @return the patched configuration.
*/
public static Configuration useClassicDTManager(Configuration conf) {
conf.setBoolean(FS_AZURE_ENABLE_DELEGATION_TOKEN, true);
conf.set(FS_AZURE_DELEGATION_TOKEN_PROVIDER_TYPE,
ClassicDelegationTokenManager.NAME);
return conf;
}
/**
* Get the password to use in secret managers.
* This is a constant; its just recalculated every time to stop findbugs
* highlighting security risks of shared mutable byte arrays.
* @return a password.
*/
private static byte[] getSecretManagerPasssword() {
return "non-password".getBytes(Charset.forName("UTF-8"));
}
/**
* The secret manager always uses the same secret; the
* factory for new identifiers is that of the token manager.
*/
protected static class TokenSecretManager
extends SecretManager<StubAbfsTokenIdentifier> {
public TokenSecretManager() {
}
@Override
protected byte[] createPassword(StubAbfsTokenIdentifier identifier) {
return getSecretManagerPasssword();
}
@Override
public byte[] retrievePassword(StubAbfsTokenIdentifier identifier)
throws InvalidToken {
return getSecretManagerPasssword();
}
@Override
public StubAbfsTokenIdentifier createIdentifier() {
return new StubAbfsTokenIdentifier();
}
}
}

View File

@ -0,0 +1,370 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs.extensions;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.PrintStream;
import java.net.URI;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.DtUtilShell;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.service.ServiceOperations;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.ToolRunner;
import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_SECURITY_TOKEN_SERVICE_USE_IP;
import static org.apache.hadoop.test.LambdaTestUtils.doAs;
/**
* Test custom DT support in ABFS.
* This brings up a mini KDC in class setup/teardown, as the FS checks
* for that when it enables security.
*
* Much of this code is copied from
* {@code org.apache.hadoop.fs.s3a.auth.delegation.AbstractDelegationIT}
*/
public class ITestAbfsDelegationTokens extends AbstractAbfsIntegrationTest {
private static final Logger LOG = LoggerFactory.getLogger(
ITestAbfsDelegationTokens.class);
/**
* Created in static {@link #setupCluster()} call.
*/
@SuppressWarnings("StaticNonFinalField")
private static KerberizedAbfsCluster cluster;
private UserGroupInformation aliceUser;
/***
* Set up the clusters.
*/
@BeforeClass
public static void setupCluster() throws Exception {
resetUGI();
cluster = new KerberizedAbfsCluster();
cluster.init(new Configuration());
cluster.start();
}
/**
* Tear down the Cluster.
*/
@SuppressWarnings("ThrowableNotThrown")
@AfterClass
public static void teardownCluster() throws Exception {
resetUGI();
ServiceOperations.stopQuietly(LOG, cluster);
}
public ITestAbfsDelegationTokens() throws Exception {
}
@Override
public void setup() throws Exception {
// create the FS
Configuration conf = getRawConfiguration();
cluster.bindConfToCluster(conf);
conf.setBoolean(HADOOP_SECURITY_TOKEN_SERVICE_USE_IP,
false);
resetUGI();
UserGroupInformation.setConfiguration(conf);
aliceUser = cluster.createAliceUser();
assertSecurityEnabled();
// log in as alice so that filesystems belong to that user
UserGroupInformation.setLoginUser(aliceUser);
StubDelegationTokenManager.useStubDTManager(conf);
FileSystem.closeAllForUGI(UserGroupInformation.getLoginUser());
super.setup();
assertNotNull("No StubDelegationTokenManager created in filesystem init",
getStubDTManager());
}
protected StubDelegationTokenManager getStubDTManager() throws IOException {
return (StubDelegationTokenManager) getDelegationTokenManager().getTokenManager();
}
/**
* Cleanup removes cached filesystems and the last instance of the
* StubDT manager.
*/
@Override
public void teardown() throws Exception {
// clean up all of alice's instances.
FileSystem.closeAllForUGI(UserGroupInformation.getLoginUser());
super.teardown();
}
/**
* General assertion that security is turred on for a cluster.
*/
public static void assertSecurityEnabled() {
assertTrue("Security is needed for this test",
UserGroupInformation.isSecurityEnabled());
}
/**
* Reset UGI info.
*/
protected static void resetUGI() {
UserGroupInformation.reset();
}
/**
* Create credentials with the DTs of the given FS.
* @param fs filesystem
* @return a non-empty set of credentials.
* @throws IOException failure to create.
*/
protected static Credentials mkTokens(final FileSystem fs)
throws IOException {
Credentials cred = new Credentials();
fs.addDelegationTokens("rm/rm1@EXAMPLE.COM", cred);
return cred;
}
@Test
public void testTokenManagerBinding() throws Throwable {
StubDelegationTokenManager instance
= getStubDTManager();
assertNotNull("No StubDelegationTokenManager created in filesystem init",
instance);
assertTrue("token manager not initialized: " + instance,
instance.isInitialized());
}
/**
* When bound to a custom DT manager, it provides the service name.
* The stub returns the URI by default.
*/
@Test
public void testCanonicalization() throws Throwable {
String service = getCanonicalServiceName();
assertNotNull("No canonical service name from filesystem " + getFileSystem(),
service);
assertEquals("canonical URI and service name mismatch",
getFilesystemURI(), new URI(service));
}
protected URI getFilesystemURI() throws IOException {
return getFileSystem().getUri();
}
protected String getCanonicalServiceName() throws IOException {
return getFileSystem().getCanonicalServiceName();
}
/**
* Checks here to catch any regressions in canonicalization
* logic.
*/
@Test
public void testDefaultCanonicalization() throws Throwable {
FileSystem fs = getFileSystem();
clearTokenServiceName();
assertEquals("canonicalServiceName is not the default",
getDefaultServiceName(fs), getCanonicalServiceName());
}
protected String getDefaultServiceName(final FileSystem fs) {
return SecurityUtil.buildDTServiceName(fs.getUri(), 0);
}
protected void clearTokenServiceName() throws IOException {
getStubDTManager().setCanonicalServiceName(null);
}
/**
* Request a token; this tests the collection workflow.
*/
@Test
public void testRequestToken() throws Throwable {
AzureBlobFileSystem fs = getFileSystem();
Credentials credentials = mkTokens(fs);
assertEquals("Number of collected tokens", 1,
credentials.numberOfTokens());
verifyCredentialsContainsToken(credentials, fs);
}
/**
* Request a token; this tests the collection workflow.
*/
@Test
public void testRequestTokenDefault() throws Throwable {
clearTokenServiceName();
AzureBlobFileSystem fs = getFileSystem();
assertEquals("canonicalServiceName is not the default",
getDefaultServiceName(fs), fs.getCanonicalServiceName());
Credentials credentials = mkTokens(fs);
assertEquals("Number of collected tokens", 1,
credentials.numberOfTokens());
verifyCredentialsContainsToken(credentials,
getDefaultServiceName(fs), getFilesystemURI().toString());
}
public void verifyCredentialsContainsToken(final Credentials credentials,
FileSystem fs) throws IOException {
verifyCredentialsContainsToken(credentials,
fs.getCanonicalServiceName(),
fs.getUri().toString());
}
/**
* Verify that the set of credentials contains a token for the given
* canonical service name, and that it is of the given kind.
* @param credentials set of credentials
* @param serviceName canonical service name for lookup.
* @param tokenService service kind; also expected in string value.
* @return the retrieved token.
* @throws IOException IO failure
*/
public StubAbfsTokenIdentifier verifyCredentialsContainsToken(
final Credentials credentials,
final String serviceName,
final String tokenService) throws IOException {
Token<? extends TokenIdentifier> token = credentials.getToken(
new Text(serviceName));
assertEquals("Token Kind in " + token,
StubAbfsTokenIdentifier.TOKEN_KIND, token.getKind());
assertEquals("Token Service Kind in " + token,
tokenService, token.getService().toString());
StubAbfsTokenIdentifier abfsId = (StubAbfsTokenIdentifier)
token.decodeIdentifier();
LOG.info("Created token {}", abfsId);
assertEquals("token URI in " + abfsId,
tokenService, abfsId.getUri().toString());
return abfsId;
}
/**
* This mimics the DT collection performed inside FileInputFormat to
* collect DTs for a job.
* @throws Throwable on failure.
*/
@Test
public void testJobsCollectTokens() throws Throwable {
// get tokens for all the required FileSystems..
AzureBlobFileSystem fs = getFileSystem();
Credentials credentials = new Credentials();
Path root = fs.makeQualified(new Path("/"));
Path[] paths = {root};
Configuration conf = fs.getConf();
TokenCache.obtainTokensForNamenodes(credentials,
paths,
conf);
verifyCredentialsContainsToken(credentials, fs);
}
/**
* Run the DT Util command.
* @param expected expected outcome
* @param conf configuration for the command (hence: FS to create)
* @param args other arguments
* @return the output of the command.
*/
protected String dtutil(final int expected,
final Configuration conf,
final String... args) throws Exception {
final ByteArrayOutputStream dtUtilContent = new ByteArrayOutputStream();
DtUtilShell dt = new DtUtilShell();
dt.setOut(new PrintStream(dtUtilContent));
dtUtilContent.reset();
int r = doAs(aliceUser,
() -> ToolRunner.run(conf, dt, args));
String s = dtUtilContent.toString();
LOG.info("\n{}", s);
assertEquals("Exit code from command dtutil "
+ StringUtils.join(" ", args) + " with output " + s,
expected, r);
return s;
}
/**
* Verify the dtutil shell command can fetch tokens
*/
@Test
public void testDTUtilShell() throws Throwable {
File tokenfile = cluster.createTempTokenFile();
String tfs = tokenfile.toString();
String fsURI = getFileSystem().getUri().toString();
dtutil(0, getRawConfiguration(),
"get", fsURI,
"-format", "protobuf",
tfs);
assertTrue("not created: " + tokenfile,
tokenfile.exists());
assertTrue("File is empty " + tokenfile,
tokenfile.length() > 0);
assertTrue("File only contains header " + tokenfile,
tokenfile.length() > 6);
String printed = dtutil(0, getRawConfiguration(), "print", tfs);
assertTrue("no " + fsURI + " in " + printed,
printed.contains(fsURI));
assertTrue("no " + StubAbfsTokenIdentifier.ID + " in " + printed,
printed.contains(StubAbfsTokenIdentifier.ID));
}
/**
* Creates a new FS instance with the simplest binding lifecycle;
* get a token.
* This verifies the classic binding mechanism works.
*/
@Test
public void testBaseDTLifecycle() throws Throwable {
Configuration conf = new Configuration(getRawConfiguration());
ClassicDelegationTokenManager.useClassicDTManager(conf);
try (FileSystem fs = FileSystem.newInstance(getFilesystemURI(), conf)) {
Credentials credentials = mkTokens(fs);
assertEquals("Number of collected tokens", 1,
credentials.numberOfTokens());
verifyCredentialsContainsToken(credentials,
fs.getCanonicalServiceName(),
ClassicDelegationTokenManager.UNSET);
}
}
}

View File

@ -0,0 +1,318 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs.extensions;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.time.Duration;
import java.util.Properties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.minikdc.MiniKdc;
import org.apache.hadoop.security.KDiag;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authentication.util.KerberosName;
import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION;
import static org.apache.hadoop.security.UserGroupInformation.loginUserFromKeytabAndReturnUGI;
import static org.junit.Assert.assertTrue;
/**
* composite service for adding kerberos login for ABFS
* tests which require a logged in user.
* Based on
* {@code org.apache.hadoop.fs.s3a.auth.delegation.MiniKerberizedHadoopCluster}
*/
public class KerberizedAbfsCluster extends CompositeService {
private static final Logger LOG =
LoggerFactory.getLogger(KerberizedAbfsCluster.class);
public static final String ALICE = "alice";
public static final String BOB = "bob";
public static final String HTTP_LOCALHOST = "HTTP/localhost@$LOCALHOST";
/**
* The hostname is dynamically determined based on OS, either
* "localhost" (non-windows) or 127.0.0.1 (windows).
*/
public static final String LOCALHOST_NAME = Path.WINDOWS
? "127.0.0.1"
: "localhost";
private MiniKdc kdc;
private File keytab;
private File workDir;
private String krbInstance;
private String loginUsername;
private String loginPrincipal;
private String sslConfDir;
private String clientSSLConfigFileName;
private String serverSSLConfigFileName;
private String alicePrincipal;
private String bobPrincipal;
/**
* Create the cluster.
* If this class's log is at DEBUG level, this also turns
* Kerberos diagnostics on in the JVM.
*/
public KerberizedAbfsCluster() {
super("KerberizedAbfsCluster");
// load all the configs to force in the -default.xml files
new JobConf();
if (LOG.isDebugEnabled()) {
// turn on kerberos logging @ debug.
System.setProperty(KDiag.SUN_SECURITY_KRB5_DEBUG, "true");
System.setProperty(KDiag.SUN_SECURITY_SPNEGO_DEBUG, "true");
}
}
public MiniKdc getKdc() {
return kdc;
}
public File getKeytab() {
return keytab;
}
public String getKeytabPath() {
return keytab.getAbsolutePath();
}
public UserGroupInformation createBobUser() throws IOException {
return loginUserFromKeytabAndReturnUGI(bobPrincipal,
keytab.getAbsolutePath());
}
public UserGroupInformation createAliceUser() throws IOException {
return loginUserFromKeytabAndReturnUGI(alicePrincipal,
keytab.getAbsolutePath());
}
public File getWorkDir() {
return workDir;
}
public String getKrbInstance() {
return krbInstance;
}
public String getLoginUsername() {
return loginUsername;
}
public String getLoginPrincipal() {
return loginPrincipal;
}
public String withRealm(String user) {
return user + "@EXAMPLE.COM";
}
/**
* Service init creates the KDC.
* @param conf configuration
*/
@Override
protected void serviceInit(final Configuration conf) throws Exception {
patchConfigAtInit(conf);
super.serviceInit(conf);
Properties kdcConf = MiniKdc.createConf();
workDir = GenericTestUtils.getTestDir("kerberos");
workDir.mkdirs();
kdc = new MiniKdc(kdcConf, workDir);
krbInstance = LOCALHOST_NAME;
}
/**
* Start the KDC, create the keytab and the alice and bob users,
* and UGI instances of them logged in from the keytab.
*/
@Override
protected void serviceStart() throws Exception {
super.serviceStart();
kdc.start();
keytab = new File(workDir, "keytab.bin");
loginUsername = UserGroupInformation.getLoginUser().getShortUserName();
loginPrincipal = loginUsername + "/" + krbInstance;
alicePrincipal = ALICE + "/" + krbInstance;
bobPrincipal = BOB + "/" + krbInstance;
kdc.createPrincipal(keytab,
alicePrincipal,
bobPrincipal,
"HTTP/" + krbInstance,
HTTP_LOCALHOST,
loginPrincipal);
final File keystoresDir = new File(workDir, "ssl");
keystoresDir.mkdirs();
sslConfDir = KeyStoreTestUtil.getClasspathDir(
this.getClass());
KeyStoreTestUtil.setupSSLConfig(keystoresDir.getAbsolutePath(),
sslConfDir, getConfig(), false);
clientSSLConfigFileName = KeyStoreTestUtil.getClientSSLConfigFileName();
serverSSLConfigFileName = KeyStoreTestUtil.getServerSSLConfigFileName();
String kerberosRule =
"RULE:[1:$1@$0](.*@EXAMPLE.COM)s/@.*//\nDEFAULT";
KerberosName.setRules(kerberosRule);
}
@Override
protected void serviceStop() throws Exception {
super.serviceStop();
// this can throw an exception, but it will get caught by the superclass.
kdc.stop();
}
protected void patchConfigAtInit(final Configuration conf) {
// turn off some noise during debugging
int timeout = (int) Duration.ofHours(1).toMillis();
conf.setInt("jvm.pause.info-threshold.ms", timeout);
conf.setInt("jvm.pause.warn-threshold.ms", timeout);
}
public void resetUGI() {
UserGroupInformation.reset();
}
/**
* Given a shortname, built a long name with the krb instance and realm info.
* @param shortname short name of the user
* @return a long name
*/
private String userOnHost(final String shortname) {
return shortname + "/" + krbInstance + "@" + getRealm();
}
public String getRealm() {
return kdc.getRealm();
}
/**
* Log in a user to UGI.currentUser.
* @param user user to log in from
* @throws IOException failure
*/
public void loginUser(final String user) throws IOException {
UserGroupInformation.loginUserFromKeytab(user, getKeytabPath());
}
/**
* Log in the login principal as the current user.
* @throws IOException failure
*/
public void loginPrincipal() throws IOException {
loginUser(getLoginPrincipal());
}
/**
* General assertion that security is turred on for a cluster.
*/
public static void assertSecurityEnabled() {
assertTrue("Security is needed for this test",
UserGroupInformation.isSecurityEnabled());
}
/**
* Close filesystems for a user, downgrading a null user to a no-op.
* @param ugi user
* @throws IOException if a close operation raised one.
*/
public static void closeUserFileSystems(UserGroupInformation ugi)
throws IOException {
if (ugi != null) {
FileSystem.closeAllForUGI(ugi);
}
}
/**
* Modify a configuration to use Kerberos as the auth method.
* @param conf configuration to patch.
*/
public void bindConfToCluster(Configuration conf) {
conf.set(HADOOP_SECURITY_AUTHENTICATION,
UserGroupInformation.AuthenticationMethod.KERBEROS.name());
conf.set(CommonConfigurationKeys.HADOOP_USER_GROUP_STATIC_OVERRIDES,
"alice,alice");
// a shortname for the RM principal avoids kerberos mapping problems.
conf.set(YarnConfiguration.RM_PRINCIPAL, BOB);
}
/**
* Utility method to create a URI, converting URISyntaxException
* to RuntimeExceptions. This makes it easier to set up URIs
* in static fields.
* @param uri URI to create.
* @return the URI.
* @throws RuntimeException syntax error.
*/
public static URI newURI(String uri) {
try {
return new URI(uri);
} catch (URISyntaxException e) {
throw new RuntimeException(e);
}
}
/**
* Create the filename for a temporary token file, in the
* work dir of this cluster.
* @return a filename which does not exist.
* @throws IOException failure
*/
public File createTempTokenFile() throws IOException {
File tokenfile = File.createTempFile("tokens", ".bin",
getWorkDir());
tokenfile.delete();
return tokenfile;
}
}

View File

@ -0,0 +1,184 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs.extensions;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.net.URI;
import java.time.Clock;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Date;
import java.util.Objects;
import java.util.UUID;
import com.google.common.base.Preconditions;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.delegation.web.DelegationTokenIdentifier;
/**
* Token identifier for testing ABFS DT support; matched with
* a service declaration so it can be unmarshalled.
*/
public class StubAbfsTokenIdentifier extends DelegationTokenIdentifier {
public static final String ID = "StubAbfsTokenIdentifier";
public static final int MAX_TEXT_LENGTH = 512;
public static final Text TOKEN_KIND = new Text(ID);
/** Canonical URI of the store. */
private URI uri;
/**
* Timestamp of creation.
* This is set to the current time; it will be overridden when
* deserializing data.
*/
private long created = System.currentTimeMillis();
/**
* This marshalled UUID can be used in testing to verify transmission,
* and reuse; as it is printed you can see what is happending too.
*/
private String uuid = UUID.randomUUID().toString();
/**
* This is the constructor used for deserialization, so there's
* no need to fill in all values.
*/
public StubAbfsTokenIdentifier() {
super(TOKEN_KIND);
}
/**
* Create.
* @param uri owner UI
* @param owner token owner
* @param renewer token renewer
*/
public StubAbfsTokenIdentifier(
final URI uri,
final Text owner,
final Text renewer) {
super(TOKEN_KIND, owner, renewer, new Text());
this.uri = uri;
Clock clock = Clock.systemDefaultZone();
long now = clock.millis();
Instant nowTime = Instant.ofEpochMilli(now);
setIssueDate(now);
setMaxDate(nowTime.plus(1, ChronoUnit.HOURS).toEpochMilli());
}
public static StubAbfsTokenIdentifier decodeIdentifier(final Token<?> token)
throws IOException {
StubAbfsTokenIdentifier id
= (StubAbfsTokenIdentifier) token.decodeIdentifier();
Preconditions.checkNotNull(id, "Null decoded identifier");
return id;
}
public URI getUri() {
return uri;
}
public long getCreated() {
return created;
}
public String getUuid() {
return uuid;
}
/**
* Write state.
* {@link org.apache.hadoop.io.Writable#write(DataOutput)}.
* @param out destination
* @throws IOException failure
*/
@Override
public void write(final DataOutput out) throws IOException {
super.write(out);
Text.writeString(out, uri.toString());
Text.writeString(out, uuid);
out.writeLong(created);
}
/**
* Read state.
* {@link org.apache.hadoop.io.Writable#readFields(DataInput)}.
*
* Note: this operation gets called in toString() operations on tokens, so
* must either always succeed, or throw an IOException to trigger the
* catch & downgrade. RuntimeExceptions (e.g. Preconditions checks) are
* not to be used here for this reason.)
*
* @param in input stream
* @throws IOException IO problems.
*/
@Override
public void readFields(final DataInput in)
throws IOException {
super.readFields(in);
uri = URI.create(Text.readString(in, MAX_TEXT_LENGTH));
uuid = Text.readString(in, MAX_TEXT_LENGTH);
created = in.readLong();
}
@Override
public String toString() {
final StringBuilder sb = new StringBuilder(
"AbfsIDBTokenIdentifier{");
sb.append("uri=").append(uri);
sb.append(", uuid='").append(uuid).append('\'');
sb.append(", created='").append(new Date(created)).append('\'');
sb.append('}');
return sb.toString();
}
@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
if (!super.equals(o)) {
return false;
}
final StubAbfsTokenIdentifier that = (StubAbfsTokenIdentifier) o;
return created == that.created
&& uri.equals(that.uri)
&& uuid.equals(that.uuid);
}
@Override
public int hashCode() {
return Objects.hash(super.hashCode(), uri, uuid);
}
}

View File

@ -0,0 +1,95 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs.extensions;
import java.io.IOException;
import java.net.URI;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.delegation.web.DelegationTokenIdentifier;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_DELEGATION_TOKEN_PROVIDER_TYPE;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENABLE_DELEGATION_TOKEN;
/**
* This is a Stub DT manager which adds support for {@link BoundDTExtension}
* to {@link ClassicDelegationTokenManager}.
*/
public class StubDelegationTokenManager extends ClassicDelegationTokenManager
implements BoundDTExtension {
private static final Logger LOG = LoggerFactory.getLogger(
StubDelegationTokenManager.class);
/**
* Classname.
*/
public static final String NAME
= "org.apache.hadoop.fs.azurebfs.extensions.StubDelegationTokenManager";
/**
* Instantiate.
*/
public StubDelegationTokenManager() {
}
@Override
public void bind(final URI uri, final Configuration conf)
throws IOException {
super.innerBind(uri, conf);
}
/**
* Create a token.
*
* @param sequenceNumber sequence number.
* @param uri FS URI
* @param owner FS owner
* @param renewer renewer
* @return a token.
*/
public static Token<DelegationTokenIdentifier> createToken(
final int sequenceNumber,
final URI uri,
final Text owner,
final Text renewer) {
return ClassicDelegationTokenManager.createToken(sequenceNumber, uri, owner,
renewer);
}
/**
* Patch a configuration to declare this the DT provider for a filesystem
* built off the given configuration.
* The ABFS Filesystem still needs to come up with security enabled.
* @param conf configuration.
* @return the patched configuration.
*/
public static Configuration useStubDTManager(Configuration conf) {
conf.setBoolean(FS_AZURE_ENABLE_DELEGATION_TOKEN, true);
conf.set(FS_AZURE_DELEGATION_TOKEN_PROVIDER_TYPE,
StubDelegationTokenManager.NAME);
return conf;
}
}

View File

@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs.extensions;
import java.net.URI;
import java.util.Date;
import org.junit.Test;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
import org.apache.hadoop.fs.azurebfs.AbstractAbfsTestWithTimeout;
import org.apache.hadoop.fs.azurebfs.oauth2.AzureADToken;
import org.apache.hadoop.fs.azurebfs.oauth2.CustomTokenProviderAdapter;
import static org.apache.hadoop.fs.azurebfs.extensions.WrappingTokenProvider.*;
/**
* Test custom OAuth token providers.
* This is a unit test not an E2E integration test because that would
* require OAuth auth setup, always.
* Instead this just checks that the creation works and that everything
* is propagated.
*/
@SuppressWarnings("UseOfObsoleteDateTimeApi")
public class TestCustomOauthTokenProvider extends AbstractAbfsTestWithTimeout {
public TestCustomOauthTokenProvider() throws Exception {
}
/**
* If you switch to a custom provider, it is loaded and initialized.
*/
@Test
public void testCustomProviderBinding() throws Throwable {
Configuration conf = new Configuration();
WrappingTokenProvider.enable(conf);
AbfsConfiguration abfs = new AbfsConfiguration(conf,
"not-a-real-account");
CustomTokenProviderAdapter provider =
(CustomTokenProviderAdapter) abfs.getTokenProvider();
assertEquals("User agent", INITED, provider.getUserAgentSuffix());
// now mimic the bind call
ExtensionHelper.bind(provider,
new URI("abfs://store@user.dfs.core.windows.net"),
conf);
assertEquals("User agent", BOUND,
ExtensionHelper.getUserAgentSuffix(provider, ""));
AzureADToken token = provider.getToken();
assertEquals("Access token propagation",
ACCESS_TOKEN, token.getAccessToken());
Date expiry = token.getExpiry();
long time = expiry.getTime();
assertTrue("date wrong: " + expiry,
time <= System.currentTimeMillis());
// once closed, the UA state changes.
provider.close();
assertEquals("User agent", CLOSED, provider.getUserAgentSuffix());
}
}

View File

@ -0,0 +1,191 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs.extensions;
import java.net.URI;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.azurebfs.AbstractAbfsTestWithTimeout;
import org.apache.hadoop.fs.azurebfs.security.AbfsDelegationTokenManager;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.delegation.web.DelegationTokenIdentifier;
import static org.apache.hadoop.fs.azurebfs.extensions.KerberizedAbfsCluster.newURI;
import static org.apache.hadoop.fs.azurebfs.extensions.StubDelegationTokenManager.createToken;
import static org.apache.hadoop.fs.azurebfs.extensions.StubAbfsTokenIdentifier.decodeIdentifier;
/**
* Test the lifecycle of custom DT managers.
*/
@SuppressWarnings("IOResourceOpenedButNotSafelyClosed")
public class TestDTManagerLifecycle extends AbstractAbfsTestWithTimeout {
public static final String RENEWER = "resourcemanager";
private Configuration conf;
public static final String ABFS
= "abfs://testing@account.dfs.core.windows.net";
public static final URI FSURI = newURI(ABFS);
public static final Text OWNER = new Text("owner");
public static final Text KIND2 = new Text("kind2");
@Before
public void setup() throws Exception {
conf = StubDelegationTokenManager.useStubDTManager(new Configuration());
}
@After
public void teardown() throws Exception {
}
/**
* Assert that a token is of a specific kind
* @param kind expected kind
* @param dt token.
*/
protected void assertTokenKind(final Text kind,
final Token<DelegationTokenIdentifier> dt) {
assertEquals("Token Kind",
kind, dt.getKind());
}
/**
* Test the classic lifecycle, that is: don't call bind() on the manager,
* so that it does not attempt to bind the custom DT manager it has created.
*
* There'll be no canonical service name from the token manager, which
* will trigger falling back to the default value.
*/
@Test
public void testClassicLifecycle() throws Throwable {
AbfsDelegationTokenManager manager
= new AbfsDelegationTokenManager(conf);
StubDelegationTokenManager stub = getTokenManager(manager);
// this is automatically inited
assertTrue("Not initialized: " + stub, stub.isInitialized());
Token<DelegationTokenIdentifier> dt = stub.getDelegationToken(RENEWER);
assertTokenKind(StubAbfsTokenIdentifier.TOKEN_KIND, dt);
assertNull("canonicalServiceName in " + stub,
manager.getCanonicalServiceName());
assertEquals("Issued count number in " + stub, 1, stub.getIssued());
StubAbfsTokenIdentifier id = decodeIdentifier(dt);
assertEquals("Sequence number in " + id, 1, id.getSequenceNumber());
stub.renewDelegationToken(dt);
assertEquals("Renewal count in " + stub, 1, stub.getRenewals());
stub.cancelDelegationToken(dt);
assertEquals("Cancel count in " + stub, 1, stub.getCancellations());
}
protected StubDelegationTokenManager getTokenManager(final AbfsDelegationTokenManager manager) {
return (StubDelegationTokenManager) manager.getTokenManager();
}
/**
* Instantiate through the manager, but then call direct.
*/
@Test
public void testBindingLifecycle() throws Throwable {
AbfsDelegationTokenManager manager = new AbfsDelegationTokenManager(conf);
StubDelegationTokenManager stub = getTokenManager(manager);
assertTrue("Not initialized: " + stub, stub.isInitialized());
stub.bind(FSURI, conf);
assertEquals("URI in " + stub, FSURI, stub.getFsURI());
decodeIdentifier(stub.getDelegationToken(RENEWER));
stub.close();
assertTrue("Not closed: " + stub, stub.isClosed());
// and for resilience
stub.close();
assertTrue("Not closed: " + stub, stub.isClosed());
}
@Test
public void testBindingThroughManager() throws Throwable {
AbfsDelegationTokenManager manager = new AbfsDelegationTokenManager(conf);
manager.bind(FSURI, conf);
StubDelegationTokenManager stub = getTokenManager(manager);
assertEquals("Service in " + manager,
ABFS, stub.createServiceText().toString());
assertEquals("Binding URI of " + stub, FSURI, stub.getFsURI());
Token<DelegationTokenIdentifier> token = manager.getDelegationToken(
RENEWER);
assertEquals("Service in " + token,
ABFS, token.getService().toString());
decodeIdentifier(token);
assertTokenKind(StubAbfsTokenIdentifier.TOKEN_KIND, token);
// now change the token kind on the stub, verify propagation
stub.setKind(KIND2);
Token<DelegationTokenIdentifier> dt2 = manager.getDelegationToken("");
assertTokenKind(KIND2, dt2);
// change the token kind and, unless it is registered, it will not decode.
assertNull("Token is of unknown kind, must not decode",
dt2.decodeIdentifier());
// closing the manager will close the stub too.
manager.close();
assertTrue("Not closed: " + stub, stub.isClosed());
}
/**
* Instantiate a DT manager in the renewal workflow: the manager is
* unbound; tokens must still be issued and cancelled.
*/
@Test
public void testRenewalThroughManager() throws Throwable {
// create without going through the DT manager, which is of course unbound.
Token<DelegationTokenIdentifier> dt = createToken(0, FSURI, OWNER,
new Text(RENEWER));
// create a DT manager in the renewer codepath.
AbfsDelegationTokenManager manager = new AbfsDelegationTokenManager(conf);
StubDelegationTokenManager stub = getTokenManager(manager);
assertNull("Stub should not bebound " + stub, stub.getFsURI());
StubAbfsTokenIdentifier dtId =
(StubAbfsTokenIdentifier) dt.decodeIdentifier();
String idStr = dtId.toString();
assertEquals("URI in " + idStr, FSURI, dtId.getUri());
assertEquals("renewer in " + idStr,
RENEWER, dtId.getRenewer().toString());
manager.renewDelegationToken(dt);
assertEquals("Renewal count in " + stub, 1, stub.getRenewals());
manager.cancelDelegationToken(dt);
assertEquals("Cancel count in " + stub, 1, stub.getCancellations());
// closing the manager will close the stub too.
manager.close();
assertTrue("Not closed: " + stub, stub.isClosed());
}
}

View File

@ -0,0 +1,107 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs.extensions;
import java.io.IOException;
import java.net.URI;
import java.util.Date;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.azurebfs.services.AuthType;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ACCOUNT_AUTH_TYPE_PROPERTY_NAME;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ACCOUNT_TOKEN_PROVIDER_TYPE_PROPERTY_NAME;
/**
* Implements a wrapper around ClientCredsTokenProvider.
*/
@SuppressWarnings("UseOfObsoleteDateTimeApi")
public class WrappingTokenProvider implements CustomTokenProviderAdaptee,
BoundDTExtension {
public static final String NAME
= "org.apache.hadoop.fs.azurebfs.extensions.WrappingTokenProvider";
public static final String UA_STRING = "provider=";
public static final String CREATED = UA_STRING + "created";
public static final String INITED = UA_STRING + "inited";
public static final String BOUND = UA_STRING + "bound";
public static final String CLOSED = UA_STRING + "closed";
public static final String ACCESS_TOKEN = "accessToken";
/** URI; only set once bound. */
private URI uri;
private String accountName;
private String state = CREATED;
@Override
public void initialize(
final Configuration configuration,
final String account)
throws IOException {
state = INITED;
accountName = account;
}
@Override
public String getAccessToken() throws IOException {
return ACCESS_TOKEN;
}
@Override
public Date getExpiryTime() {
return new Date(System.currentTimeMillis());
}
@Override
public void bind(final URI fsURI, final Configuration conf)
throws IOException {
state = BOUND;
uri = fsURI;
}
public URI getUri() {
return uri;
}
@Override
public void close() throws IOException {
state = CLOSED;
}
@Override
public String getUserAgentSuffix() {
return state;
}
/**
* Enable the custom token provider.
* This doesn't set any account-specific options.
* @param conf configuration to patch.
*/
public static void enable(Configuration conf) {
conf.setEnum(FS_AZURE_ACCOUNT_AUTH_TYPE_PROPERTY_NAME,
AuthType.Custom);
conf.set(FS_AZURE_ACCOUNT_TOKEN_PROVIDER_TYPE_PROPERTY_NAME, NAME);
}
}

View File

@ -0,0 +1,34 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.hadoop.fs.azurebfs.extensions.StubAbfsTokenIdentifier

View File

@ -58,3 +58,4 @@ log4j.logger.org.apache.hadoop.yarn.util.AbstractLivelinessMonitor=WARN
log4j.logger.org.apache.hadoop.security.token.delegation=WARN
log4j.logger.org.apache.hadoop.mapred.ShuffleHandler=WARN
log4j.logger.org.apache.hadoop.ipc.Server=WARN
log4j.logger.org.apache.hadoop.security.ShellBasedUnixGroupsMapping=ERROR