HDFS-5506. Use URLConnectionFactory in DelegationTokenFetcher. Contributed by Haohui Mai.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1542011 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jing Zhao 2013-11-14 18:40:36 +00:00
parent 88c4d4a714
commit 43fa41fdee
6 changed files with 157 additions and 116 deletions

View File

@ -491,6 +491,9 @@ Release 2.3.0 - UNRELEASED
HDFS-4995. Make getContentSummary less expensive. (kihwal)
HDFS-5506. Use URLConnectionFactory in DelegationTokenFetcher. (Haohui Mai
via jing9)
OPTIMIZATIONS
HDFS-5239. Allow FSNamesystem lock fairness to be configurable (daryn)

View File

@ -26,8 +26,8 @@
import java.io.PrintStream;
import java.net.HttpURLConnection;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URL;
import java.net.URLConnection;
import java.security.PrivilegedExceptionAction;
import java.util.Collection;
import java.util.Date;
@ -47,11 +47,14 @@
import org.apache.hadoop.hdfs.server.namenode.GetDelegationTokenServlet;
import org.apache.hadoop.hdfs.server.namenode.RenewDelegationTokenServlet;
import org.apache.hadoop.hdfs.web.HftpFileSystem;
import org.apache.hadoop.hdfs.web.HsftpFileSystem;
import org.apache.hadoop.hdfs.web.URLConnectionFactory;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authentication.client.AuthenticationException;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.util.GenericOptionsParser;
@ -142,11 +145,11 @@ public static void main(final String[] args) throws Exception {
// default to using the local file system
FileSystem local = FileSystem.getLocal(conf);
final Path tokenFile = new Path(local.getWorkingDirectory(), remaining[0]);
final URLConnectionFactory connectionFactory = URLConnectionFactory.DEFAULT_CONNECTION_FACTORY;
// Login the current user
UserGroupInformation.getCurrentUser().doAs(
new PrivilegedExceptionAction<Object>() {
@SuppressWarnings("unchecked")
@Override
public Object run() throws Exception {
@ -182,7 +185,8 @@ public Object run() throws Exception {
} else {
// otherwise we are fetching
if (webUrl != null) {
Credentials creds = getDTfromRemote(webUrl, renewer);
Credentials creds = getDTfromRemote(connectionFactory, new URI(webUrl),
renewer);
creds.writeTokenStorageFile(tokenFile, conf);
for (Token<?> token : creds.getAllTokens()) {
if(LOG.isDebugEnabled()) {
@ -208,28 +212,27 @@ public Object run() throws Exception {
});
}
static public Credentials getDTfromRemote(String nnAddr,
String renewer) throws IOException {
static public Credentials getDTfromRemote(URLConnectionFactory factory,
URI nnUri, String renewer) throws IOException {
StringBuilder buf = new StringBuilder(nnUri.toString())
.append(GetDelegationTokenServlet.PATH_SPEC);
if (renewer != null) {
buf.append("?").append(GetDelegationTokenServlet.RENEWER).append("=")
.append(renewer);
}
HttpURLConnection conn = null;
DataInputStream dis = null;
InetSocketAddress serviceAddr = NetUtils.createSocketAddr(nnAddr);
InetSocketAddress serviceAddr = NetUtils.createSocketAddr(nnUri
.getAuthority());
try {
StringBuffer url = new StringBuffer();
if (renewer != null) {
url.append(nnAddr).append(GetDelegationTokenServlet.PATH_SPEC)
.append("?").append(GetDelegationTokenServlet.RENEWER).append("=")
.append(renewer);
} else {
url.append(nnAddr).append(GetDelegationTokenServlet.PATH_SPEC);
}
if(LOG.isDebugEnabled()) {
LOG.debug("Retrieving token from: " + url);
LOG.debug("Retrieving token from: " + buf);
}
URL remoteURL = new URL(url.toString());
URLConnection connection = SecurityUtil.openSecureHttpConnection(remoteURL);
InputStream in = connection.getInputStream();
conn = run(factory, new URL(buf.toString()));
InputStream in = conn.getInputStream();
Credentials ts = new Credentials();
dis = new DataInputStream(in);
ts.readFields(dis);
@ -241,9 +244,30 @@ static public Credentials getDTfromRemote(String nnAddr,
} catch (Exception e) {
throw new IOException("Unable to obtain remote token", e);
} finally {
if(dis != null) dis.close();
IOUtils.cleanup(LOG, dis);
if (conn != null) {
conn.disconnect();
}
}
}
/**
* Cancel a Delegation Token.
* @param nnAddr the NameNode's address
* @param tok the token to cancel
* @throws IOException
* @throws AuthenticationException
*/
static public void cancelDelegationToken(URLConnectionFactory factory,
URI nnAddr, Token<DelegationTokenIdentifier> tok) throws IOException,
AuthenticationException {
StringBuilder buf = new StringBuilder(nnAddr.toString())
.append(CancelDelegationTokenServlet.PATH_SPEC).append("?")
.append(CancelDelegationTokenServlet.TOKEN).append("=")
.append(tok.encodeToUrlString());
HttpURLConnection conn = run(factory, new URL(buf.toString()));
conn.disconnect();
}
/**
* Renew a Delegation Token.
@ -251,43 +275,39 @@ static public Credentials getDTfromRemote(String nnAddr,
* @param tok the token to renew
* @return the Date that the token will expire next.
* @throws IOException
* @throws AuthenticationException
*/
static public long renewDelegationToken(String nnAddr,
Token<DelegationTokenIdentifier> tok
) throws IOException {
StringBuilder buf = new StringBuilder();
buf.append(nnAddr);
buf.append(RenewDelegationTokenServlet.PATH_SPEC);
buf.append("?");
buf.append(RenewDelegationTokenServlet.TOKEN);
buf.append("=");
buf.append(tok.encodeToUrlString());
BufferedReader in = null;
HttpURLConnection connection = null;
static public long renewDelegationToken(URLConnectionFactory factory,
URI nnAddr, Token<DelegationTokenIdentifier> tok) throws IOException,
AuthenticationException {
StringBuilder buf = new StringBuilder(nnAddr.toString())
.append(RenewDelegationTokenServlet.PATH_SPEC).append("?")
.append(RenewDelegationTokenServlet.TOKEN).append("=")
.append(tok.encodeToUrlString());
HttpURLConnection connection = null;
BufferedReader in = null;
try {
URL url = new URL(buf.toString());
connection = (HttpURLConnection) SecurityUtil.openSecureHttpConnection(url);
if (connection.getResponseCode() != HttpURLConnection.HTTP_OK) {
throw new IOException("Error renewing token: " +
connection.getResponseMessage());
}
in = new BufferedReader(
new InputStreamReader(connection.getInputStream(), Charsets.UTF_8));
connection = run(factory, new URL(buf.toString()));
in = new BufferedReader(new InputStreamReader(
connection.getInputStream(), Charsets.UTF_8));
long result = Long.parseLong(in.readLine());
in.close();
return result;
} catch (IOException ie) {
LOG.info("error in renew over HTTP", ie);
IOException e = getExceptionFromResponse(connection);
IOUtils.cleanup(LOG, in);
if (e != null) {
LOG.info("rethrowing exception from HTTP request: " +
e.getLocalizedMessage());
LOG.info("rethrowing exception from HTTP request: "
+ e.getLocalizedMessage());
throw e;
}
throw ie;
} finally {
IOUtils.cleanup(LOG, in);
if (connection != null) {
connection.disconnect();
}
}
}
@ -339,43 +359,28 @@ static private IOException getExceptionFromResponse(HttpURLConnection con) {
return e;
}
private static HttpURLConnection run(URLConnectionFactory factory, URL url)
throws IOException, AuthenticationException {
HttpURLConnection conn = null;
/**
* Cancel a Delegation Token.
* @param nnAddr the NameNode's address
* @param tok the token to cancel
* @throws IOException
*/
static public void cancelDelegationToken(String nnAddr,
Token<DelegationTokenIdentifier> tok
) throws IOException {
StringBuilder buf = new StringBuilder();
buf.append(nnAddr);
buf.append(CancelDelegationTokenServlet.PATH_SPEC);
buf.append("?");
buf.append(CancelDelegationTokenServlet.TOKEN);
buf.append("=");
buf.append(tok.encodeToUrlString());
BufferedReader in = null;
HttpURLConnection connection=null;
try {
URL url = new URL(buf.toString());
connection = (HttpURLConnection) SecurityUtil.openSecureHttpConnection(url);
if (connection.getResponseCode() != HttpURLConnection.HTTP_OK) {
throw new IOException("Error cancelling token: " +
connection.getResponseMessage());
conn = (HttpURLConnection) factory.openConnection(url, true);
if (conn.getResponseCode() != HttpURLConnection.HTTP_OK) {
String msg = conn.getResponseMessage();
throw new IOException("Error when dealing remote token: " + msg);
}
} catch (IOException ie) {
LOG.info("error in cancel over HTTP", ie);
IOException e = getExceptionFromResponse(connection);
LOG.info("Error when dealing remote token:", ie);
IOException e = getExceptionFromResponse(conn);
IOUtils.cleanup(LOG, in);
if (e != null) {
LOG.info("rethrowing exception from HTTP request: " +
e.getLocalizedMessage());
LOG.info("rethrowing exception from HTTP request: "
+ e.getLocalizedMessage());
throw e;
}
throw ie;
}
return conn;
}
}

View File

@ -57,6 +57,7 @@
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authentication.client.AuthenticationException;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.Progressable;
@ -231,7 +232,7 @@ public Token<?> run() throws IOException {
final String nnHttpUrl = nnUri.toString();
Credentials c;
try {
c = DelegationTokenFetcher.getDTfromRemote(nnHttpUrl, renewer);
c = DelegationTokenFetcher.getDTfromRemote(connectionFactory, nnUri, renewer);
} catch (IOException e) {
if (e.getCause() instanceof ConnectException) {
LOG.warn("Couldn't connect to " + nnHttpUrl +
@ -666,10 +667,13 @@ public long renewDelegationToken(Token<?> token) throws IOException {
// update the kerberos credentials, if they are coming from a keytab
UserGroupInformation.getLoginUser().checkTGTAndReloginFromKeytab();
InetSocketAddress serviceAddr = SecurityUtil.getTokenServiceAddr(token);
return
DelegationTokenFetcher.renewDelegationToken
(DFSUtil.createUri(getUnderlyingProtocol(), serviceAddr).toString(),
try {
return DelegationTokenFetcher.renewDelegationToken(connectionFactory,
DFSUtil.createUri(getUnderlyingProtocol(), serviceAddr),
(Token<DelegationTokenIdentifier>) token);
} catch (AuthenticationException e) {
throw new IOException(e);
}
}
@SuppressWarnings("unchecked")
@ -678,8 +682,12 @@ public void cancelDelegationToken(Token<?> token) throws IOException {
// update the kerberos credentials, if they are coming from a keytab
UserGroupInformation.getLoginUser().checkTGTAndReloginFromKeytab();
InetSocketAddress serviceAddr = SecurityUtil.getTokenServiceAddr(token);
DelegationTokenFetcher.cancelDelegationToken
(DFSUtil.createUri(getUnderlyingProtocol(), serviceAddr).toString(),
try {
DelegationTokenFetcher.cancelDelegationToken(connectionFactory, DFSUtil
.createUri(getUnderlyingProtocol(), serviceAddr),
(Token<DelegationTokenIdentifier>) token);
} catch (AuthenticationException e) {
throw new IOException(e);
}
}
}

View File

@ -27,7 +27,7 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.web.resources.HttpOpParam;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authentication.client.AuthenticatedURL;
import org.apache.hadoop.security.authentication.client.AuthenticationException;
@ -77,24 +77,28 @@ public URLConnectionFactory(int socketTimeout) {
* @throws IOException
*/
public URLConnection openConnection(URL url) throws IOException {
URLConnection connection = url.openConnection();
if (connection instanceof HttpURLConnection) {
connConfigurator.configure((HttpURLConnection) connection);
try {
return openConnection(url, false);
} catch (AuthenticationException e) {
// Unreachable
return null;
}
return connection;
}
/**
* Opens a url with read and connect timeouts
*
* @param url URL to open
* @param url
* URL to open
* @param isSpnego
* whether the url should be authenticated via SPNEGO
* @return URLConnection
* @throws IOException
* @throws AuthenticationException
*/
public URLConnection openConnection(HttpOpParam.Op op, URL url)
public URLConnection openConnection(URL url, boolean isSpnego)
throws IOException, AuthenticationException {
if (op.getRequireAuth()) {
if (isSpnego) {
if (LOG.isDebugEnabled()) {
LOG.debug("open AuthenticatedURL connection" + url);
}
@ -106,7 +110,11 @@ public URLConnection openConnection(HttpOpParam.Op op, URL url)
if (LOG.isDebugEnabled()) {
LOG.debug("open URL connection");
}
return openConnection(url);
URLConnection connection = url.openConnection();
if (connection instanceof HttpURLConnection) {
connConfigurator.configure((HttpURLConnection) connection);
}
return connection;
}
}

View File

@ -491,7 +491,8 @@ private HttpURLConnection openHttpUrlConnection(final URL url)
throws IOException {
final HttpURLConnection conn;
try {
conn = (HttpURLConnection) connectionFactory.openConnection(op, url);
conn = (HttpURLConnection) connectionFactory.openConnection(url,
op.getRequireAuth());
} catch (AuthenticationException e) {
throw new IOException(e);
}

View File

@ -26,6 +26,8 @@
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.Map;
@ -37,10 +39,12 @@
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.tools.DelegationTokenFetcher;
import org.apache.hadoop.hdfs.web.HftpFileSystem;
import org.apache.hadoop.hdfs.web.URLConnectionFactory;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.authentication.client.AuthenticationException;
import org.apache.hadoop.security.token.Token;
import org.apache.log4j.Logger;
import org.jboss.netty.bootstrap.ServerBootstrap;
@ -59,6 +63,7 @@
import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
import org.jboss.netty.handler.codec.http.HttpChunkAggregator;
import org.jboss.netty.handler.codec.http.HttpHeaders;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpRequest;
import org.jboss.netty.handler.codec.http.HttpRequestDecoder;
import org.jboss.netty.handler.codec.http.HttpResponse;
@ -78,9 +83,10 @@ public class TestDelegationTokenRemoteFetcher {
private static final String EXP_DATE = "124123512361236";
private static final String tokenFile = "http.file.dta";
private static final URLConnectionFactory connectionFactory = URLConnectionFactory.DEFAULT_CONNECTION_FACTORY;
private int httpPort;
private String serviceUrl;
private URI serviceUrl;
private FileSystem fileSys;
private Configuration conf;
private ServerBootstrap bootstrap;
@ -92,7 +98,7 @@ public void init() throws Exception {
conf = new Configuration();
fileSys = FileSystem.getLocal(conf);
httpPort = NetUtils.getFreeSocketPort();
serviceUrl = "http://localhost:" + httpPort;
serviceUrl = new URI("http://localhost:" + httpPort);
testToken = createToken(serviceUrl);
}
@ -121,9 +127,9 @@ public void testTokenFetchFail() throws Exception {
* try to fetch token without http server with IOException
*/
@Test
public void testTokenRenewFail() {
public void testTokenRenewFail() throws AuthenticationException {
try {
DelegationTokenFetcher.renewDelegationToken(serviceUrl, testToken);
DelegationTokenFetcher.renewDelegationToken(connectionFactory, serviceUrl, testToken);
fail("Token fetcher shouldn't be able to renew tokens in absense of NN");
} catch (IOException ex) {
}
@ -133,9 +139,9 @@ public void testTokenRenewFail() {
* try cancel token without http server with IOException
*/
@Test
public void expectedTokenCancelFail() {
public void expectedTokenCancelFail() throws AuthenticationException {
try {
DelegationTokenFetcher.cancelDelegationToken(serviceUrl, testToken);
DelegationTokenFetcher.cancelDelegationToken(connectionFactory, serviceUrl, testToken);
fail("Token fetcher shouldn't be able to cancel tokens in absense of NN");
} catch (IOException ex) {
}
@ -145,11 +151,12 @@ public void expectedTokenCancelFail() {
* try fetch token and get http response with error
*/
@Test
public void expectedTokenRenewErrorHttpResponse() {
public void expectedTokenRenewErrorHttpResponse()
throws AuthenticationException, URISyntaxException {
bootstrap = startHttpServer(httpPort, testToken, serviceUrl);
try {
DelegationTokenFetcher.renewDelegationToken(serviceUrl + "/exception",
createToken(serviceUrl));
DelegationTokenFetcher.renewDelegationToken(connectionFactory, new URI(
serviceUrl.toString() + "/exception"), createToken(serviceUrl));
fail("Token fetcher shouldn't be able to renew tokens using an invalid"
+ " NN URL");
} catch (IOException ex) {
@ -159,13 +166,14 @@ public void expectedTokenRenewErrorHttpResponse() {
}
/**
*
*
*/
@Test
public void testCancelTokenFromHttp() throws IOException {
public void testCancelTokenFromHttp() throws IOException,
AuthenticationException {
bootstrap = startHttpServer(httpPort, testToken, serviceUrl);
DelegationTokenFetcher.cancelDelegationToken(serviceUrl, testToken);
DelegationTokenFetcher.cancelDelegationToken(connectionFactory, serviceUrl,
testToken);
if (assertionError != null)
throw assertionError;
}
@ -174,11 +182,12 @@ public void testCancelTokenFromHttp() throws IOException {
* Call renew token using http server return new expiration time
*/
@Test
public void testRenewTokenFromHttp() throws IOException {
public void testRenewTokenFromHttp() throws IOException,
NumberFormatException, AuthenticationException {
bootstrap = startHttpServer(httpPort, testToken, serviceUrl);
assertTrue("testRenewTokenFromHttp error",
Long.valueOf(EXP_DATE) == DelegationTokenFetcher.renewDelegationToken(
serviceUrl, testToken));
connectionFactory, serviceUrl, testToken));
if (assertionError != null)
throw assertionError;
}
@ -204,11 +213,11 @@ public void expectedTokenIsRetrievedFromHttp() throws Exception {
throw assertionError;
}
private static Token<DelegationTokenIdentifier> createToken(String serviceUri) {
private static Token<DelegationTokenIdentifier> createToken(URI serviceUri) {
byte[] pw = "hadoop".getBytes();
byte[] ident = new DelegationTokenIdentifier(new Text("owner"), new Text(
"renewer"), new Text("realuser")).getBytes();
Text service = new Text(serviceUri);
Text service = new Text(serviceUri.toString());
return new Token<DelegationTokenIdentifier>(ident, pw,
HftpFileSystem.TOKEN_KIND, service);
}
@ -301,8 +310,15 @@ public CredentialsLogicHandler(Token<DelegationTokenIdentifier> token,
public void messageReceived(ChannelHandlerContext ctx, final MessageEvent e)
throws Exception {
HttpRequest request = (HttpRequest) e.getMessage();
if (request.getMethod() != GET) {
return;
if (request.getMethod() == HttpMethod.OPTIONS) {
// Mimic SPNEGO authentication
HttpResponse response = new DefaultHttpResponse(HTTP_1_1,
HttpResponseStatus.OK);
response.addHeader("Set-Cookie", "hadoop-auth=1234");
e.getChannel().write(response).addListener(ChannelFutureListener.CLOSE);
} else if (request.getMethod() != GET) {
e.getChannel().close();
}
UnmodifiableIterator<Map.Entry<String, Handler>> iter = routes.entrySet()
.iterator();
@ -338,7 +354,7 @@ public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
}
private ServerBootstrap startHttpServer(int port,
final Token<DelegationTokenIdentifier> token, final String url) {
final Token<DelegationTokenIdentifier> token, final URI url) {
ServerBootstrap bootstrap = new ServerBootstrap(
new NioServerSocketChannelFactory(Executors.newCachedThreadPool(),
Executors.newCachedThreadPool()));
@ -348,7 +364,7 @@ private ServerBootstrap startHttpServer(int port,
public ChannelPipeline getPipeline() throws Exception {
return Channels.pipeline(new HttpRequestDecoder(),
new HttpChunkAggregator(65536), new HttpResponseEncoder(),
new CredentialsLogicHandler(token, url));
new CredentialsLogicHandler(token, url.toString()));
}
});
bootstrap.bind(new InetSocketAddress("localhost", port));