HADOOP-13972. ADLS to support per-store configuration.

Contributed by Sharad Sonker.
This commit is contained in:
Steve Loughran 2018-02-15 16:25:55 +00:00
parent 96c047fbb9
commit 050f5287b7
5 changed files with 239 additions and 38 deletions

View File

@ -33,6 +33,11 @@ public final class AdlConfKeys {
public static final String AZURE_AD_REFRESH_URL_KEY = public static final String AZURE_AD_REFRESH_URL_KEY =
"fs.adl.oauth2.refresh.url"; "fs.adl.oauth2.refresh.url";
// Prefix of account-specific options. AdlFileSystem.propagateAccountOptions
// appends "<account name>." to this to find the keys to propagate.
public static final String AZURE_AD_ACCOUNT_PREFIX =
"fs.adl.account.";
// Prefix of the generic ADL options. AdlFileSystem.propagateAccountOptions
// prepends this to the remainder of an account-specific key to form the
// generic key that receives the account-specific value.
public static final String AZURE_AD_PREFIX =
"fs.adl.";
// optional when provider type is refresh or client id. // optional when provider type is refresh or client id.
public static final String AZURE_AD_TOKEN_PROVIDER_CLASS_KEY = public static final String AZURE_AD_TOKEN_PROVIDER_CLASS_KEY =
"fs.adl.oauth2.access.token.provider"; "fs.adl.oauth2.access.token.provider";

View File

@ -24,8 +24,10 @@ import java.net.URI;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.EnumSet; import java.util.EnumSet;
import java.util.List; import java.util.List;
import java.util.Map;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.microsoft.azure.datalake.store.ADLStoreClient; import com.microsoft.azure.datalake.store.ADLStoreClient;
import com.microsoft.azure.datalake.store.ADLStoreOptions; import com.microsoft.azure.datalake.store.ADLStoreOptions;
import com.microsoft.azure.datalake.store.DirectoryEntry; import com.microsoft.azure.datalake.store.DirectoryEntry;
@ -37,6 +39,8 @@ import com.microsoft.azure.datalake.store.oauth2.ClientCredsTokenProvider;
import com.microsoft.azure.datalake.store.oauth2.DeviceCodeTokenProvider; import com.microsoft.azure.datalake.store.oauth2.DeviceCodeTokenProvider;
import com.microsoft.azure.datalake.store.oauth2.MsiTokenProvider; import com.microsoft.azure.datalake.store.oauth2.MsiTokenProvider;
import com.microsoft.azure.datalake.store.oauth2.RefreshTokenBasedTokenProvider; import com.microsoft.azure.datalake.store.oauth2.RefreshTokenBasedTokenProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
@ -74,6 +78,8 @@ import static org.apache.hadoop.fs.adl.AdlConfKeys.*;
@InterfaceAudience.Public @InterfaceAudience.Public
@InterfaceStability.Evolving @InterfaceStability.Evolving
public class AdlFileSystem extends FileSystem { public class AdlFileSystem extends FileSystem {
private static final Logger LOG =
LoggerFactory.getLogger(AdlFileSystem.class);
public static final String SCHEME = "adl"; public static final String SCHEME = "adl";
static final int DEFAULT_PORT = 443; static final int DEFAULT_PORT = 443;
private URI uri; private URI uri;
@ -115,12 +121,19 @@ public class AdlFileSystem extends FileSystem {
/** /**
* Called after a new FileSystem instance is constructed. * Called after a new FileSystem instance is constructed.
* *
* @param storeUri a uri whose authority section names the host, port, etc. * @param storeUri a uri whose authority section names the host, port,
* for this FileSystem * etc. for this FileSystem
* @param conf the configuration * @param originalConf the configuration to use for the FS. The account-
* specific options are patched over the base ones
* before any use is made of the config.
*/ */
@Override @Override
public void initialize(URI storeUri, Configuration conf) throws IOException { public void initialize(URI storeUri, Configuration originalConf)
throws IOException {
String hostname = storeUri.getHost();
String accountName = getAccountNameFromFQDN(hostname);
Configuration conf = propagateAccountOptions(originalConf, accountName);
super.initialize(storeUri, conf); super.initialize(storeUri, conf);
this.setConf(conf); this.setConf(conf);
this.uri = URI this.uri = URI
@ -144,7 +157,6 @@ public class AdlFileSystem extends FileSystem {
String accountFQDN = null; String accountFQDN = null;
String mountPoint = null; String mountPoint = null;
String hostname = storeUri.getHost();
if (!hostname.contains(".") && !hostname.equalsIgnoreCase( if (!hostname.contains(".") && !hostname.equalsIgnoreCase(
"localhost")) { // this is a symbolic name. Resolve it. "localhost")) { // this is a symbolic name. Resolve it.
String hostNameProperty = "dfs.adls." + hostname + ".hostname"; String hostNameProperty = "dfs.adls." + hostname + ".hostname";
@ -985,4 +997,63 @@ public class AdlFileSystem extends FileSystem {
oidOrUpn = enableUPN ? UserGroupRepresentation.UPN : oidOrUpn = enableUPN ? UserGroupRepresentation.UPN :
UserGroupRepresentation.OID; UserGroupRepresentation.OID;
} }
/**
 * Extracts the ADL account name from an ADL host name.
 * The account name is the text before the first {@code '.'}; a name
 * that contains no dot is returned unchanged.
 *
 * @param accountFQDN ADL account fqdn
 * @return ADL account name
 */
public static String getAccountNameFromFQDN(String accountFQDN) {
  final int firstDot = accountFQDN.indexOf('.');
  if (firstDot < 0) {
    // no domain part: the whole string is the account name
    return accountFQDN;
  }
  return accountFQDN.substring(0, firstDot);
}
/**
 * Builds a per-account view of a configuration.
 * Every entry named {@code fs.adl.account.${account_name}.key} is copied
 * onto the generic option {@code fs.adl.key}, for all values of "key".
 *
 * The source recorded for each updated generic property is the name of the
 * account-specific key followed by that key's own property sources, so the
 * origin of a value can be traced when diagnosing a configuration.
 *
 * The supplied configuration is left untouched; the changes are made to a
 * copy of it. This lets one configuration be shared between different
 * filesystems without the settings of one account leaking into another.
 *
 * @param source Source Configuration object
 * @param accountName account name. Must not be empty
 * @return a (potentially) patched clone of the original
 * @throws IllegalArgumentException if the account name is null or empty
 */
public static Configuration propagateAccountOptions(Configuration source,
    String accountName) {
  Preconditions.checkArgument(StringUtils.isNotEmpty(accountName),
      "accountName");
  final String prefix = AZURE_AD_ACCOUNT_PREFIX + accountName + '.';
  LOG.debug("Propagating entries under {}", prefix);
  final Configuration patched = new Configuration(source);
  final int prefixLength = prefix.length();
  for (Map.Entry<String, String> option : source) {
    final String accountKey = option.getKey();
    // Only keys that extend the prefix with a non-empty remainder matter;
    // the bare prefix itself is not an option.
    if (accountKey.startsWith(prefix) && !prefix.equals(accountKey)) {
      // drop the account prefix to get the option's generic suffix
      final String suffix = accountKey.substring(prefixLength);
      // Describe where the account-specific value came from. The generic
      // key is always overwritten, even when it already holds the same
      // value, so that the recorded source reflects the override.
      final String[] sources = source.getPropertySources(accountKey);
      final String origin = "[" + StringUtils.join(sources, ", ") + "]";
      final String genericKey = AZURE_AD_PREFIX + suffix;
      LOG.debug("Updating {} from {}", genericKey, origin);
      patched.set(genericKey, option.getValue(),
          accountKey + " via " + origin);
    }
  }
  return patched;
}
} }

View File

@ -36,6 +36,7 @@ This support comes via the JAR file `azure-datalake-store.jar`.
* Tested for scale. * Tested for scale.
* API `setOwner()`, `setAcl`, `removeAclEntries()`, `modifyAclEntries()` accepts UPN or OID * API `setOwner()`, `setAcl`, `removeAclEntries()`, `modifyAclEntries()` accepts UPN or OID
(Object ID) as user and group names. (Object ID) as user and group names.
* Supports per-account configuration.
## Limitations ## Limitations
@ -328,6 +329,42 @@ Add the following properties to `core-site.xml`
</description> </description>
</property> </property>
``` ```
## Configurations for different ADL accounts
Different ADL accounts can be accessed with different ADL client configurations.
This also allows for different login details.
1. All `fs.adl` options can be set on a per account basis.
1. The account specific option is set by replacing the `fs.adl.` prefix on an option
with `fs.adl.account.ACCOUNTNAME.`, where `ACCOUNTNAME` is the name of the account.
1. When connecting to an account, every account-specific option explicitly set for it
overrides the corresponding base `fs.adl.` value; all other options keep their base values.
As an example, the base configuration could hold the credentials for the public account
`adl://<some-public-account>.azuredatalakestore.net/`, with account-specific options
supplying different credentials for the private account `adl://myprivateaccount.azuredatalakestore.net/`:
```xml
<property>
<name>fs.adl.oauth2.client.id</name>
<value>CLIENTID</value>
</property>
<property>
<name>fs.adl.oauth2.credential</name>
<value>CREDENTIAL</value>
</property>
<property>
<name>fs.adl.account.myprivateaccount.oauth2.client.id</name>
<value>CLIENTID1</value>
</property>
<property>
<name>fs.adl.account.myprivateaccount.oauth2.credential</name>
<value>CREDENTIAL1</value>
</property>
```
## Testing the azure-datalake-store Module ## Testing the azure-datalake-store Module
The `hadoop-azure` module includes a full suite of unit tests. The `hadoop-azure` module includes a full suite of unit tests.
Most of the tests will run without additional configuration by running `mvn test`. Most of the tests will run without additional configuration by running `mvn test`.

View File

@ -20,7 +20,6 @@ package org.apache.hadoop.fs.adl;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import static org.apache.hadoop.fs.adl.AdlConfKeys.ADL_BLOCK_SIZE; import static org.apache.hadoop.fs.adl.AdlConfKeys.ADL_BLOCK_SIZE;
@ -58,6 +57,8 @@ import static org.apache.hadoop.fs.adl.AdlConfKeys
.TOKEN_PROVIDER_TYPE_REFRESH_TOKEN; .TOKEN_PROVIDER_TYPE_REFRESH_TOKEN;
import static org.apache.hadoop.fs.adl.AdlConfKeys.WRITE_BUFFER_SIZE_KEY; import static org.apache.hadoop.fs.adl.AdlConfKeys.WRITE_BUFFER_SIZE_KEY;
import static org.junit.Assert.assertEquals;
import java.io.File; import java.io.File;
import java.io.FileOutputStream; import java.io.FileOutputStream;
import java.io.IOException; import java.io.IOException;
@ -70,50 +71,48 @@ public class TestValidateConfiguration {
@Test @Test
public void validateConfigurationKeys() { public void validateConfigurationKeys() {
Assert assertEquals("fs.adl.oauth2.refresh.url", AZURE_AD_REFRESH_URL_KEY);
.assertEquals("fs.adl.oauth2.refresh.url", AZURE_AD_REFRESH_URL_KEY); assertEquals("fs.adl.oauth2.access.token.provider",
Assert.assertEquals("fs.adl.oauth2.access.token.provider",
AZURE_AD_TOKEN_PROVIDER_CLASS_KEY); AZURE_AD_TOKEN_PROVIDER_CLASS_KEY);
Assert.assertEquals("fs.adl.oauth2.client.id", AZURE_AD_CLIENT_ID_KEY); assertEquals("fs.adl.oauth2.client.id", AZURE_AD_CLIENT_ID_KEY);
Assert.assertEquals("fs.adl.oauth2.refresh.token", assertEquals("fs.adl.oauth2.refresh.token",
AZURE_AD_REFRESH_TOKEN_KEY); AZURE_AD_REFRESH_TOKEN_KEY);
Assert assertEquals("fs.adl.oauth2.credential", AZURE_AD_CLIENT_SECRET_KEY);
.assertEquals("fs.adl.oauth2.credential", AZURE_AD_CLIENT_SECRET_KEY); assertEquals("adl.debug.override.localuserasfileowner",
Assert.assertEquals("adl.debug.override.localuserasfileowner",
ADL_DEBUG_OVERRIDE_LOCAL_USER_AS_OWNER); ADL_DEBUG_OVERRIDE_LOCAL_USER_AS_OWNER);
Assert.assertEquals("fs.adl.oauth2.access.token.provider.type", assertEquals("fs.adl.oauth2.access.token.provider.type",
AZURE_AD_TOKEN_PROVIDER_TYPE_KEY); AZURE_AD_TOKEN_PROVIDER_TYPE_KEY);
Assert.assertEquals("adl.feature.client.cache.readahead", assertEquals("adl.feature.client.cache.readahead",
READ_AHEAD_BUFFER_SIZE_KEY); READ_AHEAD_BUFFER_SIZE_KEY);
Assert.assertEquals("adl.feature.client.cache.drop.behind.writes", assertEquals("adl.feature.client.cache.drop.behind.writes",
WRITE_BUFFER_SIZE_KEY); WRITE_BUFFER_SIZE_KEY);
Assert.assertEquals("RefreshToken", TOKEN_PROVIDER_TYPE_REFRESH_TOKEN); assertEquals("RefreshToken", TOKEN_PROVIDER_TYPE_REFRESH_TOKEN);
Assert.assertEquals("ClientCredential", TOKEN_PROVIDER_TYPE_CLIENT_CRED); assertEquals("ClientCredential", TOKEN_PROVIDER_TYPE_CLIENT_CRED);
Assert.assertEquals("adl.enable.client.latency.tracker", assertEquals("adl.enable.client.latency.tracker",
LATENCY_TRACKER_KEY); LATENCY_TRACKER_KEY);
Assert.assertEquals(true, LATENCY_TRACKER_DEFAULT); assertEquals(true, LATENCY_TRACKER_DEFAULT);
Assert.assertEquals(true, ADL_EXPERIMENT_POSITIONAL_READ_DEFAULT); assertEquals(true, ADL_EXPERIMENT_POSITIONAL_READ_DEFAULT);
Assert.assertEquals("adl.feature.experiment.positional.read.enable", assertEquals("adl.feature.experiment.positional.read.enable",
ADL_EXPERIMENT_POSITIONAL_READ_KEY); ADL_EXPERIMENT_POSITIONAL_READ_KEY);
Assert.assertEquals(1, ADL_REPLICATION_FACTOR); assertEquals(1, ADL_REPLICATION_FACTOR);
Assert.assertEquals(256 * 1024 * 1024, ADL_BLOCK_SIZE); assertEquals(256 * 1024 * 1024, ADL_BLOCK_SIZE);
Assert.assertEquals(false, ADL_DEBUG_SET_LOCAL_USER_AS_OWNER_DEFAULT); assertEquals(false, ADL_DEBUG_SET_LOCAL_USER_AS_OWNER_DEFAULT);
Assert.assertEquals(4 * 1024 * 1024, DEFAULT_READ_AHEAD_BUFFER_SIZE); assertEquals(4 * 1024 * 1024, DEFAULT_READ_AHEAD_BUFFER_SIZE);
Assert.assertEquals(4 * 1024 * 1024, DEFAULT_WRITE_AHEAD_BUFFER_SIZE); assertEquals(4 * 1024 * 1024, DEFAULT_WRITE_AHEAD_BUFFER_SIZE);
Assert.assertEquals("adl.feature.ownerandgroup.enableupn", assertEquals("adl.feature.ownerandgroup.enableupn",
ADL_ENABLEUPN_FOR_OWNERGROUP_KEY); ADL_ENABLEUPN_FOR_OWNERGROUP_KEY);
Assert.assertEquals(false, assertEquals(false,
ADL_ENABLEUPN_FOR_OWNERGROUP_DEFAULT); ADL_ENABLEUPN_FOR_OWNERGROUP_DEFAULT);
} }
@ -152,6 +151,95 @@ public class TestValidateConfiguration {
assertDeprecatedKeys(conf); assertDeprecatedKeys(conf);
} }
/**
 * The account name is the first label of a dotted host name, and the
 * whole name when there is no dot.
 */
@Test
public void testGetAccountNameFromFQDN() {
  String fromFqdn =
      AdlFileSystem.getAccountNameFromFQDN("dummy.azuredatalakestore.net");
  assertEquals("dummy", fromFqdn);

  String fromBareHost = AdlFileSystem.getAccountNameFromFQDN("localhost");
  assertEquals("localhost", fromBareHost);
}
/**
 * With no account-specific entries present, the generic values and
 * unrelated options come through propagation unchanged.
 */
@Test
public void testPropagateAccountOptionsDefault() {
  Configuration base = new Configuration(false);
  base.set("fs.adl.oauth2.client.id", "defaultClientId");
  base.set("fs.adl.oauth2.credential", "defaultCredential");
  base.set("some.other.config", "someValue");

  Configuration patched =
      AdlFileSystem.propagateAccountOptions(base, "dummy");

  String clientId = patched.get(AZURE_AD_CLIENT_ID_KEY);
  String credential = patched.get(AZURE_AD_CLIENT_SECRET_KEY);
  String unrelated = patched.get("some.other.config");
  assertEquals("defaultClientId", clientId);
  assertEquals("defaultCredential", credential);
  assertEquals("someValue", unrelated);
}
/**
 * Account-specific entries with no generic counterpart are surfaced under
 * the generic keys for the matching account only; a different account
 * sees no value for those keys.
 */
@Test
public void testPropagateAccountOptionsSpecified() {
  Configuration base = new Configuration(false);
  base.set("fs.adl.account.dummy.oauth2.client.id", "dummyClientId");
  base.set("fs.adl.account.dummy.oauth2.credential", "dummyCredential");
  base.set("some.other.config", "someValue");

  // the account that has specific settings
  Configuration forDummy =
      AdlFileSystem.propagateAccountOptions(base, "dummy");
  assertEquals("dummyClientId",
      forDummy.get(AZURE_AD_CLIENT_ID_KEY));
  assertEquals("dummyCredential",
      forDummy.get(AZURE_AD_CLIENT_SECRET_KEY));
  assertEquals("someValue",
      forDummy.get("some.other.config"));

  // an account without specific settings: nothing to propagate
  Configuration forOther =
      AdlFileSystem.propagateAccountOptions(base, "anotherDummy");
  assertEquals(null,
      forOther.get(AZURE_AD_CLIENT_ID_KEY));
  assertEquals(null,
      forOther.get(AZURE_AD_CLIENT_SECRET_KEY));
  assertEquals("someValue",
      forOther.get("some.other.config"));
}
/**
 * With generic defaults plus settings for two accounts, each account gets
 * its own values, and an account with no specific settings falls back to
 * the defaults. Unrelated options are preserved in every case.
 */
@Test
public void testPropagateAccountOptionsAll() {
  Configuration base = new Configuration(false);
  base.set("fs.adl.oauth2.client.id", "defaultClientId");
  base.set("fs.adl.oauth2.credential", "defaultCredential");
  base.set("some.other.config", "someValue");
  base.set("fs.adl.account.dummy1.oauth2.client.id", "dummyClientId1");
  base.set("fs.adl.account.dummy1.oauth2.credential", "dummyCredential1");
  base.set("fs.adl.account.dummy2.oauth2.client.id", "dummyClientId2");
  base.set("fs.adl.account.dummy2.oauth2.credential", "dummyCredential2");

  // first account overrides the defaults with its own values
  Configuration forFirst =
      AdlFileSystem.propagateAccountOptions(base, "dummy1");
  assertEquals("dummyClientId1",
      forFirst.get(AZURE_AD_CLIENT_ID_KEY));
  assertEquals("dummyCredential1",
      forFirst.get(AZURE_AD_CLIENT_SECRET_KEY));
  assertEquals("someValue",
      forFirst.get("some.other.config"));

  // second account is resolved independently of the first
  Configuration forSecond =
      AdlFileSystem.propagateAccountOptions(base, "dummy2");
  assertEquals("dummyClientId2",
      forSecond.get(AZURE_AD_CLIENT_ID_KEY));
  assertEquals("dummyCredential2",
      forSecond.get(AZURE_AD_CLIENT_SECRET_KEY));
  assertEquals("someValue",
      forSecond.get("some.other.config"));

  // an unknown account keeps the generic defaults
  Configuration forUnknown =
      AdlFileSystem.propagateAccountOptions(base, "anotherDummy");
  assertEquals("defaultClientId",
      forUnknown.get(AZURE_AD_CLIENT_ID_KEY));
  assertEquals("defaultCredential",
      forUnknown.get(AZURE_AD_CLIENT_SECRET_KEY));
  assertEquals("someValue",
      forUnknown.get("some.other.config"));
}
private void setDeprecatedKeys(Configuration conf) { private void setDeprecatedKeys(Configuration conf) {
conf.set("dfs.adls.oauth2.access.token.provider.type", "dummyType"); conf.set("dfs.adls.oauth2.access.token.provider.type", "dummyType");
conf.set("dfs.adls.oauth2.client.id", "dummyClientId"); conf.set("dfs.adls.oauth2.client.id", "dummyClientId");
@ -163,19 +251,19 @@ public class TestValidateConfiguration {
} }
private void assertDeprecatedKeys(Configuration conf) { private void assertDeprecatedKeys(Configuration conf) {
Assert.assertEquals("dummyType", assertEquals("dummyType",
conf.get(AZURE_AD_TOKEN_PROVIDER_TYPE_KEY)); conf.get(AZURE_AD_TOKEN_PROVIDER_TYPE_KEY));
Assert.assertEquals("dummyClientId", assertEquals("dummyClientId",
conf.get(AZURE_AD_CLIENT_ID_KEY)); conf.get(AZURE_AD_CLIENT_ID_KEY));
Assert.assertEquals("dummyRefreshToken", assertEquals("dummyRefreshToken",
conf.get(AZURE_AD_REFRESH_TOKEN_KEY)); conf.get(AZURE_AD_REFRESH_TOKEN_KEY));
Assert.assertEquals("dummyRefreshUrl", assertEquals("dummyRefreshUrl",
conf.get(AZURE_AD_REFRESH_URL_KEY)); conf.get(AZURE_AD_REFRESH_URL_KEY));
Assert.assertEquals("dummyCredential", assertEquals("dummyCredential",
conf.get(AZURE_AD_CLIENT_SECRET_KEY)); conf.get(AZURE_AD_CLIENT_SECRET_KEY));
Assert.assertEquals("dummyClass", assertEquals("dummyClass",
conf.get(AZURE_AD_TOKEN_PROVIDER_CLASS_KEY)); conf.get(AZURE_AD_TOKEN_PROVIDER_CLASS_KEY));
Assert.assertEquals("dummyTracker", assertEquals("dummyTracker",
conf.get(LATENCY_TRACKER_KEY)); conf.get(LATENCY_TRACKER_KEY));
} }
} }

View File

@ -39,7 +39,7 @@ public class Parallelized extends Parameterized {
private static class ThreadPoolScheduler implements RunnerScheduler { private static class ThreadPoolScheduler implements RunnerScheduler {
private ExecutorService executor; private ExecutorService executor;
public ThreadPoolScheduler() { ThreadPoolScheduler() {
int numThreads = 10; int numThreads = 10;
executor = Executors.newFixedThreadPool(numThreads); executor = Executors.newFixedThreadPool(numThreads);
} }