HADOOP-13972. ADLS to support per-store configuration.

Contributed by Sharad Sonker.

(cherry picked from commit 050f5287b7)
(cherry picked from commit 07c7df4b26)
This commit is contained in:
Steve Loughran 2018-02-15 16:25:55 +00:00 committed by Chris Douglas
parent 0d7a3f48bc
commit 717295e464
5 changed files with 239 additions and 38 deletions

View File

@ -33,6 +33,11 @@ public final class AdlConfKeys {
public static final String AZURE_AD_REFRESH_URL_KEY =
"fs.adl.oauth2.refresh.url";
public static final String AZURE_AD_ACCOUNT_PREFIX =
"fs.adl.account.";
public static final String AZURE_AD_PREFIX =
"fs.adl.";
// optional when provider type is refresh or client id.
public static final String AZURE_AD_TOKEN_PROVIDER_CLASS_KEY =
"fs.adl.oauth2.access.token.provider";

View File

@ -24,8 +24,10 @@ import java.net.URI;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.microsoft.azure.datalake.store.ADLStoreClient;
import com.microsoft.azure.datalake.store.ADLStoreOptions;
import com.microsoft.azure.datalake.store.DirectoryEntry;
@ -38,6 +40,8 @@ import com.microsoft.azure.datalake.store.oauth2.ClientCredsTokenProvider;
import com.microsoft.azure.datalake.store.oauth2.DeviceCodeTokenProvider;
import com.microsoft.azure.datalake.store.oauth2.MsiTokenProvider;
import com.microsoft.azure.datalake.store.oauth2.RefreshTokenBasedTokenProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.classification.InterfaceAudience;
@ -75,6 +79,8 @@ import static org.apache.hadoop.fs.adl.AdlConfKeys.*;
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class AdlFileSystem extends FileSystem {
private static final Logger LOG =
LoggerFactory.getLogger(AdlFileSystem.class);
public static final String SCHEME = "adl";
static final int DEFAULT_PORT = 443;
private URI uri;
@ -116,12 +122,19 @@ public class AdlFileSystem extends FileSystem {
/**
* Called after a new FileSystem instance is constructed.
*
* @param storeUri a uri whose authority section names the host, port, etc.
* for this FileSystem
* @param conf the configuration
* @param storeUri a uri whose authority section names the host, port,
* etc. for this FileSystem
* @param originalConf the configuration to use for the FS. The account-
* specific options are patched over the base ones
* before any use is made of the config.
*/
@Override
public void initialize(URI storeUri, Configuration conf) throws IOException {
public void initialize(URI storeUri, Configuration originalConf)
throws IOException {
String hostname = storeUri.getHost();
String accountName = getAccountNameFromFQDN(hostname);
Configuration conf = propagateAccountOptions(originalConf, accountName);
super.initialize(storeUri, conf);
this.setConf(conf);
this.uri = URI
@ -145,7 +158,6 @@ public class AdlFileSystem extends FileSystem {
String accountFQDN = null;
String mountPoint = null;
String hostname = storeUri.getHost();
if (!hostname.contains(".") && !hostname.equalsIgnoreCase(
"localhost")) { // this is a symbolic name. Resolve it.
String hostNameProperty = "dfs.adls." + hostname + ".hostname";
@ -1004,4 +1016,63 @@ public class AdlFileSystem extends FileSystem {
oidOrUpn = enableUPN ? UserGroupRepresentation.UPN :
UserGroupRepresentation.OID;
}
/**
* Gets ADL account name from ADL FQDN.
* @param accountFQDN ADL account fqdn
* @return ADL account name
*/
public static String getAccountNameFromFQDN(String accountFQDN) {
return accountFQDN.contains(".")
? accountFQDN.substring(0, accountFQDN.indexOf("."))
: accountFQDN;
}
/**
* Propagates account-specific settings into generic ADL configuration keys.
* This is done by propagating the values of the form
* {@code fs.adl.account.${account_name}.key} to
* {@code fs.adl.key}, for all values of "key"
*
* The source of the updated property is set to the key name of the account
* property, to aid in diagnostics of where things came from.
*
* Returns a new configuration. Why the clone?
* You can use the same conf for different filesystems, and the original
* values are not updated.
*
*
* @param source Source Configuration object
* @param accountName account name. Must not be empty
* @return a (potentially) patched clone of the original
*/
public static Configuration propagateAccountOptions(Configuration source,
String accountName) {
Preconditions.checkArgument(StringUtils.isNotEmpty(accountName),
"accountName");
final String accountPrefix = AZURE_AD_ACCOUNT_PREFIX + accountName +'.';
LOG.debug("Propagating entries under {}", accountPrefix);
final Configuration dest = new Configuration(source);
for (Map.Entry<String, String> entry : source) {
final String key = entry.getKey();
// get the (unexpanded) value.
final String value = entry.getValue();
if (!key.startsWith(accountPrefix) || accountPrefix.equals(key)) {
continue;
}
// there's a account prefix, so strip it
final String stripped = key.substring(accountPrefix.length());
// propagate the value, building a new origin field.
// to track overwrites, the generic key is overwritten even if
// already matches the new one.
String origin = "[" + StringUtils.join(
source.getPropertySources(key), ", ") +"]";
final String generic = AZURE_AD_PREFIX + stripped;
LOG.debug("Updating {} from {}", generic, origin);
dest.set(generic, value, key + " via " + origin);
}
return dest;
}
}

View File

@ -32,6 +32,7 @@ This support comes via the JAR file `azure-datalake-store.jar`.
* Tested for scale.
* API `setOwner()`, `setAcl`, `removeAclEntries()`, `modifyAclEntries()` accepts UPN or OID
(Object ID) as user and group names.
* Supports per-account configuration.
## Limitations
@ -324,6 +325,42 @@ Add the following properties to `core-site.xml`
</description>
</property>
```
## Configurations for different ADL accounts
Different ADL accounts can be accessed with different ADL client configurations.
This also allows for different login details.
1. All `fs.adl` options can be set on a per account basis.
1. The account specific option is set by replacing the `fs.adl.` prefix on an option
with `fs.adl.account.ACCOUNTNAME.`, where `ACCOUNTNAME` is the name of the account.
1. When connecting to an account, all options explicitly set will override
the base `fs.adl.` values.
As an example, a configuration could have a base configuration to use the public account
`adl://<some-public-account>.azuredatalakestore.net/` and an account-specific configuration
to use some private account `adl://myprivateaccount.azuredatalakestore.net/`
```xml
<property>
<name>fs.adl.oauth2.client.id</name>
<value>CLIENTID</value>
</property>
<property>
<name>fs.adl.oauth2.credential</name>
<value>CREDENTIAL</value>
</property>
<property>
<name>fs.adl.account.myprivateaccount.oauth2.client.id</name>
<value>CLIENTID1</value>
</property>
<property>
<name>fs.adl.account.myprivateaccount.oauth2.credential</name>
<value>CREDENTIAL1</value>
</property>
```
## Testing the azure-datalake-store Module
The `hadoop-azure` module includes a full suite of unit tests.
Most of the tests will run without additional configuration by running `mvn test`.

View File

@ -20,7 +20,6 @@ package org.apache.hadoop.fs.adl;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Assert;
import org.junit.Test;
import static org.apache.hadoop.fs.adl.AdlConfKeys.ADL_BLOCK_SIZE;
@ -58,6 +57,8 @@ import static org.apache.hadoop.fs.adl.AdlConfKeys
.TOKEN_PROVIDER_TYPE_REFRESH_TOKEN;
import static org.apache.hadoop.fs.adl.AdlConfKeys.WRITE_BUFFER_SIZE_KEY;
import static org.junit.Assert.assertEquals;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
@ -70,50 +71,48 @@ public class TestValidateConfiguration {
@Test
public void validateConfigurationKeys() {
Assert
.assertEquals("fs.adl.oauth2.refresh.url", AZURE_AD_REFRESH_URL_KEY);
Assert.assertEquals("fs.adl.oauth2.access.token.provider",
assertEquals("fs.adl.oauth2.refresh.url", AZURE_AD_REFRESH_URL_KEY);
assertEquals("fs.adl.oauth2.access.token.provider",
AZURE_AD_TOKEN_PROVIDER_CLASS_KEY);
Assert.assertEquals("fs.adl.oauth2.client.id", AZURE_AD_CLIENT_ID_KEY);
Assert.assertEquals("fs.adl.oauth2.refresh.token",
assertEquals("fs.adl.oauth2.client.id", AZURE_AD_CLIENT_ID_KEY);
assertEquals("fs.adl.oauth2.refresh.token",
AZURE_AD_REFRESH_TOKEN_KEY);
Assert
.assertEquals("fs.adl.oauth2.credential", AZURE_AD_CLIENT_SECRET_KEY);
Assert.assertEquals("adl.debug.override.localuserasfileowner",
assertEquals("fs.adl.oauth2.credential", AZURE_AD_CLIENT_SECRET_KEY);
assertEquals("adl.debug.override.localuserasfileowner",
ADL_DEBUG_OVERRIDE_LOCAL_USER_AS_OWNER);
Assert.assertEquals("fs.adl.oauth2.access.token.provider.type",
assertEquals("fs.adl.oauth2.access.token.provider.type",
AZURE_AD_TOKEN_PROVIDER_TYPE_KEY);
Assert.assertEquals("adl.feature.client.cache.readahead",
assertEquals("adl.feature.client.cache.readahead",
READ_AHEAD_BUFFER_SIZE_KEY);
Assert.assertEquals("adl.feature.client.cache.drop.behind.writes",
assertEquals("adl.feature.client.cache.drop.behind.writes",
WRITE_BUFFER_SIZE_KEY);
Assert.assertEquals("RefreshToken", TOKEN_PROVIDER_TYPE_REFRESH_TOKEN);
assertEquals("RefreshToken", TOKEN_PROVIDER_TYPE_REFRESH_TOKEN);
Assert.assertEquals("ClientCredential", TOKEN_PROVIDER_TYPE_CLIENT_CRED);
assertEquals("ClientCredential", TOKEN_PROVIDER_TYPE_CLIENT_CRED);
Assert.assertEquals("adl.enable.client.latency.tracker",
assertEquals("adl.enable.client.latency.tracker",
LATENCY_TRACKER_KEY);
Assert.assertEquals(true, LATENCY_TRACKER_DEFAULT);
assertEquals(true, LATENCY_TRACKER_DEFAULT);
Assert.assertEquals(true, ADL_EXPERIMENT_POSITIONAL_READ_DEFAULT);
assertEquals(true, ADL_EXPERIMENT_POSITIONAL_READ_DEFAULT);
Assert.assertEquals("adl.feature.experiment.positional.read.enable",
assertEquals("adl.feature.experiment.positional.read.enable",
ADL_EXPERIMENT_POSITIONAL_READ_KEY);
Assert.assertEquals(1, ADL_REPLICATION_FACTOR);
Assert.assertEquals(256 * 1024 * 1024, ADL_BLOCK_SIZE);
Assert.assertEquals(false, ADL_DEBUG_SET_LOCAL_USER_AS_OWNER_DEFAULT);
Assert.assertEquals(4 * 1024 * 1024, DEFAULT_READ_AHEAD_BUFFER_SIZE);
Assert.assertEquals(4 * 1024 * 1024, DEFAULT_WRITE_AHEAD_BUFFER_SIZE);
assertEquals(1, ADL_REPLICATION_FACTOR);
assertEquals(256 * 1024 * 1024, ADL_BLOCK_SIZE);
assertEquals(false, ADL_DEBUG_SET_LOCAL_USER_AS_OWNER_DEFAULT);
assertEquals(4 * 1024 * 1024, DEFAULT_READ_AHEAD_BUFFER_SIZE);
assertEquals(4 * 1024 * 1024, DEFAULT_WRITE_AHEAD_BUFFER_SIZE);
Assert.assertEquals("adl.feature.ownerandgroup.enableupn",
assertEquals("adl.feature.ownerandgroup.enableupn",
ADL_ENABLEUPN_FOR_OWNERGROUP_KEY);
Assert.assertEquals(false,
assertEquals(false,
ADL_ENABLEUPN_FOR_OWNERGROUP_DEFAULT);
}
@ -152,6 +151,95 @@ public class TestValidateConfiguration {
assertDeprecatedKeys(conf);
}
@Test
public void testGetAccountNameFromFQDN() {
assertEquals("dummy", AdlFileSystem.
getAccountNameFromFQDN("dummy.azuredatalakestore.net"));
assertEquals("localhost", AdlFileSystem.
getAccountNameFromFQDN("localhost"));
}
@Test
public void testPropagateAccountOptionsDefault() {
Configuration conf = new Configuration(false);
conf.set("fs.adl.oauth2.client.id", "defaultClientId");
conf.set("fs.adl.oauth2.credential", "defaultCredential");
conf.set("some.other.config", "someValue");
Configuration propagatedConf =
AdlFileSystem.propagateAccountOptions(conf, "dummy");
assertEquals("defaultClientId",
propagatedConf.get(AZURE_AD_CLIENT_ID_KEY));
assertEquals("defaultCredential",
propagatedConf.get(AZURE_AD_CLIENT_SECRET_KEY));
assertEquals("someValue",
propagatedConf.get("some.other.config"));
}
@Test
public void testPropagateAccountOptionsSpecified() {
Configuration conf = new Configuration(false);
conf.set("fs.adl.account.dummy.oauth2.client.id", "dummyClientId");
conf.set("fs.adl.account.dummy.oauth2.credential", "dummyCredential");
conf.set("some.other.config", "someValue");
Configuration propagatedConf =
AdlFileSystem.propagateAccountOptions(conf, "dummy");
assertEquals("dummyClientId",
propagatedConf.get(AZURE_AD_CLIENT_ID_KEY));
assertEquals("dummyCredential",
propagatedConf.get(AZURE_AD_CLIENT_SECRET_KEY));
assertEquals("someValue",
propagatedConf.get("some.other.config"));
propagatedConf =
AdlFileSystem.propagateAccountOptions(conf, "anotherDummy");
assertEquals(null,
propagatedConf.get(AZURE_AD_CLIENT_ID_KEY));
assertEquals(null,
propagatedConf.get(AZURE_AD_CLIENT_SECRET_KEY));
assertEquals("someValue",
propagatedConf.get("some.other.config"));
}
@Test
public void testPropagateAccountOptionsAll() {
Configuration conf = new Configuration(false);
conf.set("fs.adl.oauth2.client.id", "defaultClientId");
conf.set("fs.adl.oauth2.credential", "defaultCredential");
conf.set("some.other.config", "someValue");
conf.set("fs.adl.account.dummy1.oauth2.client.id", "dummyClientId1");
conf.set("fs.adl.account.dummy1.oauth2.credential", "dummyCredential1");
conf.set("fs.adl.account.dummy2.oauth2.client.id", "dummyClientId2");
conf.set("fs.adl.account.dummy2.oauth2.credential", "dummyCredential2");
Configuration propagatedConf =
AdlFileSystem.propagateAccountOptions(conf, "dummy1");
assertEquals("dummyClientId1",
propagatedConf.get(AZURE_AD_CLIENT_ID_KEY));
assertEquals("dummyCredential1",
propagatedConf.get(AZURE_AD_CLIENT_SECRET_KEY));
assertEquals("someValue",
propagatedConf.get("some.other.config"));
propagatedConf =
AdlFileSystem.propagateAccountOptions(conf, "dummy2");
assertEquals("dummyClientId2",
propagatedConf.get(AZURE_AD_CLIENT_ID_KEY));
assertEquals("dummyCredential2",
propagatedConf.get(AZURE_AD_CLIENT_SECRET_KEY));
assertEquals("someValue",
propagatedConf.get("some.other.config"));
propagatedConf =
AdlFileSystem.propagateAccountOptions(conf, "anotherDummy");
assertEquals("defaultClientId",
propagatedConf.get(AZURE_AD_CLIENT_ID_KEY));
assertEquals("defaultCredential",
propagatedConf.get(AZURE_AD_CLIENT_SECRET_KEY));
assertEquals("someValue",
propagatedConf.get("some.other.config"));
}
private void setDeprecatedKeys(Configuration conf) {
conf.set("dfs.adls.oauth2.access.token.provider.type", "dummyType");
conf.set("dfs.adls.oauth2.client.id", "dummyClientId");
@ -163,19 +251,19 @@ public class TestValidateConfiguration {
}
private void assertDeprecatedKeys(Configuration conf) {
Assert.assertEquals("dummyType",
assertEquals("dummyType",
conf.get(AZURE_AD_TOKEN_PROVIDER_TYPE_KEY));
Assert.assertEquals("dummyClientId",
assertEquals("dummyClientId",
conf.get(AZURE_AD_CLIENT_ID_KEY));
Assert.assertEquals("dummyRefreshToken",
assertEquals("dummyRefreshToken",
conf.get(AZURE_AD_REFRESH_TOKEN_KEY));
Assert.assertEquals("dummyRefreshUrl",
assertEquals("dummyRefreshUrl",
conf.get(AZURE_AD_REFRESH_URL_KEY));
Assert.assertEquals("dummyCredential",
assertEquals("dummyCredential",
conf.get(AZURE_AD_CLIENT_SECRET_KEY));
Assert.assertEquals("dummyClass",
assertEquals("dummyClass",
conf.get(AZURE_AD_TOKEN_PROVIDER_CLASS_KEY));
Assert.assertEquals("dummyTracker",
assertEquals("dummyTracker",
conf.get(LATENCY_TRACKER_KEY));
}
}

View File

@ -39,7 +39,7 @@ public class Parallelized extends Parameterized {
private static class ThreadPoolScheduler implements RunnerScheduler {
private ExecutorService executor;
public ThreadPoolScheduler() {
ThreadPoolScheduler() {
int numThreads = 10;
executor = Executors.newFixedThreadPool(numThreads);
}