HADOOP-17194. Adding Context class for AbfsClient in ABFS (#2216)
Contributed by Mehakmeet Singh. Change-Id: I120c9a068d758d8e5d071c878a3b7fbeb95e4de6
This commit is contained in:
parent
fcb80c1ade
commit
f6e1ed4f6b
|
@ -84,6 +84,8 @@ import org.apache.hadoop.fs.azurebfs.oauth2.IdentityTransformer;
|
||||||
import org.apache.hadoop.fs.azurebfs.oauth2.IdentityTransformerInterface;
|
import org.apache.hadoop.fs.azurebfs.oauth2.IdentityTransformerInterface;
|
||||||
import org.apache.hadoop.fs.azurebfs.services.AbfsAclHelper;
|
import org.apache.hadoop.fs.azurebfs.services.AbfsAclHelper;
|
||||||
import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
|
import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
|
||||||
|
import org.apache.hadoop.fs.azurebfs.services.AbfsClientContext;
|
||||||
|
import org.apache.hadoop.fs.azurebfs.services.AbfsClientContextBuilder;
|
||||||
import org.apache.hadoop.fs.azurebfs.services.AbfsCounters;
|
import org.apache.hadoop.fs.azurebfs.services.AbfsCounters;
|
||||||
import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation;
|
import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation;
|
||||||
import org.apache.hadoop.fs.azurebfs.services.AbfsInputStream;
|
import org.apache.hadoop.fs.azurebfs.services.AbfsInputStream;
|
||||||
|
@ -146,6 +148,7 @@ public class AzureBlobFileSystemStore implements Closeable {
|
||||||
private final UserGroupInformation userGroupInformation;
|
private final UserGroupInformation userGroupInformation;
|
||||||
private final IdentityTransformerInterface identityTransformer;
|
private final IdentityTransformerInterface identityTransformer;
|
||||||
private final AbfsPerfTracker abfsPerfTracker;
|
private final AbfsPerfTracker abfsPerfTracker;
|
||||||
|
private final AbfsCounters abfsCounters;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The set of directories where we should store files as append blobs.
|
* The set of directories where we should store files as append blobs.
|
||||||
|
@ -192,7 +195,8 @@ public class AzureBlobFileSystemStore implements Closeable {
|
||||||
boolean usingOauth = (authType == AuthType.OAuth);
|
boolean usingOauth = (authType == AuthType.OAuth);
|
||||||
boolean useHttps = (usingOauth || abfsConfiguration.isHttpsAlwaysUsed()) ? true : isSecureScheme;
|
boolean useHttps = (usingOauth || abfsConfiguration.isHttpsAlwaysUsed()) ? true : isSecureScheme;
|
||||||
this.abfsPerfTracker = new AbfsPerfTracker(fileSystemName, accountName, this.abfsConfiguration);
|
this.abfsPerfTracker = new AbfsPerfTracker(fileSystemName, accountName, this.abfsConfiguration);
|
||||||
initializeClient(uri, fileSystemName, accountName, useHttps, abfsCounters);
|
this.abfsCounters = abfsCounters;
|
||||||
|
initializeClient(uri, fileSystemName, accountName, useHttps);
|
||||||
final Class<? extends IdentityTransformerInterface> identityTransformerClass =
|
final Class<? extends IdentityTransformerInterface> identityTransformerClass =
|
||||||
configuration.getClass(FS_AZURE_IDENTITY_TRANSFORM_CLASS, IdentityTransformer.class,
|
configuration.getClass(FS_AZURE_IDENTITY_TRANSFORM_CLASS, IdentityTransformer.class,
|
||||||
IdentityTransformerInterface.class);
|
IdentityTransformerInterface.class);
|
||||||
|
@ -1213,8 +1217,19 @@ public class AzureBlobFileSystemStore implements Closeable {
|
||||||
return isKeyForDirectorySet(key, azureAtomicRenameDirSet);
|
return isKeyForDirectorySet(key, azureAtomicRenameDirSet);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A on-off operation to initialize AbfsClient for AzureBlobFileSystem
|
||||||
|
* Operations.
|
||||||
|
*
|
||||||
|
* @param uri Uniform resource identifier for Abfs.
|
||||||
|
* @param fileSystemName Name of the fileSystem being used.
|
||||||
|
* @param accountName Name of the account being used to access Azure
|
||||||
|
* data store.
|
||||||
|
* @param isSecure Tells if https is being used or http.
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
private void initializeClient(URI uri, String fileSystemName,
|
private void initializeClient(URI uri, String fileSystemName,
|
||||||
String accountName, boolean isSecure, AbfsCounters abfsCounters)
|
String accountName, boolean isSecure)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
if (this.client != null) {
|
if (this.client != null) {
|
||||||
return;
|
return;
|
||||||
|
@ -1261,16 +1276,30 @@ public class AzureBlobFileSystemStore implements Closeable {
|
||||||
LOG.trace("Initializing AbfsClient for {}", baseUrl);
|
LOG.trace("Initializing AbfsClient for {}", baseUrl);
|
||||||
if (tokenProvider != null) {
|
if (tokenProvider != null) {
|
||||||
this.client = new AbfsClient(baseUrl, creds, abfsConfiguration,
|
this.client = new AbfsClient(baseUrl, creds, abfsConfiguration,
|
||||||
new ExponentialRetryPolicy(abfsConfiguration.getMaxIoRetries()),
|
tokenProvider,
|
||||||
tokenProvider, abfsPerfTracker, abfsCounters);
|
populateAbfsClientContext());
|
||||||
} else {
|
} else {
|
||||||
this.client = new AbfsClient(baseUrl, creds, abfsConfiguration,
|
this.client = new AbfsClient(baseUrl, creds, abfsConfiguration,
|
||||||
new ExponentialRetryPolicy(abfsConfiguration.getMaxIoRetries()),
|
sasTokenProvider,
|
||||||
sasTokenProvider, abfsPerfTracker, abfsCounters);
|
populateAbfsClientContext());
|
||||||
}
|
}
|
||||||
LOG.trace("AbfsClient init complete");
|
LOG.trace("AbfsClient init complete");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Populate a new AbfsClientContext instance with the desired properties.
|
||||||
|
*
|
||||||
|
* @return an instance of AbfsClientContext.
|
||||||
|
*/
|
||||||
|
private AbfsClientContext populateAbfsClientContext() {
|
||||||
|
return new AbfsClientContextBuilder()
|
||||||
|
.withExponentialRetryPolicy(
|
||||||
|
new ExponentialRetryPolicy(abfsConfiguration.getMaxIoRetries()))
|
||||||
|
.withAbfsCounters(abfsCounters)
|
||||||
|
.withAbfsPerfTracker(abfsPerfTracker)
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
|
||||||
private String getOctalNotation(FsPermission fsPermission) {
|
private String getOctalNotation(FsPermission fsPermission) {
|
||||||
Preconditions.checkNotNull(fsPermission, "fsPermission");
|
Preconditions.checkNotNull(fsPermission, "fsPermission");
|
||||||
return String.format(AbfsHttpConstants.PERMISSION_FORMAT, fsPermission.toOctal());
|
return String.format(AbfsHttpConstants.PERMISSION_FORMAT, fsPermission.toOctal());
|
||||||
|
|
|
@ -77,15 +77,13 @@ public class AbfsClient implements Closeable {
|
||||||
|
|
||||||
private AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCredentials,
|
private AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCredentials,
|
||||||
final AbfsConfiguration abfsConfiguration,
|
final AbfsConfiguration abfsConfiguration,
|
||||||
final ExponentialRetryPolicy exponentialRetryPolicy,
|
final AbfsClientContext abfsClientContext) {
|
||||||
final AbfsPerfTracker abfsPerfTracker,
|
|
||||||
final AbfsCounters abfsCounters) {
|
|
||||||
this.baseUrl = baseUrl;
|
this.baseUrl = baseUrl;
|
||||||
this.sharedKeyCredentials = sharedKeyCredentials;
|
this.sharedKeyCredentials = sharedKeyCredentials;
|
||||||
String baseUrlString = baseUrl.toString();
|
String baseUrlString = baseUrl.toString();
|
||||||
this.filesystem = baseUrlString.substring(baseUrlString.lastIndexOf(FORWARD_SLASH) + 1);
|
this.filesystem = baseUrlString.substring(baseUrlString.lastIndexOf(FORWARD_SLASH) + 1);
|
||||||
this.abfsConfiguration = abfsConfiguration;
|
this.abfsConfiguration = abfsConfiguration;
|
||||||
this.retryPolicy = exponentialRetryPolicy;
|
this.retryPolicy = abfsClientContext.getExponentialRetryPolicy();
|
||||||
this.accountName = abfsConfiguration.getAccountName().substring(0, abfsConfiguration.getAccountName().indexOf(AbfsHttpConstants.DOT));
|
this.accountName = abfsConfiguration.getAccountName().substring(0, abfsConfiguration.getAccountName().indexOf(AbfsHttpConstants.DOT));
|
||||||
this.authType = abfsConfiguration.getAuthType(accountName);
|
this.authType = abfsConfiguration.getAuthType(accountName);
|
||||||
|
|
||||||
|
@ -105,29 +103,23 @@ public class AbfsClient implements Closeable {
|
||||||
}
|
}
|
||||||
|
|
||||||
this.userAgent = initializeUserAgent(abfsConfiguration, sslProviderName);
|
this.userAgent = initializeUserAgent(abfsConfiguration, sslProviderName);
|
||||||
this.abfsPerfTracker = abfsPerfTracker;
|
this.abfsPerfTracker = abfsClientContext.getAbfsPerfTracker();
|
||||||
this.abfsCounters = abfsCounters;
|
this.abfsCounters = abfsClientContext.getAbfsCounters();
|
||||||
}
|
}
|
||||||
|
|
||||||
public AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCredentials,
|
public AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCredentials,
|
||||||
final AbfsConfiguration abfsConfiguration,
|
final AbfsConfiguration abfsConfiguration,
|
||||||
final ExponentialRetryPolicy exponentialRetryPolicy,
|
|
||||||
final AccessTokenProvider tokenProvider,
|
final AccessTokenProvider tokenProvider,
|
||||||
final AbfsPerfTracker abfsPerfTracker,
|
final AbfsClientContext abfsClientContext) {
|
||||||
final AbfsCounters abfsCounters) {
|
this(baseUrl, sharedKeyCredentials, abfsConfiguration, abfsClientContext);
|
||||||
this(baseUrl, sharedKeyCredentials, abfsConfiguration,
|
|
||||||
exponentialRetryPolicy, abfsPerfTracker, abfsCounters);
|
|
||||||
this.tokenProvider = tokenProvider;
|
this.tokenProvider = tokenProvider;
|
||||||
}
|
}
|
||||||
|
|
||||||
public AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCredentials,
|
public AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCredentials,
|
||||||
final AbfsConfiguration abfsConfiguration,
|
final AbfsConfiguration abfsConfiguration,
|
||||||
final ExponentialRetryPolicy exponentialRetryPolicy,
|
|
||||||
final SASTokenProvider sasTokenProvider,
|
final SASTokenProvider sasTokenProvider,
|
||||||
final AbfsPerfTracker abfsPerfTracker,
|
final AbfsClientContext abfsClientContext) {
|
||||||
final AbfsCounters abfsCounters) {
|
this(baseUrl, sharedKeyCredentials, abfsConfiguration, abfsClientContext);
|
||||||
this(baseUrl, sharedKeyCredentials, abfsConfiguration,
|
|
||||||
exponentialRetryPolicy, abfsPerfTracker, abfsCounters);
|
|
||||||
this.sasTokenProvider = sasTokenProvider;
|
this.sasTokenProvider = sasTokenProvider;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,51 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.fs.azurebfs.services;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Class to hold extra configurations for AbfsClient and further classes
|
||||||
|
* inside AbfsClient.
|
||||||
|
*/
|
||||||
|
public class AbfsClientContext {
|
||||||
|
|
||||||
|
private final ExponentialRetryPolicy exponentialRetryPolicy;
|
||||||
|
private final AbfsPerfTracker abfsPerfTracker;
|
||||||
|
private final AbfsCounters abfsCounters;
|
||||||
|
|
||||||
|
AbfsClientContext(
|
||||||
|
ExponentialRetryPolicy exponentialRetryPolicy,
|
||||||
|
AbfsPerfTracker abfsPerfTracker,
|
||||||
|
AbfsCounters abfsCounters) {
|
||||||
|
this.exponentialRetryPolicy = exponentialRetryPolicy;
|
||||||
|
this.abfsPerfTracker = abfsPerfTracker;
|
||||||
|
this.abfsCounters = abfsCounters;
|
||||||
|
}
|
||||||
|
|
||||||
|
public ExponentialRetryPolicy getExponentialRetryPolicy() {
|
||||||
|
return exponentialRetryPolicy;
|
||||||
|
}
|
||||||
|
|
||||||
|
public AbfsPerfTracker getAbfsPerfTracker() {
|
||||||
|
return abfsPerfTracker;
|
||||||
|
}
|
||||||
|
|
||||||
|
public AbfsCounters getAbfsCounters() {
|
||||||
|
return abfsCounters;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,58 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.fs.azurebfs.services;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A builder for AbfsClientContext class with different options to select and
|
||||||
|
* build from.
|
||||||
|
*/
|
||||||
|
public class AbfsClientContextBuilder {
|
||||||
|
|
||||||
|
private ExponentialRetryPolicy exponentialRetryPolicy;
|
||||||
|
private AbfsPerfTracker abfsPerfTracker;
|
||||||
|
private AbfsCounters abfsCounters;
|
||||||
|
|
||||||
|
public AbfsClientContextBuilder withExponentialRetryPolicy(
|
||||||
|
final ExponentialRetryPolicy exponentialRetryPolicy) {
|
||||||
|
this.exponentialRetryPolicy = exponentialRetryPolicy;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public AbfsClientContextBuilder withAbfsPerfTracker(
|
||||||
|
final AbfsPerfTracker abfsPerfTracker) {
|
||||||
|
this.abfsPerfTracker = abfsPerfTracker;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public AbfsClientContextBuilder withAbfsCounters(final AbfsCounters abfsCounters) {
|
||||||
|
this.abfsCounters = abfsCounters;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Build the context and get the instance with the properties selected.
|
||||||
|
*
|
||||||
|
* @return an instance of AbfsClientContext.
|
||||||
|
*/
|
||||||
|
public AbfsClientContext build() {
|
||||||
|
//validate the values
|
||||||
|
return new AbfsClientContext(exponentialRetryPolicy, abfsPerfTracker,
|
||||||
|
abfsCounters);
|
||||||
|
}
|
||||||
|
}
|
|
@ -103,8 +103,9 @@ public final class TestAbfsClient {
|
||||||
|
|
||||||
private String getUserAgentString(AbfsConfiguration config,
|
private String getUserAgentString(AbfsConfiguration config,
|
||||||
boolean includeSSLProvider) throws MalformedURLException {
|
boolean includeSSLProvider) throws MalformedURLException {
|
||||||
|
AbfsClientContext abfsClientContext = new AbfsClientContextBuilder().build();
|
||||||
AbfsClient client = new AbfsClient(new URL("https://azure.com"), null,
|
AbfsClient client = new AbfsClient(new URL("https://azure.com"), null,
|
||||||
config, null, (AccessTokenProvider) null, null, null);
|
config, (AccessTokenProvider) null, abfsClientContext);
|
||||||
String sslProviderName = null;
|
String sslProviderName = null;
|
||||||
if (includeSSLProvider) {
|
if (includeSSLProvider) {
|
||||||
sslProviderName = DelegatingSSLSocketFactory.getDefaultFactory()
|
sslProviderName = DelegatingSSLSocketFactory.getDefaultFactory()
|
||||||
|
@ -257,6 +258,12 @@ public final class TestAbfsClient {
|
||||||
abfsConfig.getAccountName(),
|
abfsConfig.getAccountName(),
|
||||||
abfsConfig);
|
abfsConfig);
|
||||||
|
|
||||||
|
AbfsClientContext abfsClientContext =
|
||||||
|
new AbfsClientContextBuilder().withAbfsPerfTracker(tracker)
|
||||||
|
.withExponentialRetryPolicy(
|
||||||
|
new ExponentialRetryPolicy(abfsConfig.getMaxIoRetries()))
|
||||||
|
.build();
|
||||||
|
|
||||||
// Create test AbfsClient
|
// Create test AbfsClient
|
||||||
AbfsClient testClient = new AbfsClient(
|
AbfsClient testClient = new AbfsClient(
|
||||||
baseAbfsClientInstance.getBaseUrl(),
|
baseAbfsClientInstance.getBaseUrl(),
|
||||||
|
@ -267,11 +274,10 @@ public final class TestAbfsClient {
|
||||||
abfsConfig.getStorageAccountKey())
|
abfsConfig.getStorageAccountKey())
|
||||||
: null),
|
: null),
|
||||||
abfsConfig,
|
abfsConfig,
|
||||||
new ExponentialRetryPolicy(abfsConfig.getMaxIoRetries()),
|
|
||||||
(currentAuthType == AuthType.OAuth
|
(currentAuthType == AuthType.OAuth
|
||||||
? abfsConfig.getTokenProvider()
|
? abfsConfig.getTokenProvider()
|
||||||
: null),
|
: null),
|
||||||
tracker, null);
|
abfsClientContext);
|
||||||
|
|
||||||
return testClient;
|
return testClient;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue