HADOOP-13972. ADLS to support per-store configuration.
Contributed by Sharad Sonker.
(cherry picked from commit 050f5287b7
)
This commit is contained in:
parent
1888b94806
commit
b302f728f8
|
@ -33,6 +33,11 @@ public final class AdlConfKeys {
|
|||
public static final String AZURE_AD_REFRESH_URL_KEY =
|
||||
"fs.adl.oauth2.refresh.url";
|
||||
|
||||
public static final String AZURE_AD_ACCOUNT_PREFIX =
|
||||
"fs.adl.account.";
|
||||
public static final String AZURE_AD_PREFIX =
|
||||
"fs.adl.";
|
||||
|
||||
// optional when provider type is refresh or client id.
|
||||
public static final String AZURE_AD_TOKEN_PROVIDER_CLASS_KEY =
|
||||
"fs.adl.oauth2.access.token.provider";
|
||||
|
|
|
@ -24,8 +24,10 @@ import java.net.URI;
|
|||
import java.util.ArrayList;
|
||||
import java.util.EnumSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.microsoft.azure.datalake.store.ADLStoreClient;
|
||||
import com.microsoft.azure.datalake.store.ADLStoreOptions;
|
||||
import com.microsoft.azure.datalake.store.DirectoryEntry;
|
||||
|
@ -37,6 +39,8 @@ import com.microsoft.azure.datalake.store.oauth2.ClientCredsTokenProvider;
|
|||
import com.microsoft.azure.datalake.store.oauth2.DeviceCodeTokenProvider;
|
||||
import com.microsoft.azure.datalake.store.oauth2.MsiTokenProvider;
|
||||
import com.microsoft.azure.datalake.store.oauth2.RefreshTokenBasedTokenProvider;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
|
@ -74,6 +78,8 @@ import static org.apache.hadoop.fs.adl.AdlConfKeys.*;
|
|||
@InterfaceAudience.Public
|
||||
@InterfaceStability.Evolving
|
||||
public class AdlFileSystem extends FileSystem {
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(AdlFileSystem.class);
|
||||
public static final String SCHEME = "adl";
|
||||
static final int DEFAULT_PORT = 443;
|
||||
private URI uri;
|
||||
|
@ -115,12 +121,19 @@ public class AdlFileSystem extends FileSystem {
|
|||
/**
|
||||
* Called after a new FileSystem instance is constructed.
|
||||
*
|
||||
* @param storeUri a uri whose authority section names the host, port, etc.
|
||||
* for this FileSystem
|
||||
* @param conf the configuration
|
||||
* @param storeUri a uri whose authority section names the host, port,
|
||||
* etc. for this FileSystem
|
||||
* @param originalConf the configuration to use for the FS. The account-
|
||||
* specific options are patched over the base ones
|
||||
* before any use is made of the config.
|
||||
*/
|
||||
@Override
|
||||
public void initialize(URI storeUri, Configuration conf) throws IOException {
|
||||
public void initialize(URI storeUri, Configuration originalConf)
|
||||
throws IOException {
|
||||
String hostname = storeUri.getHost();
|
||||
String accountName = getAccountNameFromFQDN(hostname);
|
||||
Configuration conf = propagateAccountOptions(originalConf, accountName);
|
||||
|
||||
super.initialize(storeUri, conf);
|
||||
this.setConf(conf);
|
||||
this.uri = URI
|
||||
|
@ -144,7 +157,6 @@ public class AdlFileSystem extends FileSystem {
|
|||
|
||||
String accountFQDN = null;
|
||||
String mountPoint = null;
|
||||
String hostname = storeUri.getHost();
|
||||
if (!hostname.contains(".") && !hostname.equalsIgnoreCase(
|
||||
"localhost")) { // this is a symbolic name. Resolve it.
|
||||
String hostNameProperty = "dfs.adls." + hostname + ".hostname";
|
||||
|
@ -985,4 +997,63 @@ public class AdlFileSystem extends FileSystem {
|
|||
oidOrUpn = enableUPN ? UserGroupRepresentation.UPN :
|
||||
UserGroupRepresentation.OID;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets ADL account name from ADL FQDN.
|
||||
* @param accountFQDN ADL account fqdn
|
||||
* @return ADL account name
|
||||
*/
|
||||
public static String getAccountNameFromFQDN(String accountFQDN) {
|
||||
return accountFQDN.contains(".")
|
||||
? accountFQDN.substring(0, accountFQDN.indexOf("."))
|
||||
: accountFQDN;
|
||||
}
|
||||
|
||||
/**
|
||||
* Propagates account-specific settings into generic ADL configuration keys.
|
||||
* This is done by propagating the values of the form
|
||||
* {@code fs.adl.account.${account_name}.key} to
|
||||
* {@code fs.adl.key}, for all values of "key"
|
||||
*
|
||||
* The source of the updated property is set to the key name of the account
|
||||
* property, to aid in diagnostics of where things came from.
|
||||
*
|
||||
* Returns a new configuration. Why the clone?
|
||||
* You can use the same conf for different filesystems, and the original
|
||||
* values are not updated.
|
||||
*
|
||||
*
|
||||
* @param source Source Configuration object
|
||||
* @param accountName account name. Must not be empty
|
||||
* @return a (potentially) patched clone of the original
|
||||
*/
|
||||
public static Configuration propagateAccountOptions(Configuration source,
|
||||
String accountName) {
|
||||
|
||||
Preconditions.checkArgument(StringUtils.isNotEmpty(accountName),
|
||||
"accountName");
|
||||
final String accountPrefix = AZURE_AD_ACCOUNT_PREFIX + accountName +'.';
|
||||
LOG.debug("Propagating entries under {}", accountPrefix);
|
||||
final Configuration dest = new Configuration(source);
|
||||
for (Map.Entry<String, String> entry : source) {
|
||||
final String key = entry.getKey();
|
||||
// get the (unexpanded) value.
|
||||
final String value = entry.getValue();
|
||||
if (!key.startsWith(accountPrefix) || accountPrefix.equals(key)) {
|
||||
continue;
|
||||
}
|
||||
// there's a account prefix, so strip it
|
||||
final String stripped = key.substring(accountPrefix.length());
|
||||
|
||||
// propagate the value, building a new origin field.
|
||||
// to track overwrites, the generic key is overwritten even if
|
||||
// already matches the new one.
|
||||
String origin = "[" + StringUtils.join(
|
||||
source.getPropertySources(key), ", ") +"]";
|
||||
final String generic = AZURE_AD_PREFIX + stripped;
|
||||
LOG.debug("Updating {} from {}", generic, origin);
|
||||
dest.set(generic, value, key + " via " + origin);
|
||||
}
|
||||
return dest;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -36,6 +36,7 @@ This support comes via the JAR file `azure-datalake-store.jar`.
|
|||
* Tested for scale.
|
||||
* API `setOwner()`, `setAcl`, `removeAclEntries()`, `modifyAclEntries()` accepts UPN or OID
|
||||
(Object ID) as user and group names.
|
||||
* Supports per-account configuration.
|
||||
|
||||
## Limitations
|
||||
|
||||
|
@ -328,6 +329,42 @@ Add the following properties to `core-site.xml`
|
|||
</description>
|
||||
</property>
|
||||
```
|
||||
## Configurations for different ADL accounts
|
||||
Different ADL accounts can be accessed with different ADL client configurations.
|
||||
This also allows for different login details.
|
||||
|
||||
1. All `fs.adl` options can be set on a per account basis.
|
||||
1. The account specific option is set by replacing the `fs.adl.` prefix on an option
|
||||
with `fs.adl.account.ACCOUNTNAME.`, where `ACCOUNTNAME` is the name of the account.
|
||||
1. When connecting to an account, all options explicitly set will override
|
||||
the base `fs.adl.` values.
|
||||
|
||||
As an example, a configuration could have a base configuration to use the public account
|
||||
`adl://<some-public-account>.azuredatalakestore.net/` and an account-specific configuration
|
||||
to use some private account `adl://myprivateaccount.azuredatalakestore.net/`
|
||||
|
||||
```xml
|
||||
<property>
|
||||
<name>fs.adl.oauth2.client.id</name>
|
||||
<value>CLIENTID</value>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>fs.adl.oauth2.credential</name>
|
||||
<value>CREDENTIAL</value>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>fs.adl.account.myprivateaccount.oauth2.client.id</name>
|
||||
<value>CLIENTID1</value>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>fs.adl.account.myprivateaccount.oauth2.credential</name>
|
||||
<value>CREDENTIAL1</value>
|
||||
</property>
|
||||
```
|
||||
|
||||
## Testing the azure-datalake-store Module
|
||||
The `hadoop-azure` module includes a full suite of unit tests.
|
||||
Most of the tests will run without additional configuration by running `mvn test`.
|
||||
|
|
|
@ -20,7 +20,6 @@ package org.apache.hadoop.fs.adl;
|
|||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
import static org.apache.hadoop.fs.adl.AdlConfKeys.ADL_BLOCK_SIZE;
|
||||
|
@ -58,6 +57,8 @@ import static org.apache.hadoop.fs.adl.AdlConfKeys
|
|||
.TOKEN_PROVIDER_TYPE_REFRESH_TOKEN;
|
||||
import static org.apache.hadoop.fs.adl.AdlConfKeys.WRITE_BUFFER_SIZE_KEY;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileOutputStream;
|
||||
import java.io.IOException;
|
||||
|
@ -70,50 +71,48 @@ public class TestValidateConfiguration {
|
|||
|
||||
@Test
|
||||
public void validateConfigurationKeys() {
|
||||
Assert
|
||||
.assertEquals("fs.adl.oauth2.refresh.url", AZURE_AD_REFRESH_URL_KEY);
|
||||
Assert.assertEquals("fs.adl.oauth2.access.token.provider",
|
||||
assertEquals("fs.adl.oauth2.refresh.url", AZURE_AD_REFRESH_URL_KEY);
|
||||
assertEquals("fs.adl.oauth2.access.token.provider",
|
||||
AZURE_AD_TOKEN_PROVIDER_CLASS_KEY);
|
||||
Assert.assertEquals("fs.adl.oauth2.client.id", AZURE_AD_CLIENT_ID_KEY);
|
||||
Assert.assertEquals("fs.adl.oauth2.refresh.token",
|
||||
assertEquals("fs.adl.oauth2.client.id", AZURE_AD_CLIENT_ID_KEY);
|
||||
assertEquals("fs.adl.oauth2.refresh.token",
|
||||
AZURE_AD_REFRESH_TOKEN_KEY);
|
||||
Assert
|
||||
.assertEquals("fs.adl.oauth2.credential", AZURE_AD_CLIENT_SECRET_KEY);
|
||||
Assert.assertEquals("adl.debug.override.localuserasfileowner",
|
||||
assertEquals("fs.adl.oauth2.credential", AZURE_AD_CLIENT_SECRET_KEY);
|
||||
assertEquals("adl.debug.override.localuserasfileowner",
|
||||
ADL_DEBUG_OVERRIDE_LOCAL_USER_AS_OWNER);
|
||||
|
||||
Assert.assertEquals("fs.adl.oauth2.access.token.provider.type",
|
||||
assertEquals("fs.adl.oauth2.access.token.provider.type",
|
||||
AZURE_AD_TOKEN_PROVIDER_TYPE_KEY);
|
||||
|
||||
Assert.assertEquals("adl.feature.client.cache.readahead",
|
||||
assertEquals("adl.feature.client.cache.readahead",
|
||||
READ_AHEAD_BUFFER_SIZE_KEY);
|
||||
|
||||
Assert.assertEquals("adl.feature.client.cache.drop.behind.writes",
|
||||
assertEquals("adl.feature.client.cache.drop.behind.writes",
|
||||
WRITE_BUFFER_SIZE_KEY);
|
||||
|
||||
Assert.assertEquals("RefreshToken", TOKEN_PROVIDER_TYPE_REFRESH_TOKEN);
|
||||
assertEquals("RefreshToken", TOKEN_PROVIDER_TYPE_REFRESH_TOKEN);
|
||||
|
||||
Assert.assertEquals("ClientCredential", TOKEN_PROVIDER_TYPE_CLIENT_CRED);
|
||||
assertEquals("ClientCredential", TOKEN_PROVIDER_TYPE_CLIENT_CRED);
|
||||
|
||||
Assert.assertEquals("adl.enable.client.latency.tracker",
|
||||
assertEquals("adl.enable.client.latency.tracker",
|
||||
LATENCY_TRACKER_KEY);
|
||||
|
||||
Assert.assertEquals(true, LATENCY_TRACKER_DEFAULT);
|
||||
assertEquals(true, LATENCY_TRACKER_DEFAULT);
|
||||
|
||||
Assert.assertEquals(true, ADL_EXPERIMENT_POSITIONAL_READ_DEFAULT);
|
||||
assertEquals(true, ADL_EXPERIMENT_POSITIONAL_READ_DEFAULT);
|
||||
|
||||
Assert.assertEquals("adl.feature.experiment.positional.read.enable",
|
||||
assertEquals("adl.feature.experiment.positional.read.enable",
|
||||
ADL_EXPERIMENT_POSITIONAL_READ_KEY);
|
||||
|
||||
Assert.assertEquals(1, ADL_REPLICATION_FACTOR);
|
||||
Assert.assertEquals(256 * 1024 * 1024, ADL_BLOCK_SIZE);
|
||||
Assert.assertEquals(false, ADL_DEBUG_SET_LOCAL_USER_AS_OWNER_DEFAULT);
|
||||
Assert.assertEquals(4 * 1024 * 1024, DEFAULT_READ_AHEAD_BUFFER_SIZE);
|
||||
Assert.assertEquals(4 * 1024 * 1024, DEFAULT_WRITE_AHEAD_BUFFER_SIZE);
|
||||
assertEquals(1, ADL_REPLICATION_FACTOR);
|
||||
assertEquals(256 * 1024 * 1024, ADL_BLOCK_SIZE);
|
||||
assertEquals(false, ADL_DEBUG_SET_LOCAL_USER_AS_OWNER_DEFAULT);
|
||||
assertEquals(4 * 1024 * 1024, DEFAULT_READ_AHEAD_BUFFER_SIZE);
|
||||
assertEquals(4 * 1024 * 1024, DEFAULT_WRITE_AHEAD_BUFFER_SIZE);
|
||||
|
||||
Assert.assertEquals("adl.feature.ownerandgroup.enableupn",
|
||||
assertEquals("adl.feature.ownerandgroup.enableupn",
|
||||
ADL_ENABLEUPN_FOR_OWNERGROUP_KEY);
|
||||
Assert.assertEquals(false,
|
||||
assertEquals(false,
|
||||
ADL_ENABLEUPN_FOR_OWNERGROUP_DEFAULT);
|
||||
}
|
||||
|
||||
|
@ -152,6 +151,95 @@ public class TestValidateConfiguration {
|
|||
assertDeprecatedKeys(conf);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetAccountNameFromFQDN() {
|
||||
assertEquals("dummy", AdlFileSystem.
|
||||
getAccountNameFromFQDN("dummy.azuredatalakestore.net"));
|
||||
assertEquals("localhost", AdlFileSystem.
|
||||
getAccountNameFromFQDN("localhost"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPropagateAccountOptionsDefault() {
|
||||
Configuration conf = new Configuration(false);
|
||||
conf.set("fs.adl.oauth2.client.id", "defaultClientId");
|
||||
conf.set("fs.adl.oauth2.credential", "defaultCredential");
|
||||
conf.set("some.other.config", "someValue");
|
||||
Configuration propagatedConf =
|
||||
AdlFileSystem.propagateAccountOptions(conf, "dummy");
|
||||
assertEquals("defaultClientId",
|
||||
propagatedConf.get(AZURE_AD_CLIENT_ID_KEY));
|
||||
assertEquals("defaultCredential",
|
||||
propagatedConf.get(AZURE_AD_CLIENT_SECRET_KEY));
|
||||
assertEquals("someValue",
|
||||
propagatedConf.get("some.other.config"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPropagateAccountOptionsSpecified() {
|
||||
Configuration conf = new Configuration(false);
|
||||
conf.set("fs.adl.account.dummy.oauth2.client.id", "dummyClientId");
|
||||
conf.set("fs.adl.account.dummy.oauth2.credential", "dummyCredential");
|
||||
conf.set("some.other.config", "someValue");
|
||||
|
||||
Configuration propagatedConf =
|
||||
AdlFileSystem.propagateAccountOptions(conf, "dummy");
|
||||
assertEquals("dummyClientId",
|
||||
propagatedConf.get(AZURE_AD_CLIENT_ID_KEY));
|
||||
assertEquals("dummyCredential",
|
||||
propagatedConf.get(AZURE_AD_CLIENT_SECRET_KEY));
|
||||
assertEquals("someValue",
|
||||
propagatedConf.get("some.other.config"));
|
||||
|
||||
propagatedConf =
|
||||
AdlFileSystem.propagateAccountOptions(conf, "anotherDummy");
|
||||
assertEquals(null,
|
||||
propagatedConf.get(AZURE_AD_CLIENT_ID_KEY));
|
||||
assertEquals(null,
|
||||
propagatedConf.get(AZURE_AD_CLIENT_SECRET_KEY));
|
||||
assertEquals("someValue",
|
||||
propagatedConf.get("some.other.config"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPropagateAccountOptionsAll() {
|
||||
Configuration conf = new Configuration(false);
|
||||
conf.set("fs.adl.oauth2.client.id", "defaultClientId");
|
||||
conf.set("fs.adl.oauth2.credential", "defaultCredential");
|
||||
conf.set("some.other.config", "someValue");
|
||||
conf.set("fs.adl.account.dummy1.oauth2.client.id", "dummyClientId1");
|
||||
conf.set("fs.adl.account.dummy1.oauth2.credential", "dummyCredential1");
|
||||
conf.set("fs.adl.account.dummy2.oauth2.client.id", "dummyClientId2");
|
||||
conf.set("fs.adl.account.dummy2.oauth2.credential", "dummyCredential2");
|
||||
|
||||
Configuration propagatedConf =
|
||||
AdlFileSystem.propagateAccountOptions(conf, "dummy1");
|
||||
assertEquals("dummyClientId1",
|
||||
propagatedConf.get(AZURE_AD_CLIENT_ID_KEY));
|
||||
assertEquals("dummyCredential1",
|
||||
propagatedConf.get(AZURE_AD_CLIENT_SECRET_KEY));
|
||||
assertEquals("someValue",
|
||||
propagatedConf.get("some.other.config"));
|
||||
|
||||
propagatedConf =
|
||||
AdlFileSystem.propagateAccountOptions(conf, "dummy2");
|
||||
assertEquals("dummyClientId2",
|
||||
propagatedConf.get(AZURE_AD_CLIENT_ID_KEY));
|
||||
assertEquals("dummyCredential2",
|
||||
propagatedConf.get(AZURE_AD_CLIENT_SECRET_KEY));
|
||||
assertEquals("someValue",
|
||||
propagatedConf.get("some.other.config"));
|
||||
|
||||
propagatedConf =
|
||||
AdlFileSystem.propagateAccountOptions(conf, "anotherDummy");
|
||||
assertEquals("defaultClientId",
|
||||
propagatedConf.get(AZURE_AD_CLIENT_ID_KEY));
|
||||
assertEquals("defaultCredential",
|
||||
propagatedConf.get(AZURE_AD_CLIENT_SECRET_KEY));
|
||||
assertEquals("someValue",
|
||||
propagatedConf.get("some.other.config"));
|
||||
}
|
||||
|
||||
private void setDeprecatedKeys(Configuration conf) {
|
||||
conf.set("dfs.adls.oauth2.access.token.provider.type", "dummyType");
|
||||
conf.set("dfs.adls.oauth2.client.id", "dummyClientId");
|
||||
|
@ -163,19 +251,19 @@ public class TestValidateConfiguration {
|
|||
}
|
||||
|
||||
private void assertDeprecatedKeys(Configuration conf) {
|
||||
Assert.assertEquals("dummyType",
|
||||
assertEquals("dummyType",
|
||||
conf.get(AZURE_AD_TOKEN_PROVIDER_TYPE_KEY));
|
||||
Assert.assertEquals("dummyClientId",
|
||||
assertEquals("dummyClientId",
|
||||
conf.get(AZURE_AD_CLIENT_ID_KEY));
|
||||
Assert.assertEquals("dummyRefreshToken",
|
||||
assertEquals("dummyRefreshToken",
|
||||
conf.get(AZURE_AD_REFRESH_TOKEN_KEY));
|
||||
Assert.assertEquals("dummyRefreshUrl",
|
||||
assertEquals("dummyRefreshUrl",
|
||||
conf.get(AZURE_AD_REFRESH_URL_KEY));
|
||||
Assert.assertEquals("dummyCredential",
|
||||
assertEquals("dummyCredential",
|
||||
conf.get(AZURE_AD_CLIENT_SECRET_KEY));
|
||||
Assert.assertEquals("dummyClass",
|
||||
assertEquals("dummyClass",
|
||||
conf.get(AZURE_AD_TOKEN_PROVIDER_CLASS_KEY));
|
||||
Assert.assertEquals("dummyTracker",
|
||||
assertEquals("dummyTracker",
|
||||
conf.get(LATENCY_TRACKER_KEY));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -39,7 +39,7 @@ public class Parallelized extends Parameterized {
|
|||
private static class ThreadPoolScheduler implements RunnerScheduler {
|
||||
private ExecutorService executor;
|
||||
|
||||
public ThreadPoolScheduler() {
|
||||
ThreadPoolScheduler() {
|
||||
int numThreads = 10;
|
||||
executor = Executors.newFixedThreadPool(numThreads);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue