HADOOP-15671. AliyunOSS: Support Assume Roles in AliyunOSS. Contributed by Jinhu Wu.
(cherry picked from commit2b635125fb
) (cherry picked from commit5da3e83597
) (cherry picked from commit85e00477b8
)
This commit is contained in:
parent
6937925838
commit
c617dba497
|
@ -124,7 +124,8 @@ public class AliyunOSSBlockOutputStream extends OutputStream {
|
|||
if (null == partETags) {
|
||||
throw new IOException("Failed to multipart upload to oss, abort it.");
|
||||
}
|
||||
store.completeMultipartUpload(key, uploadId, partETags);
|
||||
store.completeMultipartUpload(key, uploadId,
|
||||
new ArrayList<>(partETags));
|
||||
}
|
||||
} finally {
|
||||
removePartFiles();
|
||||
|
@ -133,7 +134,7 @@ public class AliyunOSSBlockOutputStream extends OutputStream {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void write(int b) throws IOException {
|
||||
public synchronized void write(int b) throws IOException {
|
||||
singleByte[0] = (byte)b;
|
||||
write(singleByte, 0, 1);
|
||||
}
|
||||
|
|
|
@ -149,7 +149,7 @@ public class AliyunOSSFileSystemStore {
|
|||
"null or empty. Please set proper endpoint with 'fs.oss.endpoint'.");
|
||||
}
|
||||
CredentialsProvider provider =
|
||||
AliyunOSSUtils.getCredentialsProvider(conf);
|
||||
AliyunOSSUtils.getCredentialsProvider(uri, conf);
|
||||
ossClient = new OSSClient(endPoint, provider, clientConf);
|
||||
uploadPartSize = AliyunOSSUtils.getMultipartSizeProperty(conf,
|
||||
MULTIPART_UPLOAD_PART_SIZE_KEY, MULTIPART_UPLOAD_PART_SIZE_DEFAULT);
|
||||
|
@ -168,6 +168,8 @@ public class AliyunOSSFileSystemStore {
|
|||
multipartThreshold = 1024 * 1024 * 1024;
|
||||
}
|
||||
|
||||
bucketName = uri.getHost();
|
||||
|
||||
String cannedACLName = conf.get(CANNED_ACL_KEY, CANNED_ACL_DEFAULT);
|
||||
if (StringUtils.isNotEmpty(cannedACLName)) {
|
||||
CannedAccessControlList cannedACL =
|
||||
|
@ -176,7 +178,6 @@ public class AliyunOSSFileSystemStore {
|
|||
}
|
||||
|
||||
maxKeys = conf.getInt(MAX_PAGING_KEYS_KEY, MAX_PAGING_KEYS_DEFAULT);
|
||||
bucketName = uri.getHost();
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -20,6 +20,7 @@ package org.apache.hadoop.fs.aliyun.oss;
|
|||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
|
||||
import com.aliyun.oss.common.auth.CredentialsProvider;
|
||||
import com.google.common.base.Preconditions;
|
||||
|
@ -95,13 +96,14 @@ final public class AliyunOSSUtils {
|
|||
* Create credential provider specified by configuration, or create default
|
||||
* credential provider if not specified.
|
||||
*
|
||||
* @param uri uri passed by caller
|
||||
* @param conf configuration
|
||||
* @return a credential provider
|
||||
* @throws IOException on any problem. Class construction issues may be
|
||||
* nested inside the IOE.
|
||||
*/
|
||||
public static CredentialsProvider getCredentialsProvider(Configuration conf)
|
||||
throws IOException {
|
||||
public static CredentialsProvider getCredentialsProvider(
|
||||
URI uri, Configuration conf) throws IOException {
|
||||
CredentialsProvider credentials;
|
||||
|
||||
String className = conf.getTrimmed(CREDENTIALS_PROVIDER_KEY);
|
||||
|
@ -117,7 +119,7 @@ final public class AliyunOSSUtils {
|
|||
try {
|
||||
credentials =
|
||||
(CredentialsProvider)credClass.getDeclaredConstructor(
|
||||
Configuration.class).newInstance(conf);
|
||||
URI.class, Configuration.class).newInstance(uri, conf);
|
||||
} catch (NoSuchMethodException | SecurityException e) {
|
||||
credentials =
|
||||
(CredentialsProvider)credClass.getDeclaredConstructor()
|
||||
|
|
|
@ -0,0 +1,115 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.aliyun.oss;
|
||||
|
||||
import com.aliyun.oss.common.auth.Credentials;
|
||||
import com.aliyun.oss.common.auth.CredentialsProvider;
|
||||
import com.aliyun.oss.common.auth.InvalidCredentialsException;
|
||||
import com.aliyun.oss.common.auth.STSAssumeRoleSessionCredentialsProvider;
|
||||
import com.aliyuncs.exceptions.ClientException;
|
||||
import com.aliyuncs.profile.DefaultProfile;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
|
||||
import static org.apache.hadoop.fs.aliyun.oss.Constants.ACCESS_KEY_ID;
|
||||
import static org.apache.hadoop.fs.aliyun.oss.Constants.ACCESS_KEY_SECRET;
|
||||
|
||||
/**
|
||||
* Support assumed role credentials for authenticating with Aliyun.
|
||||
* roleArn is configured in core-site.xml
|
||||
*/
|
||||
public class AssumedRoleCredentialProvider implements CredentialsProvider {
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(AssumedRoleCredentialProvider.class);
|
||||
public static final String NAME
|
||||
= "org.apache.hadoop.fs.aliyun.oss.AssumedRoleCredentialProvider";
|
||||
private Credentials credentials;
|
||||
private String roleArn;
|
||||
private long duration;
|
||||
private String stsEndpoint;
|
||||
private String sessionName;
|
||||
private double expiredFactor;
|
||||
private STSAssumeRoleSessionCredentialsProvider stsCredentialsProvider;
|
||||
|
||||
public AssumedRoleCredentialProvider(URI uri, Configuration conf) {
|
||||
roleArn = conf.getTrimmed(Constants.ROLE_ARN, "");
|
||||
if (StringUtils.isEmpty(roleArn)) {
|
||||
throw new InvalidCredentialsException(
|
||||
"fs.oss.assumed.role.arn is empty");
|
||||
}
|
||||
|
||||
duration = conf.getLong(Constants.ASSUMED_ROLE_DURATION,
|
||||
Constants.ASSUMED_ROLE_DURATION_DEFAULT);
|
||||
|
||||
expiredFactor = conf.getDouble(Constants.ASSUMED_ROLE_STS_EXPIRED_FACTOR,
|
||||
Constants.ASSUMED_ROLE_STS_EXPIRED_FACTOR_DEFAULT);
|
||||
|
||||
stsEndpoint = conf.getTrimmed(Constants.ASSUMED_ROLE_STS_ENDPOINT, "");
|
||||
if (StringUtils.isEmpty(stsEndpoint)) {
|
||||
throw new InvalidCredentialsException(
|
||||
"fs.oss.assumed.role.sts.endpoint is empty");
|
||||
}
|
||||
|
||||
sessionName = conf.getTrimmed(Constants.ASSUMED_ROLE_SESSION_NAME, "");
|
||||
|
||||
String accessKeyId;
|
||||
String accessKeySecret;
|
||||
try {
|
||||
accessKeyId = AliyunOSSUtils.getValueWithKey(conf, ACCESS_KEY_ID);
|
||||
accessKeySecret = AliyunOSSUtils.getValueWithKey(conf, ACCESS_KEY_SECRET);
|
||||
} catch (IOException e) {
|
||||
throw new InvalidCredentialsException(e);
|
||||
}
|
||||
|
||||
try {
|
||||
DefaultProfile.addEndpoint("", "", "Sts", stsEndpoint);
|
||||
} catch (ClientException e) {
|
||||
throw new InvalidCredentialsException(e);
|
||||
}
|
||||
|
||||
stsCredentialsProvider = new STSAssumeRoleSessionCredentialsProvider(
|
||||
new com.aliyuncs.auth.BasicCredentials(accessKeyId, accessKeySecret),
|
||||
roleArn, DefaultProfile.getProfile("", accessKeyId, accessKeySecret))
|
||||
.withExpiredDuration(duration).withExpiredFactor(expiredFactor);
|
||||
|
||||
if (!StringUtils.isEmpty(sessionName)) {
|
||||
stsCredentialsProvider.withRoleSessionName(sessionName);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setCredentials(Credentials creds) {
|
||||
throw new InvalidCredentialsException(
|
||||
"Should not set credentials from external call");
|
||||
}
|
||||
|
||||
@Override
|
||||
public Credentials getCredentials() {
|
||||
credentials = stsCredentialsProvider.getCredentials();
|
||||
if (credentials == null) {
|
||||
throw new InvalidCredentialsException("Invalid credentials");
|
||||
}
|
||||
return credentials;
|
||||
}
|
||||
}
|
|
@ -18,6 +18,7 @@
|
|||
|
||||
package org.apache.hadoop.fs.aliyun.oss;
|
||||
|
||||
import com.aliyun.oss.common.utils.AuthUtils;
|
||||
import com.aliyun.oss.common.utils.VersionInfoUtils;
|
||||
|
||||
/**
|
||||
|
@ -42,6 +43,27 @@ public final class Constants {
|
|||
public static final String ACCESS_KEY_SECRET = "fs.oss.accessKeySecret";
|
||||
public static final String SECURITY_TOKEN = "fs.oss.securityToken";
|
||||
|
||||
// Assume role configurations
|
||||
public static final String ROLE_ARN = "fs.oss.assumed.role.arn";
|
||||
public static final String ASSUMED_ROLE_DURATION =
|
||||
"fs.oss.assumed.role.session.duration";
|
||||
// Default session duration(in seconds)
|
||||
public static final long ASSUMED_ROLE_DURATION_DEFAULT = 30 * 60;
|
||||
|
||||
// Expired factor of sts token
|
||||
// For example, if session duration is 900s and expiredFactor is 0.8
|
||||
// sts token will be refreshed after 900 * 0.8s
|
||||
public static final String ASSUMED_ROLE_STS_EXPIRED_FACTOR =
|
||||
"fs.oss.assumed.role.sts.expiredFactor";
|
||||
|
||||
public static final double ASSUMED_ROLE_STS_EXPIRED_FACTOR_DEFAULT =
|
||||
AuthUtils.DEFAULT_EXPIRED_FACTOR;
|
||||
|
||||
public static final String ASSUMED_ROLE_STS_ENDPOINT =
|
||||
"fs.oss.assumed.role.sts.endpoint";
|
||||
public static final String ASSUMED_ROLE_SESSION_NAME =
|
||||
"fs.oss.assumed.role.session.name";
|
||||
|
||||
// Number of simultaneous connections to oss
|
||||
public static final String MAXIMUM_CONNECTIONS_KEY =
|
||||
"fs.oss.connection.maximum";
|
||||
|
|
|
@ -117,6 +117,56 @@ please raise your issues with them.
|
|||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>fs.oss.assumed.role.arn</name>
|
||||
<description>
|
||||
Role ARN for the role to be assumed.
|
||||
Required if the fs.oss.credentials.provider is
|
||||
org.apache.hadoop.fs.aliyun.oss.AssumedRoleCredentialProvider.
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>fs.oss.assumed.role.sts.endpoint</name>
|
||||
<description>
|
||||
STS Token Service endpoint.
|
||||
Required if the fs.oss.credentials.provider is
|
||||
org.apache.hadoop.fs.aliyun.oss.AssumedRoleCredentialProvider.
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>fs.oss.assumed.role.session.name</name>
|
||||
<value />
|
||||
<description>
|
||||
Session name for the assumed role, must be valid characters
|
||||
according to Aliyun API. It is optional, will be generated by
|
||||
oss java sdk if it is empty.
|
||||
Only used if the fs.oss.credentials.provider is
|
||||
org.apache.hadoop.fs.aliyun.oss.AssumedRoleCredentialProvider.
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>fs.oss.assumed.role.session.duration</name>
|
||||
<value />
|
||||
<description>
|
||||
Duration of assumed roles before it is expired. Default is 30 minutes.
|
||||
Only used if the fs.oss.credentials.provider is
|
||||
org.apache.hadoop.fs.aliyun.oss.AssumedRoleCredentialProvider.
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>fs.oss.assumed.role.sts.expiredFactor</name>
|
||||
<value />
|
||||
<description>
|
||||
Sts token will be refreshed after (expiredFactor * duration) seconds.
|
||||
Only used if the fs.oss.credentials.provider is
|
||||
org.apache.hadoop.fs.aliyun.oss.AssumedRoleCredentialProvider.
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>fs.oss.proxy.host</name>
|
||||
<description>Hostname of the (optinal) proxy server for Aliyun OSS connection</description>
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
package org.apache.hadoop.fs.aliyun.oss;
|
||||
|
||||
import com.aliyun.oss.common.auth.Credentials;
|
||||
import com.aliyun.oss.common.auth.CredentialsProvider;
|
||||
import com.aliyun.oss.common.auth.InvalidCredentialsException;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.aliyun.oss.contract.AliyunOSSContract;
|
||||
|
@ -27,9 +28,15 @@ import org.apache.hadoop.fs.contract.AbstractFSContractTestBase;
|
|||
import org.junit.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.net.URI;
|
||||
|
||||
import static org.apache.hadoop.fs.aliyun.oss.Constants.ACCESS_KEY_ID;
|
||||
import static org.apache.hadoop.fs.aliyun.oss.Constants.ACCESS_KEY_SECRET;
|
||||
import static org.apache.hadoop.fs.aliyun.oss.Constants.ASSUMED_ROLE_SESSION_NAME;
|
||||
import static org.apache.hadoop.fs.aliyun.oss.Constants.ASSUMED_ROLE_STS_ENDPOINT;
|
||||
import static org.apache.hadoop.fs.aliyun.oss.Constants.CREDENTIALS_PROVIDER_KEY;
|
||||
import static org.apache.hadoop.fs.aliyun.oss.Constants.ROLE_ARN;
|
||||
import static org.apache.hadoop.fs.aliyun.oss.Constants.SECURITY_TOKEN;
|
||||
|
||||
/**
|
||||
|
@ -63,16 +70,54 @@ public class TestAliyunCredentials extends AbstractFSContractTestBase {
|
|||
validateCredential(conf);
|
||||
}
|
||||
|
||||
private void validateCredential(Configuration conf) {
|
||||
@Test
|
||||
public void testCredentialMissingRoleArn() throws Throwable {
|
||||
Configuration conf = new Configuration();
|
||||
conf.set(CREDENTIALS_PROVIDER_KEY, AssumedRoleCredentialProvider.NAME);
|
||||
conf.set(ROLE_ARN, "");
|
||||
validateCredential(conf);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCredentialMissingStsEndpoint() throws Throwable {
|
||||
Configuration conf = new Configuration();
|
||||
conf.set(CREDENTIALS_PROVIDER_KEY, AssumedRoleCredentialProvider.NAME);
|
||||
conf.set(ASSUMED_ROLE_STS_ENDPOINT, "");
|
||||
validateCredential(conf);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCredentialInvalidSessionName() throws Throwable {
|
||||
Configuration conf = new Configuration();
|
||||
conf.set(CREDENTIALS_PROVIDER_KEY, AssumedRoleCredentialProvider.NAME);
|
||||
conf.set(ASSUMED_ROLE_SESSION_NAME, "hadoop oss");
|
||||
validateCredential(conf);
|
||||
}
|
||||
|
||||
private void validateCredential(URI uri, Configuration conf) {
|
||||
try {
|
||||
AliyunCredentialsProvider provider
|
||||
= new AliyunCredentialsProvider(conf);
|
||||
CredentialsProvider provider =
|
||||
AliyunOSSUtils.getCredentialsProvider(uri, conf);
|
||||
Credentials credentials = provider.getCredentials();
|
||||
fail("Expected a CredentialInitializationException, got " + credentials);
|
||||
} catch (InvalidCredentialsException expected) {
|
||||
// expected
|
||||
} catch (IOException e) {
|
||||
fail("Unexpected exception.");
|
||||
Throwable cause = e.getCause();
|
||||
if (cause instanceof InvocationTargetException) {
|
||||
boolean isInstance =
|
||||
((InvocationTargetException)cause).getTargetException()
|
||||
instanceof InvalidCredentialsException;
|
||||
if (!isInstance) {
|
||||
fail("Unexpected exception.");
|
||||
}
|
||||
} else {
|
||||
fail("Unexpected exception.");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void validateCredential(Configuration conf) {
|
||||
validateCredential(null, conf);
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue