HADOOP-15671. AliyunOSS: Support Assume Roles in AliyunOSS. Contributed by Jinhu Wu.

(cherry picked from commit 2b635125fb)
(cherry picked from commit 5da3e83597)
This commit is contained in:
Sammi Chen 2018-09-25 19:48:30 +08:00
parent 2449795b8e
commit 85e00477b8
7 changed files with 248 additions and 12 deletions

View File

@ -120,7 +120,8 @@ public class AliyunOSSBlockOutputStream extends OutputStream {
if (null == partETags) {
throw new IOException("Failed to multipart upload to oss, abort it.");
}
store.completeMultipartUpload(key, uploadId, partETags);
store.completeMultipartUpload(key, uploadId,
new ArrayList<>(partETags));
}
} finally {
removePartFiles();
@ -129,7 +130,7 @@ public class AliyunOSSBlockOutputStream extends OutputStream {
}
@Override
public void write(int b) throws IOException {
public synchronized void write(int b) throws IOException {
singleByte[0] = (byte)b;
write(singleByte, 0, 1);
}

View File

@ -149,7 +149,7 @@ public class AliyunOSSFileSystemStore {
"null or empty. Please set proper endpoint with 'fs.oss.endpoint'.");
}
CredentialsProvider provider =
AliyunOSSUtils.getCredentialsProvider(conf);
AliyunOSSUtils.getCredentialsProvider(uri, conf);
ossClient = new OSSClient(endPoint, provider, clientConf);
uploadPartSize = AliyunOSSUtils.getMultipartSizeProperty(conf,
MULTIPART_UPLOAD_PART_SIZE_KEY, MULTIPART_UPLOAD_PART_SIZE_DEFAULT);
@ -168,6 +168,8 @@ public class AliyunOSSFileSystemStore {
multipartThreshold = 1024 * 1024 * 1024;
}
bucketName = uri.getHost();
String cannedACLName = conf.get(CANNED_ACL_KEY, CANNED_ACL_DEFAULT);
if (StringUtils.isNotEmpty(cannedACLName)) {
CannedAccessControlList cannedACL =
@ -176,7 +178,6 @@ public class AliyunOSSFileSystemStore {
}
maxKeys = conf.getInt(MAX_PAGING_KEYS_KEY, MAX_PAGING_KEYS_DEFAULT);
bucketName = uri.getHost();
}
/**

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.fs.aliyun.oss;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import com.aliyun.oss.common.auth.CredentialsProvider;
import com.google.common.base.Preconditions;
@ -95,13 +96,14 @@ final public class AliyunOSSUtils {
* Create credential provider specified by configuration, or create default
* credential provider if not specified.
*
* @param uri uri passed by caller
* @param conf configuration
* @return a credential provider
* @throws IOException on any problem. Class construction issues may be
* nested inside the IOE.
*/
public static CredentialsProvider getCredentialsProvider(Configuration conf)
throws IOException {
public static CredentialsProvider getCredentialsProvider(
URI uri, Configuration conf) throws IOException {
CredentialsProvider credentials;
String className = conf.getTrimmed(CREDENTIALS_PROVIDER_KEY);
@ -117,7 +119,7 @@ final public class AliyunOSSUtils {
try {
credentials =
(CredentialsProvider)credClass.getDeclaredConstructor(
Configuration.class).newInstance(conf);
URI.class, Configuration.class).newInstance(uri, conf);
} catch (NoSuchMethodException | SecurityException e) {
credentials =
(CredentialsProvider)credClass.getDeclaredConstructor()

View File

@ -0,0 +1,115 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.aliyun.oss;
import com.aliyun.oss.common.auth.Credentials;
import com.aliyun.oss.common.auth.CredentialsProvider;
import com.aliyun.oss.common.auth.InvalidCredentialsException;
import com.aliyun.oss.common.auth.STSAssumeRoleSessionCredentialsProvider;
import com.aliyuncs.exceptions.ClientException;
import com.aliyuncs.profile.DefaultProfile;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.URI;
import static org.apache.hadoop.fs.aliyun.oss.Constants.ACCESS_KEY_ID;
import static org.apache.hadoop.fs.aliyun.oss.Constants.ACCESS_KEY_SECRET;
/**
* Support assumed role credentials for authenticating with Aliyun.
* roleArn is configured in core-site.xml
*/
public class AssumedRoleCredentialProvider implements CredentialsProvider {
private static final Logger LOG =
LoggerFactory.getLogger(AssumedRoleCredentialProvider.class);
public static final String NAME
= "org.apache.hadoop.fs.aliyun.oss.AssumedRoleCredentialProvider";
private Credentials credentials;
private String roleArn;
private long duration;
private String stsEndpoint;
private String sessionName;
private double expiredFactor;
private STSAssumeRoleSessionCredentialsProvider stsCredentialsProvider;
public AssumedRoleCredentialProvider(URI uri, Configuration conf) {
roleArn = conf.getTrimmed(Constants.ROLE_ARN, "");
if (StringUtils.isEmpty(roleArn)) {
throw new InvalidCredentialsException(
"fs.oss.assumed.role.arn is empty");
}
duration = conf.getLong(Constants.ASSUMED_ROLE_DURATION,
Constants.ASSUMED_ROLE_DURATION_DEFAULT);
expiredFactor = conf.getDouble(Constants.ASSUMED_ROLE_STS_EXPIRED_FACTOR,
Constants.ASSUMED_ROLE_STS_EXPIRED_FACTOR_DEFAULT);
stsEndpoint = conf.getTrimmed(Constants.ASSUMED_ROLE_STS_ENDPOINT, "");
if (StringUtils.isEmpty(stsEndpoint)) {
throw new InvalidCredentialsException(
"fs.oss.assumed.role.sts.endpoint is empty");
}
sessionName = conf.getTrimmed(Constants.ASSUMED_ROLE_SESSION_NAME, "");
String accessKeyId;
String accessKeySecret;
try {
accessKeyId = AliyunOSSUtils.getValueWithKey(conf, ACCESS_KEY_ID);
accessKeySecret = AliyunOSSUtils.getValueWithKey(conf, ACCESS_KEY_SECRET);
} catch (IOException e) {
throw new InvalidCredentialsException(e);
}
try {
DefaultProfile.addEndpoint("", "", "Sts", stsEndpoint);
} catch (ClientException e) {
throw new InvalidCredentialsException(e);
}
stsCredentialsProvider = new STSAssumeRoleSessionCredentialsProvider(
new com.aliyuncs.auth.BasicCredentials(accessKeyId, accessKeySecret),
roleArn, DefaultProfile.getProfile("", accessKeyId, accessKeySecret))
.withExpiredDuration(duration).withExpiredFactor(expiredFactor);
if (!StringUtils.isEmpty(sessionName)) {
stsCredentialsProvider.withRoleSessionName(sessionName);
}
}
@Override
public void setCredentials(Credentials creds) {
throw new InvalidCredentialsException(
"Should not set credentials from external call");
}
@Override
public Credentials getCredentials() {
credentials = stsCredentialsProvider.getCredentials();
if (credentials == null) {
throw new InvalidCredentialsException("Invalid credentials");
}
return credentials;
}
}

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.fs.aliyun.oss;
import com.aliyun.oss.common.utils.AuthUtils;
import com.aliyun.oss.common.utils.VersionInfoUtils;
/**
@ -42,6 +43,27 @@ public final class Constants {
public static final String ACCESS_KEY_SECRET = "fs.oss.accessKeySecret";
public static final String SECURITY_TOKEN = "fs.oss.securityToken";
// Assume role configurations
public static final String ROLE_ARN = "fs.oss.assumed.role.arn";
public static final String ASSUMED_ROLE_DURATION =
"fs.oss.assumed.role.session.duration";
// Default session duration(in seconds)
public static final long ASSUMED_ROLE_DURATION_DEFAULT = 30 * 60;
// Expired factor of sts token
// For example, if session duration is 900s and expiredFactor is 0.8
// sts token will be refreshed after 900 * 0.8s
public static final String ASSUMED_ROLE_STS_EXPIRED_FACTOR =
"fs.oss.assumed.role.sts.expiredFactor";
public static final double ASSUMED_ROLE_STS_EXPIRED_FACTOR_DEFAULT =
AuthUtils.DEFAULT_EXPIRED_FACTOR;
public static final String ASSUMED_ROLE_STS_ENDPOINT =
"fs.oss.assumed.role.sts.endpoint";
public static final String ASSUMED_ROLE_SESSION_NAME =
"fs.oss.assumed.role.session.name";
// Number of simultaneous connections to oss
public static final String MAXIMUM_CONNECTIONS_KEY =
"fs.oss.connection.maximum";

View File

@ -117,6 +117,56 @@ please raise your issues with them.
</description>
</property>
<property>
<name>fs.oss.assumed.role.arn</name>
<description>
Role ARN for the role to be assumed.
Required if the fs.oss.credentials.provider is
org.apache.hadoop.fs.aliyun.oss.AssumedRoleCredentialProvider.
</description>
</property>
<property>
<name>fs.oss.assumed.role.sts.endpoint</name>
<description>
STS Token Service endpoint.
Required if the fs.oss.credentials.provider is
org.apache.hadoop.fs.aliyun.oss.AssumedRoleCredentialProvider.
</description>
</property>
<property>
<name>fs.oss.assumed.role.session.name</name>
<value />
<description>
Session name for the assumed role, must be valid characters
according to Aliyun API. It is optional, will be generated by
oss java sdk if it is empty.
Only used if the fs.oss.credentials.provider is
org.apache.hadoop.fs.aliyun.oss.AssumedRoleCredentialProvider.
</description>
</property>
<property>
<name>fs.oss.assumed.role.session.duration</name>
<value />
<description>
Duration of assumed roles before it is expired. Default is 30 minutes.
Only used if the fs.oss.credentials.provider is
org.apache.hadoop.fs.aliyun.oss.AssumedRoleCredentialProvider.
</description>
</property>
<property>
<name>fs.oss.assumed.role.sts.expiredFactor</name>
<value />
<description>
Sts token will be refreshed after (expiredFactor * duration) seconds.
Only used if the fs.oss.credentials.provider is
org.apache.hadoop.fs.aliyun.oss.AssumedRoleCredentialProvider.
</description>
</property>
<property>
<name>fs.oss.proxy.host</name>
<description>Hostname of the (optinal) proxy server for Aliyun OSS connection</description>

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.fs.aliyun.oss;
import com.aliyun.oss.common.auth.Credentials;
import com.aliyun.oss.common.auth.CredentialsProvider;
import com.aliyun.oss.common.auth.InvalidCredentialsException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.aliyun.oss.contract.AliyunOSSContract;
@ -27,9 +28,15 @@ import org.apache.hadoop.fs.contract.AbstractFSContractTestBase;
import org.junit.Test;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.net.URI;
import static org.apache.hadoop.fs.aliyun.oss.Constants.ACCESS_KEY_ID;
import static org.apache.hadoop.fs.aliyun.oss.Constants.ACCESS_KEY_SECRET;
import static org.apache.hadoop.fs.aliyun.oss.Constants.ASSUMED_ROLE_SESSION_NAME;
import static org.apache.hadoop.fs.aliyun.oss.Constants.ASSUMED_ROLE_STS_ENDPOINT;
import static org.apache.hadoop.fs.aliyun.oss.Constants.CREDENTIALS_PROVIDER_KEY;
import static org.apache.hadoop.fs.aliyun.oss.Constants.ROLE_ARN;
import static org.apache.hadoop.fs.aliyun.oss.Constants.SECURITY_TOKEN;
/**
@ -63,16 +70,54 @@ public class TestAliyunCredentials extends AbstractFSContractTestBase {
validateCredential(conf);
}
private void validateCredential(Configuration conf) {
@Test
public void testCredentialMissingRoleArn() throws Throwable {
Configuration conf = new Configuration();
conf.set(CREDENTIALS_PROVIDER_KEY, AssumedRoleCredentialProvider.NAME);
conf.set(ROLE_ARN, "");
validateCredential(conf);
}
@Test
public void testCredentialMissingStsEndpoint() throws Throwable {
Configuration conf = new Configuration();
conf.set(CREDENTIALS_PROVIDER_KEY, AssumedRoleCredentialProvider.NAME);
conf.set(ASSUMED_ROLE_STS_ENDPOINT, "");
validateCredential(conf);
}
@Test
public void testCredentialInvalidSessionName() throws Throwable {
Configuration conf = new Configuration();
conf.set(CREDENTIALS_PROVIDER_KEY, AssumedRoleCredentialProvider.NAME);
conf.set(ASSUMED_ROLE_SESSION_NAME, "hadoop oss");
validateCredential(conf);
}
private void validateCredential(URI uri, Configuration conf) {
try {
AliyunCredentialsProvider provider
= new AliyunCredentialsProvider(conf);
CredentialsProvider provider =
AliyunOSSUtils.getCredentialsProvider(uri, conf);
Credentials credentials = provider.getCredentials();
fail("Expected a CredentialInitializationException, got " + credentials);
} catch (InvalidCredentialsException expected) {
// expected
} catch (IOException e) {
fail("Unexpected exception.");
Throwable cause = e.getCause();
if (cause instanceof InvocationTargetException) {
boolean isInstance =
((InvocationTargetException)cause).getTargetException()
instanceof InvalidCredentialsException;
if (!isInstance) {
fail("Unexpected exception.");
}
} else {
fail("Unexpected exception.");
}
}
}
private void validateCredential(Configuration conf) {
validateCredential(null, conf);
}
}