HDFS-8155. Support OAuth2 in WebHDFS.

(cherry picked from commit 837fb75e8e)
This commit is contained in:
Jakob Homan 2015-08-29 18:37:05 -07:00
parent de42fce390
commit 94944ba4f6
20 changed files with 1369 additions and 3 deletions

View File

@ -31,6 +31,11 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>com.squareup.okhttp</groupId>
<artifactId>okhttp</artifactId>
<version>2.4.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>

View File

@ -36,6 +36,14 @@ public interface HdfsClientConfigKeys {
String DFS_WEBHDFS_ACL_PERMISSION_PATTERN_DEFAULT =
"^(default:)?(user|group|mask|other):[[A-Za-z_][A-Za-z0-9._-]]*:([rwx-]{3})?(,(default:)?(user|group|mask|other):[[A-Za-z_][A-Za-z0-9._-]]*:([rwx-]{3})?)*$";
String DFS_WEBHDFS_OAUTH_ENABLED_KEY = "dfs.webhdfs.oauth2.enabled";
boolean DFS_WEBHDFS_OAUTH_ENABLED_DEFAULT = false;
String OAUTH_CLIENT_ID_KEY = "dfs.webhdfs.oauth2.client.id";
String OAUTH_REFRESH_URL_KEY = "dfs.webhdfs.oauth2.refresh.url";
String ACCESS_TOKEN_PROVIDER_KEY = "dfs.webhdfs.oauth2.access.token.provider";
String PREFIX = "dfs.client.";
String DFS_NAMESERVICES = "dfs.nameservices";
int DFS_NAMENODE_HTTP_PORT_DEFAULT = 50070;

View File

@ -31,6 +31,7 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.web.oauth2.OAuth2ConnectionConfigurator;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authentication.client.AuthenticatedURL;
import org.apache.hadoop.security.authentication.client.AuthenticationException;
@ -77,15 +78,42 @@ public HttpURLConnection configure(HttpURLConnection conn)
* try to load SSL certificates when it is specified.
*/
public static URLConnectionFactory newDefaultURLConnectionFactory(Configuration conf) {
ConnectionConfigurator conn = getSSLConnectionConfiguration(conf);
return new URLConnectionFactory(conn);
}
private static ConnectionConfigurator
getSSLConnectionConfiguration(Configuration conf) {
ConnectionConfigurator conn = null;
try {
conn = newSslConnConfigurator(DEFAULT_SOCKET_TIMEOUT, conf);
} catch (Exception e) {
LOG.debug(
"Cannot load customized ssl related configuration. Fallback to system-generic settings.",
"Cannot load customized ssl related configuration. Fallback to" +
" system-generic settings.",
e);
conn = DEFAULT_TIMEOUT_CONN_CONFIGURATOR;
}
return conn;
}
/**
* Construct a new URLConnectionFactory that supports OAut-based connections.
* It will also try to load the SSL configuration when they are specified.
*/
public static URLConnectionFactory
newOAuth2URLConnectionFactory(Configuration conf) throws IOException {
ConnectionConfigurator conn = null;
try {
ConnectionConfigurator sslConnConfigurator
= newSslConnConfigurator(DEFAULT_SOCKET_TIMEOUT, conf);
conn = new OAuth2ConnectionConfigurator(conf, sslConnConfigurator);
} catch (Exception e) {
throw new IOException("Unable to load OAuth2 connection factory.", e);
}
return new URLConnectionFactory(conn);
}

View File

@ -149,8 +149,19 @@ public synchronized void initialize(URI uri, Configuration conf
HdfsClientConfigKeys.DFS_WEBHDFS_USER_PATTERN_KEY,
HdfsClientConfigKeys.DFS_WEBHDFS_USER_PATTERN_DEFAULT));
connectionFactory = URLConnectionFactory
.newDefaultURLConnectionFactory(conf);
boolean isOAuth = conf.getBoolean(
HdfsClientConfigKeys.DFS_WEBHDFS_OAUTH_ENABLED_KEY,
HdfsClientConfigKeys.DFS_WEBHDFS_OAUTH_ENABLED_DEFAULT);
if(isOAuth) {
LOG.info("Enabling OAuth2 in WebHDFS");
connectionFactory = URLConnectionFactory
.newOAuth2URLConnectionFactory(conf);
} else {
LOG.info("Not enabling OAuth2 in WebHDFS");
connectionFactory = URLConnectionFactory
.newDefaultURLConnectionFactory(conf);
}
ugi = UserGroupInformation.getCurrentUser();
this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());

View File

@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hadoop.hdfs.web.oauth2;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import java.io.IOException;
/**
* Provide an OAuth2 access token to be used to authenticate http calls in
* WebHDFS.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public abstract class AccessTokenProvider implements Configurable {
private Configuration conf;
/**
* Obtain the access token that should be added to http connection's header.
* Will be called for each connection, so implementations should be
* performant. Implementations are responsible for any refreshing of
* the token.
*
* @return Access token to be added to connection header.
*/
abstract String getAccessToken() throws IOException;
/**
* Return the conf.
*
* @return the conf.
*/
@Override
public Configuration getConf() {
return conf;
}
/**
* Set the conf.
*
* @param configuration New configuration.
*/
@Override
public void setConf(Configuration configuration) {
this.conf = configuration;
}
}

View File

@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hadoop.hdfs.web.oauth2;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.util.Timer;
/**
* Access tokens generally expire. This timer helps keep track of that.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class AccessTokenTimer {
public static final long EXPIRE_BUFFER_MS = 30 * 1000L;
private final Timer timer;
/**
* When the current access token will expire in milliseconds since
* epoch.
*/
private long nextRefreshMSSinceEpoch;
public AccessTokenTimer() {
this(new Timer());
}
/**
*
* @param timer Timer instance for unit testing
*/
public AccessTokenTimer(Timer timer) {
this.timer = timer;
this.nextRefreshMSSinceEpoch = 0;
}
/**
* Set when the access token will expire as reported by the oauth server,
* ie in seconds from now.
* @param expiresIn Access time expiration as reported by OAuth server
*/
public void setExpiresIn(String expiresIn) {
this.nextRefreshMSSinceEpoch = convertExpiresIn(timer, expiresIn);
}
/**
* Set when the access token will expire in milliseconds from epoch,
* as required by the WebHDFS configuration. This is a bit hacky and lame.
*
* @param expiresInMSSinceEpoch Access time expiration in ms since epoch.
*/
public void setExpiresInMSSinceEpoch(String expiresInMSSinceEpoch){
this.nextRefreshMSSinceEpoch = Long.parseLong(expiresInMSSinceEpoch);
}
/**
* Get next time we should refresh the token.
*
* @return Next time since epoch we'll need to refresh the token.
*/
public long getNextRefreshMSSinceEpoch() {
return nextRefreshMSSinceEpoch;
}
/**
* Return true if the current token has expired or will expire within the
* EXPIRE_BUFFER_MS (to give ample wiggle room for the call to be made to
* the server).
*/
public boolean shouldRefresh() {
long lowerLimit = nextRefreshMSSinceEpoch - EXPIRE_BUFFER_MS;
long currTime = timer.now();
return currTime > lowerLimit;
}
/**
* The expires_in param from OAuth is in seconds-from-now. Convert to
* milliseconds-from-epoch
*/
static Long convertExpiresIn(Timer timer, String expiresInSecs) {
long expiresSecs = Long.parseLong(expiresInSecs);
long expiresMs = expiresSecs * 1000;
return timer.now() + expiresMs;
}
}

View File

@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hadoop.hdfs.web.oauth2;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.Timer;
import static org.apache.hadoop.hdfs.web.oauth2.Utils.notNull;
/**
* Obtain an access token via a a credential (provided through the
* Configuration) using the
* <a href="https://tools.ietf.org/html/rfc6749#section-4.4">
* Client Credentials Grant workflow</a>.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class ConfCredentialBasedAccessTokenProvider
extends CredentialBasedAccessTokenProvider {
private String credential;
public ConfCredentialBasedAccessTokenProvider() {
}
public ConfCredentialBasedAccessTokenProvider(Timer timer) {
super(timer);
}
@Override
public void setConf(Configuration conf) {
super.setConf(conf);
credential = notNull(conf, OAUTH_CREDENTIAL_KEY);
}
@Override
public String getCredential() {
if(credential == null) {
throw new IllegalArgumentException("Credential has not been " +
"provided in configuration");
}
return credential;
}
}

View File

@ -0,0 +1,146 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hadoop.hdfs.web.oauth2;
import com.squareup.okhttp.OkHttpClient;
import com.squareup.okhttp.Request;
import com.squareup.okhttp.RequestBody;
import com.squareup.okhttp.Response;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.web.URLConnectionFactory;
import org.apache.hadoop.util.Timer;
import org.apache.http.HttpStatus;
import org.codehaus.jackson.map.ObjectMapper;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.OAUTH_CLIENT_ID_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.OAUTH_REFRESH_URL_KEY;
import static org.apache.hadoop.hdfs.web.oauth2.OAuth2Constants.ACCESS_TOKEN;
import static org.apache.hadoop.hdfs.web.oauth2.OAuth2Constants.CLIENT_ID;
import static org.apache.hadoop.hdfs.web.oauth2.OAuth2Constants.EXPIRES_IN;
import static org.apache.hadoop.hdfs.web.oauth2.OAuth2Constants.GRANT_TYPE;
import static org.apache.hadoop.hdfs.web.oauth2.OAuth2Constants.REFRESH_TOKEN;
import static org.apache.hadoop.hdfs.web.oauth2.OAuth2Constants.URLENCODED;
import static org.apache.hadoop.hdfs.web.oauth2.Utils.notNull;
/**
* Supply a access token obtained via a refresh token (provided through the
* Configuration using the second half of the
* <a href="https://tools.ietf.org/html/rfc6749#section-4.1">
* Authorization Code Grant workflow</a>.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class ConfRefreshTokenBasedAccessTokenProvider
extends AccessTokenProvider {
public static final String OAUTH_REFRESH_TOKEN_KEY
= "dfs.webhdfs.oauth2.refresh.token";
public static final String OAUTH_REFRESH_TOKEN_EXPIRES_KEY
= "dfs.webhdfs.oauth2.refresh.token.expires.ms.since.epoch";
private AccessTokenTimer accessTokenTimer;
private String accessToken;
private String refreshToken;
private String clientId;
private String refreshURL;
public ConfRefreshTokenBasedAccessTokenProvider() {
this.accessTokenTimer = new AccessTokenTimer();
}
public ConfRefreshTokenBasedAccessTokenProvider(Timer timer) {
this.accessTokenTimer = new AccessTokenTimer(timer);
}
@Override
public void setConf(Configuration conf) {
super.setConf(conf);
refreshToken = notNull(conf, (OAUTH_REFRESH_TOKEN_KEY));
accessTokenTimer.setExpiresInMSSinceEpoch(
notNull(conf, OAUTH_REFRESH_TOKEN_EXPIRES_KEY));
clientId = notNull(conf, OAUTH_CLIENT_ID_KEY);
refreshURL = notNull(conf, OAUTH_REFRESH_URL_KEY);
}
@Override
public synchronized String getAccessToken() throws IOException {
if(accessTokenTimer.shouldRefresh()) {
refresh();
}
return accessToken;
}
void refresh() throws IOException {
try {
OkHttpClient client = new OkHttpClient();
client.setConnectTimeout(URLConnectionFactory.DEFAULT_SOCKET_TIMEOUT,
TimeUnit.MILLISECONDS);
client.setReadTimeout(URLConnectionFactory.DEFAULT_SOCKET_TIMEOUT,
TimeUnit.MILLISECONDS);
String bodyString = Utils.postBody(GRANT_TYPE, REFRESH_TOKEN,
REFRESH_TOKEN, refreshToken,
CLIENT_ID, clientId);
RequestBody body = RequestBody.create(URLENCODED, bodyString);
Request request = new Request.Builder()
.url(refreshURL)
.post(body)
.build();
Response responseBody = client.newCall(request).execute();
if (responseBody.code() != HttpStatus.SC_OK) {
throw new IllegalArgumentException("Received invalid http response: "
+ responseBody.code() + ", text = " + responseBody.toString());
}
ObjectMapper mapper = new ObjectMapper();
Map<?, ?> response = mapper.reader(Map.class)
.readValue(responseBody.body().string());
String newExpiresIn = response.get(EXPIRES_IN).toString();
accessTokenTimer.setExpiresIn(newExpiresIn);
accessToken = response.get(ACCESS_TOKEN).toString();
} catch (Exception e) {
throw new IOException("Exception while refreshing access token", e);
}
}
public String getRefreshToken() {
return refreshToken;
}
}

View File

@ -0,0 +1,135 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hadoop.hdfs.web.oauth2;
import com.squareup.okhttp.OkHttpClient;
import com.squareup.okhttp.Request;
import com.squareup.okhttp.RequestBody;
import com.squareup.okhttp.Response;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.web.URLConnectionFactory;
import org.apache.hadoop.util.Timer;
import org.apache.http.HttpStatus;
import org.codehaus.jackson.map.ObjectMapper;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.OAUTH_CLIENT_ID_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.OAUTH_REFRESH_URL_KEY;
import static org.apache.hadoop.hdfs.web.oauth2.OAuth2Constants.ACCESS_TOKEN;
import static org.apache.hadoop.hdfs.web.oauth2.OAuth2Constants.CLIENT_CREDENTIALS;
import static org.apache.hadoop.hdfs.web.oauth2.OAuth2Constants.CLIENT_ID;
import static org.apache.hadoop.hdfs.web.oauth2.OAuth2Constants.CLIENT_SECRET;
import static org.apache.hadoop.hdfs.web.oauth2.OAuth2Constants.EXPIRES_IN;
import static org.apache.hadoop.hdfs.web.oauth2.OAuth2Constants.GRANT_TYPE;
import static org.apache.hadoop.hdfs.web.oauth2.OAuth2Constants.URLENCODED;
import static org.apache.hadoop.hdfs.web.oauth2.Utils.notNull;
/**
* Obtain an access token via the credential-based OAuth2 workflow. This
* abstract class requires only that implementations provide the credential,
* which the class then uses to obtain a refresh token.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public abstract class CredentialBasedAccessTokenProvider
extends AccessTokenProvider {
public static final String OAUTH_CREDENTIAL_KEY
= "dfs.webhdfs.oauth2.credential";
private AccessTokenTimer timer;
private String clientId;
private String refreshURL;
private String accessToken;
private boolean initialCredentialObtained = false;
CredentialBasedAccessTokenProvider() {
this.timer = new AccessTokenTimer();
}
CredentialBasedAccessTokenProvider(Timer timer) {
this.timer = new AccessTokenTimer(timer);
}
abstract String getCredential();
@Override
public void setConf(Configuration conf) {
super.setConf(conf);
clientId = notNull(conf, OAUTH_CLIENT_ID_KEY);
refreshURL = notNull(conf, OAUTH_REFRESH_URL_KEY);
}
@Override
public synchronized String getAccessToken() throws IOException {
if(timer.shouldRefresh() || !initialCredentialObtained) {
refresh();
initialCredentialObtained = true;
}
return accessToken;
}
void refresh() throws IOException {
try {
OkHttpClient client = new OkHttpClient();
client.setConnectTimeout(URLConnectionFactory.DEFAULT_SOCKET_TIMEOUT,
TimeUnit.MILLISECONDS);
client.setReadTimeout(URLConnectionFactory.DEFAULT_SOCKET_TIMEOUT,
TimeUnit.MILLISECONDS);
String bodyString = Utils.postBody(CLIENT_SECRET, getCredential(),
GRANT_TYPE, CLIENT_CREDENTIALS,
CLIENT_ID, clientId);
RequestBody body = RequestBody.create(URLENCODED, bodyString);
Request request = new Request.Builder()
.url(refreshURL)
.post(body)
.build();
Response responseBody = client.newCall(request).execute();
if (responseBody.code() != HttpStatus.SC_OK) {
throw new IllegalArgumentException("Received invalid http response: "
+ responseBody.code() + ", text = " + responseBody.toString());
}
ObjectMapper mapper = new ObjectMapper();
Map<?, ?> response = mapper.reader(Map.class)
.readValue(responseBody.body().string());
String newExpiresIn = response.get(EXPIRES_IN).toString();
timer.setExpiresIn(newExpiresIn);
accessToken = response.get(ACCESS_TOKEN).toString();
} catch (Exception e) {
throw new IOException("Unable to obtain access token from credential", e);
}
}
}

View File

@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hadoop.hdfs.web.oauth2;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.authentication.client.ConnectionConfigurator;
import org.apache.hadoop.util.ReflectionUtils;
import java.io.IOException;
import java.net.HttpURLConnection;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.ACCESS_TOKEN_PROVIDER_KEY;
import static org.apache.hadoop.hdfs.web.oauth2.Utils.notNull;
/**
* Configure a connection to use OAuth2 authentication.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class OAuth2ConnectionConfigurator implements ConnectionConfigurator {
public static final String HEADER = "Bearer ";
private final AccessTokenProvider accessTokenProvider;
private ConnectionConfigurator sslConfigurator = null;
public OAuth2ConnectionConfigurator(Configuration conf) {
this(conf, null);
}
@SuppressWarnings("unchecked")
public OAuth2ConnectionConfigurator(Configuration conf,
ConnectionConfigurator sslConfigurator) {
this.sslConfigurator = sslConfigurator;
notNull(conf, ACCESS_TOKEN_PROVIDER_KEY);
Class accessTokenProviderClass = conf.getClass(ACCESS_TOKEN_PROVIDER_KEY,
ConfCredentialBasedAccessTokenProvider.class,
AccessTokenProvider.class);
accessTokenProvider = (AccessTokenProvider) ReflectionUtils
.newInstance(accessTokenProviderClass, conf);
accessTokenProvider.setConf(conf);
}
@Override
public HttpURLConnection configure(HttpURLConnection conn)
throws IOException {
if(sslConfigurator != null) {
sslConfigurator.configure(conn);
}
String accessToken = accessTokenProvider.getAccessToken();
conn.setRequestProperty("AUTHORIZATION", HEADER + accessToken);
return conn;
}
}

View File

@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hadoop.hdfs.web.oauth2;
import com.squareup.okhttp.MediaType;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/**
* Sundry constants relating to OAuth2 within WebHDFS.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public final class OAuth2Constants {
private OAuth2Constants() { /** Private constructor. **/ }
public static final MediaType URLENCODED
= MediaType.parse("application/x-www-form-urlencoded; charset=utf-8");
/* Constants for OAuth protocol */
public static final String ACCESS_TOKEN = "access_token";
public static final String BEARER = "bearer";
public static final String CLIENT_CREDENTIALS = "client_credentials";
public static final String CLIENT_ID = "client_id";
public static final String CLIENT_SECRET = "client_secret";
public static final String EXPIRES_IN = "expires_in";
public static final String GRANT_TYPE = "grant_type";
public static final String REFRESH_TOKEN = "refresh_token";
public static final String TOKEN_TYPE = "token_type";
}

View File

@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hadoop.hdfs.web.oauth2;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
@InterfaceAudience.Private
@InterfaceStability.Evolving
final class Utils {
private Utils() { /* Private constructor */ }
public static String notNull(Configuration conf, String key) {
String value = conf.get(key);
if(value == null) {
throw new IllegalArgumentException("No value for " + key +
" found in conf file.");
}
return value;
}
public static String postBody(String ... kv)
throws UnsupportedEncodingException {
if(kv.length % 2 != 0) {
throw new IllegalArgumentException("Arguments must be key value pairs");
}
StringBuilder sb = new StringBuilder();
int i = 0;
while(i < kv.length) {
if(i > 0) {
sb.append("&");
}
sb.append(URLEncoder.encode(kv[i++], "UTF-8"));
sb.append("=");
sb.append(URLEncoder.encode(kv[i++], "UTF-8"));
}
return sb.toString();
}
}

View File

@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/**
* OAuth2-based WebHDFS authentication.
*/
@InterfaceAudience.Public
package org.apache.hadoop.hdfs.web.oauth2;
import org.apache.hadoop.classification.InterfaceAudience;

View File

@ -12,6 +12,8 @@ Release 2.8.0 - UNRELEASED
HDFS-8131. Implement a space balanced block placement policy (Liu Shaohui
via kihwal)
HDFS-8155. Support OAuth2 in WebHDFS. (jghoman)
IMPROVEMENTS
HDFS-2390. dfsadmin -setBalancerBandwidth does not validate -ve value

View File

@ -212,6 +212,12 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
<artifactId>leveldbjni-all</artifactId>
<version>1.8</version>
</dependency>
<dependency>
<groupId>org.mock-server</groupId>
<artifactId>mockserver-netty</artifactId>
<version>3.9.2</version>
<scope>test</scope>
</dependency>
<!-- 'mvn dependency:analyze' fails to detect use of this dependency -->
<dependency>
<groupId>org.bouncycastle</groupId>

View File

@ -222,6 +222,31 @@ Below are examples using the `curl` command tool.
See also: [Authentication for Hadoop HTTP web-consoles](../hadoop-common/HttpAuthentication.html)
Additionally, WebHDFS supports OAuth2 on the client side. The Namenode and Datanodes do not currently support clients using OAuth2 but other backends that implement the WebHDFS REST interface may.
WebHDFS supports two type of OAuth2 code grants (user-provided refresh and access token or user provided credential) by default and provides a pluggable mechanism for implementing other OAuth2 authentications per the [OAuth2 RFC](https://tools.ietf.org/html/rfc6749), or custom authentications. When using either of the provided code grant mechanisms, the WebHDFS client will refresh the access token as necessary.
OAuth2 should only be enabled for clients not running with Kerberos SPENGO.
| OAuth2 code grant mechanism | Description | Value of `dfs.webhdfs.oauth2.access.token.provider` that implements code grant |
|:---- |:---- |:----|
| Authorization Code Grant | The user provides an initial access token and refresh token, which are then used to authenticate WebHDFS requests and obtain replacement access tokens, respectively. | org.apache.hadoop.hdfs.web.oauth2.ConfRefreshTokenBasedAccessTokenProvider |
| Client Credentials Grant | The user provides a credential which is used to obtain access tokens, which are then used to authenticate WebHDFS requests. | org.apache.hadoop.hdfs.web.oauth2.ConfCredentialBasedAccessTokenProvider |
The following properties control OAuth2 authentication.
| OAuth2 related property | Description |
|:---- |:---- |
| `dfs.webhdfs.oauth2.enabled` | Boolean to enable/disable OAuth2 authentication |
| `dfs.webhdfs.oauth2.access.token.provider` | Class name of an implementation of `org.apache.hadoop.hdfs.web.oauth.AccessTokenProvider.` Two are provided with the code, as described above, or the user may specify a user-provided implementation. The default value for this configuration key is the `ConfCredentialBasedAccessTokenProvider` implementation. |
| `dfs.webhdfs.oauth2.client.id` | Client id used to obtain access token with either credential or refresh token |
| `dfs.webhdfs.oauth2.refresh.url` | URL against which to post for obtaining bearer token with either credential or refresh token |
| `dfs.webhdfs.oauth2.access.token` | (required if using ConfRefreshTokenBasedAccessTokenProvider) Initial access token with which to authenticate |
| `dfs.webhdfs.oauth2.refresh.token` | (required if using ConfRefreshTokenBasedAccessTokenProvider) Initial refresh token to use to obtain new access tokens |
| `dfs.webhdfs.oauth2.refresh.token.expires.ms.since.epoch` | (required if using ConfRefreshTokenBasedAccessTokenProvider) Access token expiration measured in milliseconds since Jan 1, 1970. *Note this is a different value than provided by OAuth providers and has been munged as described in interface to be suitable for a client application* |
| `dfs.webhdfs.oauth2.credential` | (required if using ConfCredentialBasedAccessTokenProvider). Credential used to obtain initial and subsequent access tokens. |
Proxy Users
-----------

View File

@ -0,0 +1,216 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hadoop.hdfs.web;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.web.oauth2.ConfCredentialBasedAccessTokenProvider;
import org.apache.hadoop.hdfs.web.oauth2.CredentialBasedAccessTokenProvider;
import org.apache.hadoop.hdfs.web.oauth2.OAuth2ConnectionConfigurator;
import org.apache.http.HttpStatus;
import org.codehaus.jackson.map.ObjectMapper;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockserver.client.server.MockServerClient;
import org.mockserver.integration.ClientAndServer;
import org.mockserver.model.Header;
import org.mockserver.model.HttpRequest;
import org.mockserver.model.HttpResponse;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Map;
import java.util.TreeMap;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.ACCESS_TOKEN_PROVIDER_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.OAUTH_CLIENT_ID_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.OAUTH_REFRESH_URL_KEY;
import static org.apache.hadoop.hdfs.web.oauth2.OAuth2Constants.ACCESS_TOKEN;
import static org.apache.hadoop.hdfs.web.oauth2.OAuth2Constants.EXPIRES_IN;
import static org.apache.hadoop.hdfs.web.oauth2.OAuth2Constants.TOKEN_TYPE;
import static org.junit.Assert.assertEquals;
import static org.mockserver.integration.ClientAndServer.startClientAndServer;
import static org.mockserver.matchers.Times.exactly;
import static org.mockserver.model.HttpRequest.request;
import static org.mockserver.model.HttpResponse.response;
public class TestWebHDFSOAuth2 {
public static final Log LOG = LogFactory.getLog(TestWebHDFSOAuth2.class);
private ClientAndServer mockWebHDFS;
private ClientAndServer mockOAuthServer;
public final static int WEBHDFS_PORT = 7552;
public final static int OAUTH_PORT = 7553;
public final static Header CONTENT_TYPE_APPLICATION_JSON = new Header("Content-Type", "application/json");
public final static String AUTH_TOKEN = "0123456789abcdef";
public final static Header AUTH_TOKEN_HEADER = new Header("AUTHORIZATION", OAuth2ConnectionConfigurator.HEADER + AUTH_TOKEN);
@Before
public void startMockOAuthServer() {
mockOAuthServer = startClientAndServer(OAUTH_PORT);
}
@Before
public void startMockWebHDFSServer() {
System.setProperty("hadoop.home.dir", System.getProperty("user.dir"));
mockWebHDFS = startClientAndServer(WEBHDFS_PORT);
}
@Test
public void listStatusReturnsAsExpected() throws URISyntaxException, IOException {
MockServerClient mockWebHDFSServerClient = new MockServerClient("localhost", WEBHDFS_PORT);
MockServerClient mockOAuthServerClient = new MockServerClient("localhost", OAUTH_PORT);
HttpRequest oauthServerRequest = getOAuthServerMockRequest(mockOAuthServerClient);
HttpRequest fileSystemRequest = request()
.withMethod("GET")
.withPath(WebHdfsFileSystem.PATH_PREFIX + "/test1/test2")
.withHeader(AUTH_TOKEN_HEADER);
try {
mockWebHDFSServerClient.when(fileSystemRequest,
exactly(1)
)
.respond(
response()
.withStatusCode(HttpStatus.SC_OK)
.withHeaders(
CONTENT_TYPE_APPLICATION_JSON
)
.withBody("{\n" +
" \"FileStatuses\":\n" +
" {\n" +
" \"FileStatus\":\n" +
" [\n" +
" {\n" +
" \"accessTime\" : 1320171722771,\n" +
" \"blockSize\" : 33554432,\n" +
" \"group\" : \"supergroup\",\n" +
" \"length\" : 24930,\n" +
" \"modificationTime\": 1320171722771,\n" +
" \"owner\" : \"webuser\",\n" +
" \"pathSuffix\" : \"a.patch\",\n" +
" \"permission\" : \"644\",\n" +
" \"replication\" : 1,\n" +
" \"type\" : \"FILE\"\n" +
" },\n" +
" {\n" +
" \"accessTime\" : 0,\n" +
" \"blockSize\" : 0,\n" +
" \"group\" : \"supergroup\",\n" +
" \"length\" : 0,\n" +
" \"modificationTime\": 1320895981256,\n" +
" \"owner\" : \"szetszwo\",\n" +
" \"pathSuffix\" : \"bar\",\n" +
" \"permission\" : \"711\",\n" +
" \"replication\" : 0,\n" +
" \"type\" : \"DIRECTORY\"\n" +
" }\n" +
" ]\n" +
" }\n" +
"}\n")
);
FileSystem fs = new WebHdfsFileSystem();
Configuration conf = getConfiguration();
conf.set(OAUTH_REFRESH_URL_KEY, "http://localhost:" + OAUTH_PORT + "/refresh");
conf.set(CredentialBasedAccessTokenProvider.OAUTH_CREDENTIAL_KEY, "credential");
URI uri = new URI("webhdfs://localhost:" + WEBHDFS_PORT);
fs.initialize(uri, conf);
FileStatus[] ls = fs.listStatus(new Path("/test1/test2"));
mockOAuthServer.verify(oauthServerRequest);
mockWebHDFSServerClient.verify(fileSystemRequest);
assertEquals(2, ls.length);
assertEquals("a.patch", ls[0].getPath().getName());
assertEquals("bar", ls[1].getPath().getName());
fs.close();
} finally {
mockWebHDFSServerClient.clear(fileSystemRequest);
mockOAuthServerClient.clear(oauthServerRequest);
}
}
private HttpRequest getOAuthServerMockRequest(MockServerClient mockServerClient) throws IOException {
HttpRequest expectedRequest = request()
.withMethod("POST")
.withPath("/refresh")
.withBody("client_secret=credential&grant_type=client_credentials&client_id=MY_CLIENTID");
Map<String, Object> map = new TreeMap<>();
map.put(EXPIRES_IN, "0987654321");
map.put(TOKEN_TYPE, "bearer");
map.put(ACCESS_TOKEN, AUTH_TOKEN);
ObjectMapper mapper = new ObjectMapper();
HttpResponse resp = response()
.withStatusCode(HttpStatus.SC_OK)
.withHeaders(
CONTENT_TYPE_APPLICATION_JSON
)
.withBody(mapper.writeValueAsString(map));
mockServerClient
.when(expectedRequest, exactly(1))
.respond(resp);
return expectedRequest;
}
public Configuration getConfiguration() {
Configuration conf = new Configuration();
// Configs for OAuth2
conf.setBoolean(HdfsClientConfigKeys.DFS_WEBHDFS_OAUTH_ENABLED_KEY, true);
conf.set(OAUTH_CLIENT_ID_KEY, "MY_CLIENTID");
conf.set(ACCESS_TOKEN_PROVIDER_KEY,
ConfCredentialBasedAccessTokenProvider.class.getName());
return conf;
}
@After
public void stopMockWebHDFSServer() {
mockWebHDFS.stop();
}
@After
public void stopMockOAuthServer() {
mockOAuthServer.stop();
}
}

View File

@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hadoop.hdfs.web.oauth2;
import org.apache.hadoop.util.Timer;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class TestAccessTokenTimer {
@Test
public void expireConversionWorks() {
Timer mockTimer = mock(Timer.class);
when(mockTimer.now())
.thenReturn(5l);
AccessTokenTimer timer = new AccessTokenTimer(mockTimer);
timer.setExpiresIn("3");
assertEquals(3005, timer.getNextRefreshMSSinceEpoch());
assertTrue(timer.shouldRefresh());
}
@Test
public void shouldRefreshIsCorrect() {
Timer mockTimer = mock(Timer.class);
when(mockTimer.now())
.thenReturn(500l)
.thenReturn(1000000l + 500l);
AccessTokenTimer timer = new AccessTokenTimer(mockTimer);
timer.setExpiresInMSSinceEpoch("1000000");
assertFalse(timer.shouldRefresh());
assertTrue(timer.shouldRefresh());
verify(mockTimer, times(2)).now();
}
}

View File

@ -0,0 +1,138 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hadoop.hdfs.web.oauth2;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.Timer;
import org.apache.http.HttpStatus;
import org.codehaus.jackson.map.ObjectMapper;
import org.junit.Test;
import org.mockserver.client.server.MockServerClient;
import org.mockserver.integration.ClientAndServer;
import org.mockserver.model.Header;
import org.mockserver.model.HttpRequest;
import org.mockserver.model.HttpResponse;
import org.mockserver.model.Parameter;
import org.mockserver.model.ParameterBody;
import java.io.IOException;
import java.util.Map;
import java.util.TreeMap;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.ACCESS_TOKEN_PROVIDER_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.OAUTH_CLIENT_ID_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.OAUTH_REFRESH_URL_KEY;
import static org.apache.hadoop.hdfs.web.oauth2.OAuth2Constants.ACCESS_TOKEN;
import static org.apache.hadoop.hdfs.web.oauth2.OAuth2Constants.CLIENT_CREDENTIALS;
import static org.apache.hadoop.hdfs.web.oauth2.OAuth2Constants.CLIENT_ID;
import static org.apache.hadoop.hdfs.web.oauth2.OAuth2Constants.CLIENT_SECRET;
import static org.apache.hadoop.hdfs.web.oauth2.OAuth2Constants.EXPIRES_IN;
import static org.apache.hadoop.hdfs.web.oauth2.OAuth2Constants.GRANT_TYPE;
import static org.apache.hadoop.hdfs.web.oauth2.OAuth2Constants.TOKEN_TYPE;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.mockserver.integration.ClientAndServer.startClientAndServer;
import static org.mockserver.matchers.Times.exactly;
import static org.mockserver.model.HttpRequest.request;
import static org.mockserver.model.HttpResponse.response;
public class TestClientCredentialTimeBasedTokenRefresher {
public final static Header CONTENT_TYPE_APPLICATION_JSON
= new Header("Content-Type", "application/json");
public final static String CLIENT_ID_FOR_TESTING = "joebob";
public Configuration buildConf(String credential, String tokenExpires,
String clientId, String refreshURL) {
// Configurations are simple enough that it's not worth mocking them out.
Configuration conf = new Configuration();
conf.set(CredentialBasedAccessTokenProvider.OAUTH_CREDENTIAL_KEY,
credential);
conf.set(ACCESS_TOKEN_PROVIDER_KEY,
ConfCredentialBasedAccessTokenProvider.class.getName());
conf.set(OAUTH_CLIENT_ID_KEY, clientId);
conf.set(OAUTH_REFRESH_URL_KEY, refreshURL);
return conf;
}
@Test
public void refreshUrlIsCorrect() throws IOException {
final int PORT = 7552;
final String REFRESH_ADDRESS = "http://localhost:" + PORT + "/refresh";
long tokenExpires = 0;
Configuration conf = buildConf("myreallycoolcredential",
Long.toString(tokenExpires),
CLIENT_ID_FOR_TESTING,
REFRESH_ADDRESS);
Timer mockTimer = mock(Timer.class);
when(mockTimer.now()).thenReturn(tokenExpires + 1000l);
AccessTokenProvider credProvider =
new ConfCredentialBasedAccessTokenProvider(mockTimer);
credProvider.setConf(conf);
// Build mock server to receive refresh request
ClientAndServer mockServer = startClientAndServer(PORT);
HttpRequest expectedRequest = request()
.withMethod("POST")
.withPath("/refresh")
.withBody(
// Note, OkHttp does not sort the param values, so we need to do
// it ourselves via the ordering provided to ParameterBody...
ParameterBody.params(
Parameter.param(CLIENT_SECRET, "myreallycoolcredential"),
Parameter.param(GRANT_TYPE, CLIENT_CREDENTIALS),
Parameter.param(CLIENT_ID, CLIENT_ID_FOR_TESTING)
));
MockServerClient mockServerClient = new MockServerClient("localhost", PORT);
// https://tools.ietf.org/html/rfc6749#section-5.1
Map<String, Object> map = new TreeMap<>();
map.put(EXPIRES_IN, "0987654321");
map.put(TOKEN_TYPE, "bearer");
map.put(ACCESS_TOKEN, "new access token");
ObjectMapper mapper = new ObjectMapper();
HttpResponse resp = response()
.withStatusCode(HttpStatus.SC_OK)
.withHeaders(
CONTENT_TYPE_APPLICATION_JSON
)
.withBody(mapper.writeValueAsString(map));
mockServerClient
.when(expectedRequest, exactly(1))
.respond(resp);
assertEquals("new access token", credProvider.getAccessToken());
mockServerClient.verify(expectedRequest);
mockServerClient.clear(expectedRequest);
mockServer.stop();
}
}

View File

@ -0,0 +1,138 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hadoop.hdfs.web.oauth2;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.Timer;
import org.apache.http.HttpStatus;
import org.codehaus.jackson.map.ObjectMapper;
import org.junit.Test;
import org.mockserver.client.server.MockServerClient;
import org.mockserver.integration.ClientAndServer;
import org.mockserver.model.Header;
import org.mockserver.model.HttpRequest;
import org.mockserver.model.HttpResponse;
import org.mockserver.model.Parameter;
import org.mockserver.model.ParameterBody;
import java.io.IOException;
import java.util.Map;
import java.util.TreeMap;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.OAUTH_CLIENT_ID_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.OAUTH_REFRESH_URL_KEY;
import static org.apache.hadoop.hdfs.web.oauth2.ConfRefreshTokenBasedAccessTokenProvider.OAUTH_REFRESH_TOKEN_EXPIRES_KEY;
import static org.apache.hadoop.hdfs.web.oauth2.ConfRefreshTokenBasedAccessTokenProvider.OAUTH_REFRESH_TOKEN_KEY;
import static org.apache.hadoop.hdfs.web.oauth2.OAuth2Constants.ACCESS_TOKEN;
import static org.apache.hadoop.hdfs.web.oauth2.OAuth2Constants.BEARER;
import static org.apache.hadoop.hdfs.web.oauth2.OAuth2Constants.CLIENT_ID;
import static org.apache.hadoop.hdfs.web.oauth2.OAuth2Constants.EXPIRES_IN;
import static org.apache.hadoop.hdfs.web.oauth2.OAuth2Constants.GRANT_TYPE;
import static org.apache.hadoop.hdfs.web.oauth2.OAuth2Constants.REFRESH_TOKEN;
import static org.apache.hadoop.hdfs.web.oauth2.OAuth2Constants.TOKEN_TYPE;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.mockserver.integration.ClientAndServer.startClientAndServer;
import static org.mockserver.matchers.Times.exactly;
import static org.mockserver.model.HttpRequest.request;
import static org.mockserver.model.HttpResponse.response;
public class TestRefreshTokenTimeBasedTokenRefresher {
public final static Header CONTENT_TYPE_APPLICATION_JSON
= new Header("Content-Type", "application/json");
public Configuration buildConf(String refreshToken, String tokenExpires,
String clientId, String refreshURL) {
// Configurations are simple enough that it's not worth mocking them out.
Configuration conf = new Configuration();
conf.set(OAUTH_REFRESH_TOKEN_KEY, refreshToken);
conf.set(OAUTH_REFRESH_TOKEN_EXPIRES_KEY, tokenExpires);
conf.set(OAUTH_CLIENT_ID_KEY, clientId);
conf.set(OAUTH_REFRESH_URL_KEY, refreshURL);
return conf;
}
@Test
public void refreshUrlIsCorrect() throws IOException {
final int PORT = 7552;
final String REFRESH_ADDRESS = "http://localhost:" + PORT + "/refresh";
long tokenExpires = 0;
Configuration conf = buildConf("refresh token key",
Long.toString(tokenExpires),
"joebob",
REFRESH_ADDRESS);
Timer mockTimer = mock(Timer.class);
when(mockTimer.now()).thenReturn(tokenExpires + 1000l);
AccessTokenProvider tokenProvider =
new ConfRefreshTokenBasedAccessTokenProvider(mockTimer);
tokenProvider.setConf(conf);
// Build mock server to receive refresh request
ClientAndServer mockServer = startClientAndServer(PORT);
HttpRequest expectedRequest = request()
.withMethod("POST")
.withPath("/refresh")
// Note, OkHttp does not sort the param values, so we need to
// do it ourselves via the ordering provided to ParameterBody...
.withBody(
ParameterBody.params(
Parameter.param(CLIENT_ID, "joebob"),
Parameter.param(GRANT_TYPE, REFRESH_TOKEN),
Parameter.param(REFRESH_TOKEN, "refresh token key")));
MockServerClient mockServerClient = new MockServerClient("localhost", PORT);
// https://tools.ietf.org/html/rfc6749#section-5.1
Map<String, Object> map = new TreeMap<>();
map.put(EXPIRES_IN, "0987654321");
map.put(TOKEN_TYPE, BEARER);
map.put(ACCESS_TOKEN, "new access token");
ObjectMapper mapper = new ObjectMapper();
HttpResponse resp = response()
.withStatusCode(HttpStatus.SC_OK)
.withHeaders(
CONTENT_TYPE_APPLICATION_JSON
)
.withBody(mapper.writeValueAsString(map));
mockServerClient
.when(expectedRequest, exactly(1))
.respond(resp);
assertEquals("new access token", tokenProvider.getAccessToken());
mockServerClient.verify(expectedRequest);
mockServerClient.clear(expectedRequest);
mockServer.stop();
}
}