HDFS-5506. Merge change r1542011 from trunk.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1542015 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jing Zhao 2013-11-14 18:52:55 +00:00
parent 6ce1f14fed
commit 809261808e
6 changed files with 158 additions and 112 deletions

View File

@ -116,6 +116,9 @@ Release 2.3.0 - UNRELEASED
HDFS-4995. Make getContentSummary less expensive. (kihwal) HDFS-4995. Make getContentSummary less expensive. (kihwal)
HDFS-5506. Use URLConnectionFactory in DelegationTokenFetcher. (Haohui Mai
via jing9)
OPTIMIZATIONS OPTIMIZATIONS
HDFS-5239. Allow FSNamesystem lock fairness to be configurable (daryn) HDFS-5239. Allow FSNamesystem lock fairness to be configurable (daryn)

View File

@ -26,8 +26,8 @@ import java.io.InputStreamReader;
import java.io.PrintStream; import java.io.PrintStream;
import java.net.HttpURLConnection; import java.net.HttpURLConnection;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URL; import java.net.URL;
import java.net.URLConnection;
import java.security.PrivilegedExceptionAction; import java.security.PrivilegedExceptionAction;
import java.util.Collection; import java.util.Collection;
import java.util.Date; import java.util.Date;
@ -47,11 +47,14 @@ import org.apache.hadoop.hdfs.server.namenode.CancelDelegationTokenServlet;
import org.apache.hadoop.hdfs.server.namenode.GetDelegationTokenServlet; import org.apache.hadoop.hdfs.server.namenode.GetDelegationTokenServlet;
import org.apache.hadoop.hdfs.server.namenode.RenewDelegationTokenServlet; import org.apache.hadoop.hdfs.server.namenode.RenewDelegationTokenServlet;
import org.apache.hadoop.hdfs.web.HftpFileSystem; import org.apache.hadoop.hdfs.web.HftpFileSystem;
import org.apache.hadoop.hdfs.web.HsftpFileSystem;
import org.apache.hadoop.hdfs.web.URLConnectionFactory;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authentication.client.AuthenticationException;
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.ExitUtil; import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.util.GenericOptionsParser; import org.apache.hadoop.util.GenericOptionsParser;
@ -142,11 +145,11 @@ public class DelegationTokenFetcher {
// default to using the local file system // default to using the local file system
FileSystem local = FileSystem.getLocal(conf); FileSystem local = FileSystem.getLocal(conf);
final Path tokenFile = new Path(local.getWorkingDirectory(), remaining[0]); final Path tokenFile = new Path(local.getWorkingDirectory(), remaining[0]);
final URLConnectionFactory connectionFactory = URLConnectionFactory.DEFAULT_CONNECTION_FACTORY;
// Login the current user // Login the current user
UserGroupInformation.getCurrentUser().doAs( UserGroupInformation.getCurrentUser().doAs(
new PrivilegedExceptionAction<Object>() { new PrivilegedExceptionAction<Object>() {
@SuppressWarnings("unchecked")
@Override @Override
public Object run() throws Exception { public Object run() throws Exception {
if (print) { if (print) {
@ -183,7 +186,8 @@ public class DelegationTokenFetcher {
} else { } else {
// otherwise we are fetching // otherwise we are fetching
if (webUrl != null) { if (webUrl != null) {
Credentials creds = getDTfromRemote(webUrl, renewer); Credentials creds = getDTfromRemote(connectionFactory, new URI(webUrl),
renewer);
creds.writeTokenStorageFile(tokenFile, conf); creds.writeTokenStorageFile(tokenFile, conf);
for (Token<?> token : creds.getAllTokens()) { for (Token<?> token : creds.getAllTokens()) {
System.out.println("Fetched token via " + webUrl + " for " System.out.println("Fetched token via " + webUrl + " for "
@ -205,27 +209,31 @@ public class DelegationTokenFetcher {
}); });
} }
static public Credentials getDTfromRemote(String nnAddr, static public Credentials getDTfromRemote(URLConnectionFactory factory,
String renewer) throws IOException { URI nnUri, String renewer) throws IOException {
StringBuilder buf = new StringBuilder(nnUri.toString())
.append(GetDelegationTokenServlet.PATH_SPEC);
if (renewer != null) {
buf.append("?").append(GetDelegationTokenServlet.RENEWER).append("=")
.append(renewer);
}
HttpURLConnection conn = null;
DataInputStream dis = null; DataInputStream dis = null;
InetSocketAddress serviceAddr = NetUtils.createSocketAddr(nnAddr); InetSocketAddress serviceAddr = NetUtils.createSocketAddr(nnUri
.getAuthority());
try { try {
StringBuffer url = new StringBuffer(); if(LOG.isDebugEnabled()) {
if (renewer != null) { LOG.debug("Retrieving token from: " + buf);
url.append(nnAddr).append(GetDelegationTokenServlet.PATH_SPEC)
.append("?").append(GetDelegationTokenServlet.RENEWER).append("=")
.append(renewer);
} else {
url.append(nnAddr).append(GetDelegationTokenServlet.PATH_SPEC);
} }
URL remoteURL = new URL(url.toString());
URLConnection connection = SecurityUtil.openSecureHttpConnection(remoteURL); conn = run(factory, new URL(buf.toString()));
InputStream in = connection.getInputStream(); InputStream in = conn.getInputStream();
Credentials ts = new Credentials(); Credentials ts = new Credentials();
dis = new DataInputStream(in); dis = new DataInputStream(in);
ts.readFields(dis); ts.readFields(dis);
for(Token<?> token: ts.getAllTokens()) { for (Token<?> token : ts.getAllTokens()) {
token.setKind(HftpFileSystem.TOKEN_KIND); token.setKind(HftpFileSystem.TOKEN_KIND);
SecurityUtil.setTokenService(token, serviceAddr); SecurityUtil.setTokenService(token, serviceAddr);
} }
@ -233,53 +241,70 @@ public class DelegationTokenFetcher {
} catch (Exception e) { } catch (Exception e) {
throw new IOException("Unable to obtain remote token", e); throw new IOException("Unable to obtain remote token", e);
} finally { } finally {
if(dis != null) dis.close(); IOUtils.cleanup(LOG, dis);
if (conn != null) {
conn.disconnect();
}
} }
} }
/**
* Cancel a Delegation Token.
* @param nnAddr the NameNode's address
* @param tok the token to cancel
* @throws IOException
* @throws AuthenticationException
*/
static public void cancelDelegationToken(URLConnectionFactory factory,
URI nnAddr, Token<DelegationTokenIdentifier> tok) throws IOException,
AuthenticationException {
StringBuilder buf = new StringBuilder(nnAddr.toString())
.append(CancelDelegationTokenServlet.PATH_SPEC).append("?")
.append(CancelDelegationTokenServlet.TOKEN).append("=")
.append(tok.encodeToUrlString());
HttpURLConnection conn = run(factory, new URL(buf.toString()));
conn.disconnect();
}
/** /**
* Renew a Delegation Token. * Renew a Delegation Token.
* @param nnAddr the NameNode's address * @param nnAddr the NameNode's address
* @param tok the token to renew * @param tok the token to renew
* @return the Date that the token will expire next. * @return the Date that the token will expire next.
* @throws IOException * @throws IOException
* @throws AuthenticationException
*/ */
static public long renewDelegationToken(String nnAddr, static public long renewDelegationToken(URLConnectionFactory factory,
Token<DelegationTokenIdentifier> tok URI nnAddr, Token<DelegationTokenIdentifier> tok) throws IOException,
) throws IOException { AuthenticationException {
StringBuilder buf = new StringBuilder(); StringBuilder buf = new StringBuilder(nnAddr.toString())
buf.append(nnAddr); .append(RenewDelegationTokenServlet.PATH_SPEC).append("?")
buf.append(RenewDelegationTokenServlet.PATH_SPEC); .append(RenewDelegationTokenServlet.TOKEN).append("=")
buf.append("?"); .append(tok.encodeToUrlString());
buf.append(RenewDelegationTokenServlet.TOKEN);
buf.append("=");
buf.append(tok.encodeToUrlString());
BufferedReader in = null;
HttpURLConnection connection = null;
HttpURLConnection connection = null;
BufferedReader in = null;
try { try {
URL url = new URL(buf.toString()); connection = run(factory, new URL(buf.toString()));
connection = (HttpURLConnection) SecurityUtil.openSecureHttpConnection(url); in = new BufferedReader(new InputStreamReader(
if (connection.getResponseCode() != HttpURLConnection.HTTP_OK) { connection.getInputStream(), Charsets.UTF_8));
throw new IOException("Error renewing token: " +
connection.getResponseMessage());
}
in = new BufferedReader(
new InputStreamReader(connection.getInputStream(), Charsets.UTF_8));
long result = Long.parseLong(in.readLine()); long result = Long.parseLong(in.readLine());
in.close();
return result; return result;
} catch (IOException ie) { } catch (IOException ie) {
LOG.info("error in renew over HTTP", ie); LOG.info("error in renew over HTTP", ie);
IOException e = getExceptionFromResponse(connection); IOException e = getExceptionFromResponse(connection);
IOUtils.cleanup(LOG, in); if (e != null) {
if(e!=null) { LOG.info("rethrowing exception from HTTP request: "
LOG.info("rethrowing exception from HTTP request: " + + e.getLocalizedMessage());
e.getLocalizedMessage());
throw e; throw e;
} }
throw ie; throw ie;
} finally {
IOUtils.cleanup(LOG, in);
if (connection != null) {
connection.disconnect();
}
} }
} }
@ -331,43 +356,28 @@ public class DelegationTokenFetcher {
return e; return e;
} }
private static HttpURLConnection run(URLConnectionFactory factory, URL url)
throws IOException, AuthenticationException {
HttpURLConnection conn = null;
/**
* Cancel a Delegation Token.
* @param nnAddr the NameNode's address
* @param tok the token to cancel
* @throws IOException
*/
static public void cancelDelegationToken(String nnAddr,
Token<DelegationTokenIdentifier> tok
) throws IOException {
StringBuilder buf = new StringBuilder();
buf.append(nnAddr);
buf.append(CancelDelegationTokenServlet.PATH_SPEC);
buf.append("?");
buf.append(CancelDelegationTokenServlet.TOKEN);
buf.append("=");
buf.append(tok.encodeToUrlString());
BufferedReader in = null;
HttpURLConnection connection=null;
try { try {
URL url = new URL(buf.toString()); conn = (HttpURLConnection) factory.openConnection(url, true);
connection = (HttpURLConnection) SecurityUtil.openSecureHttpConnection(url); if (conn.getResponseCode() != HttpURLConnection.HTTP_OK) {
if (connection.getResponseCode() != HttpURLConnection.HTTP_OK) { String msg = conn.getResponseMessage();
throw new IOException("Error cancelling token: " +
connection.getResponseMessage()); throw new IOException("Error when dealing remote token: " + msg);
} }
} catch (IOException ie) { } catch (IOException ie) {
LOG.info("error in cancel over HTTP", ie); LOG.info("Error when dealing remote token:", ie);
IOException e = getExceptionFromResponse(connection); IOException e = getExceptionFromResponse(conn);
IOUtils.cleanup(LOG, in); if (e != null) {
if(e!=null) { LOG.info("rethrowing exception from HTTP request: "
LOG.info("rethrowing exception from HTTP request: " + + e.getLocalizedMessage());
e.getLocalizedMessage());
throw e; throw e;
} }
throw ie; throw ie;
} }
return conn;
} }
} }

View File

@ -57,6 +57,7 @@ import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authentication.client.AuthenticationException;
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.Progressable; import org.apache.hadoop.util.Progressable;
@ -231,7 +232,7 @@ public class HftpFileSystem extends FileSystem
final String nnHttpUrl = nnUri.toString(); final String nnHttpUrl = nnUri.toString();
Credentials c; Credentials c;
try { try {
c = DelegationTokenFetcher.getDTfromRemote(nnHttpUrl, renewer); c = DelegationTokenFetcher.getDTfromRemote(connectionFactory, nnUri, renewer);
} catch (IOException e) { } catch (IOException e) {
if (e.getCause() instanceof ConnectException) { if (e.getCause() instanceof ConnectException) {
LOG.warn("Couldn't connect to " + nnHttpUrl + LOG.warn("Couldn't connect to " + nnHttpUrl +
@ -666,10 +667,13 @@ public class HftpFileSystem extends FileSystem
// update the kerberos credentials, if they are coming from a keytab // update the kerberos credentials, if they are coming from a keytab
UserGroupInformation.getLoginUser().checkTGTAndReloginFromKeytab(); UserGroupInformation.getLoginUser().checkTGTAndReloginFromKeytab();
InetSocketAddress serviceAddr = SecurityUtil.getTokenServiceAddr(token); InetSocketAddress serviceAddr = SecurityUtil.getTokenServiceAddr(token);
return try {
DelegationTokenFetcher.renewDelegationToken return DelegationTokenFetcher.renewDelegationToken(connectionFactory,
(DFSUtil.createUri(getUnderlyingProtocol(), serviceAddr).toString(), DFSUtil.createUri(getUnderlyingProtocol(), serviceAddr),
(Token<DelegationTokenIdentifier>) token); (Token<DelegationTokenIdentifier>) token);
} catch (AuthenticationException e) {
throw new IOException(e);
}
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@ -678,8 +682,12 @@ public class HftpFileSystem extends FileSystem
// update the kerberos credentials, if they are coming from a keytab // update the kerberos credentials, if they are coming from a keytab
UserGroupInformation.getLoginUser().checkTGTAndReloginFromKeytab(); UserGroupInformation.getLoginUser().checkTGTAndReloginFromKeytab();
InetSocketAddress serviceAddr = SecurityUtil.getTokenServiceAddr(token); InetSocketAddress serviceAddr = SecurityUtil.getTokenServiceAddr(token);
DelegationTokenFetcher.cancelDelegationToken try {
(DFSUtil.createUri(getUnderlyingProtocol(), serviceAddr).toString(), DelegationTokenFetcher.cancelDelegationToken(connectionFactory, DFSUtil
(Token<DelegationTokenIdentifier>) token); .createUri(getUnderlyingProtocol(), serviceAddr),
(Token<DelegationTokenIdentifier>) token);
} catch (AuthenticationException e) {
throw new IOException(e);
}
} }
} }

View File

@ -27,7 +27,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.web.resources.HttpOpParam; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authentication.client.AuthenticatedURL; import org.apache.hadoop.security.authentication.client.AuthenticatedURL;
import org.apache.hadoop.security.authentication.client.AuthenticationException; import org.apache.hadoop.security.authentication.client.AuthenticationException;
@ -77,24 +77,28 @@ public class URLConnectionFactory {
* @throws IOException * @throws IOException
*/ */
public URLConnection openConnection(URL url) throws IOException { public URLConnection openConnection(URL url) throws IOException {
URLConnection connection = url.openConnection(); try {
if (connection instanceof HttpURLConnection) { return openConnection(url, false);
connConfigurator.configure((HttpURLConnection) connection); } catch (AuthenticationException e) {
// Unreachable
return null;
} }
return connection;
} }
/** /**
* Opens a url with read and connect timeouts * Opens a url with read and connect timeouts
* *
* @param url URL to open * @param url
* URL to open
* @param isSpnego
* whether the url should be authenticated via SPNEGO
* @return URLConnection * @return URLConnection
* @throws IOException * @throws IOException
* @throws AuthenticationException * @throws AuthenticationException
*/ */
public URLConnection openConnection(HttpOpParam.Op op, URL url) public URLConnection openConnection(URL url, boolean isSpnego)
throws IOException, AuthenticationException { throws IOException, AuthenticationException {
if (op.getRequireAuth()) { if (isSpnego) {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("open AuthenticatedURL connection" + url); LOG.debug("open AuthenticatedURL connection" + url);
} }
@ -106,7 +110,11 @@ public class URLConnectionFactory {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("open URL connection"); LOG.debug("open URL connection");
} }
return openConnection(url); URLConnection connection = url.openConnection();
if (connection instanceof HttpURLConnection) {
connConfigurator.configure((HttpURLConnection) connection);
}
return connection;
} }
} }

View File

@ -492,7 +492,8 @@ public class WebHdfsFileSystem extends FileSystem
throws IOException { throws IOException {
final HttpURLConnection conn; final HttpURLConnection conn;
try { try {
conn = (HttpURLConnection) connectionFactory.openConnection(op, url); conn = (HttpURLConnection) connectionFactory.openConnection(url,
op.getRequireAuth());
} catch (AuthenticationException e) { } catch (AuthenticationException e) {
throw new IOException(e); throw new IOException(e);
} }

View File

@ -26,6 +26,8 @@ import static org.junit.Assert.fail;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.Charset; import java.nio.charset.Charset;
import java.util.Iterator; import java.util.Iterator;
import java.util.Map; import java.util.Map;
@ -37,10 +39,12 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.tools.DelegationTokenFetcher; import org.apache.hadoop.hdfs.tools.DelegationTokenFetcher;
import org.apache.hadoop.hdfs.web.HftpFileSystem; import org.apache.hadoop.hdfs.web.HftpFileSystem;
import org.apache.hadoop.hdfs.web.URLConnectionFactory;
import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Text;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.authentication.client.AuthenticationException;
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import org.jboss.netty.bootstrap.ServerBootstrap; import org.jboss.netty.bootstrap.ServerBootstrap;
@ -59,6 +63,7 @@ import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.jboss.netty.handler.codec.http.DefaultHttpResponse; import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
import org.jboss.netty.handler.codec.http.HttpChunkAggregator; import org.jboss.netty.handler.codec.http.HttpChunkAggregator;
import org.jboss.netty.handler.codec.http.HttpHeaders; import org.jboss.netty.handler.codec.http.HttpHeaders;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpRequest; import org.jboss.netty.handler.codec.http.HttpRequest;
import org.jboss.netty.handler.codec.http.HttpRequestDecoder; import org.jboss.netty.handler.codec.http.HttpRequestDecoder;
import org.jboss.netty.handler.codec.http.HttpResponse; import org.jboss.netty.handler.codec.http.HttpResponse;
@ -78,9 +83,10 @@ public class TestDelegationTokenRemoteFetcher {
private static final String EXP_DATE = "124123512361236"; private static final String EXP_DATE = "124123512361236";
private static final String tokenFile = "http.file.dta"; private static final String tokenFile = "http.file.dta";
private static final URLConnectionFactory connectionFactory = URLConnectionFactory.DEFAULT_CONNECTION_FACTORY;
private int httpPort; private int httpPort;
private String serviceUrl; private URI serviceUrl;
private FileSystem fileSys; private FileSystem fileSys;
private Configuration conf; private Configuration conf;
private ServerBootstrap bootstrap; private ServerBootstrap bootstrap;
@ -92,7 +98,7 @@ public class TestDelegationTokenRemoteFetcher {
conf = new Configuration(); conf = new Configuration();
fileSys = FileSystem.getLocal(conf); fileSys = FileSystem.getLocal(conf);
httpPort = NetUtils.getFreeSocketPort(); httpPort = NetUtils.getFreeSocketPort();
serviceUrl = "http://localhost:" + httpPort; serviceUrl = new URI("http://localhost:" + httpPort);
testToken = createToken(serviceUrl); testToken = createToken(serviceUrl);
} }
@ -121,9 +127,9 @@ public class TestDelegationTokenRemoteFetcher {
* try to fetch token without http server with IOException * try to fetch token without http server with IOException
*/ */
@Test @Test
public void testTokenRenewFail() { public void testTokenRenewFail() throws AuthenticationException {
try { try {
DelegationTokenFetcher.renewDelegationToken(serviceUrl, testToken); DelegationTokenFetcher.renewDelegationToken(connectionFactory, serviceUrl, testToken);
fail("Token fetcher shouldn't be able to renew tokens in absense of NN"); fail("Token fetcher shouldn't be able to renew tokens in absense of NN");
} catch (IOException ex) { } catch (IOException ex) {
} }
@ -133,9 +139,9 @@ public class TestDelegationTokenRemoteFetcher {
* try cancel token without http server with IOException * try cancel token without http server with IOException
*/ */
@Test @Test
public void expectedTokenCancelFail() { public void expectedTokenCancelFail() throws AuthenticationException {
try { try {
DelegationTokenFetcher.cancelDelegationToken(serviceUrl, testToken); DelegationTokenFetcher.cancelDelegationToken(connectionFactory, serviceUrl, testToken);
fail("Token fetcher shouldn't be able to cancel tokens in absense of NN"); fail("Token fetcher shouldn't be able to cancel tokens in absense of NN");
} catch (IOException ex) { } catch (IOException ex) {
} }
@ -145,11 +151,12 @@ public class TestDelegationTokenRemoteFetcher {
* try fetch token and get http response with error * try fetch token and get http response with error
*/ */
@Test @Test
public void expectedTokenRenewErrorHttpResponse() { public void expectedTokenRenewErrorHttpResponse()
throws AuthenticationException, URISyntaxException {
bootstrap = startHttpServer(httpPort, testToken, serviceUrl); bootstrap = startHttpServer(httpPort, testToken, serviceUrl);
try { try {
DelegationTokenFetcher.renewDelegationToken(serviceUrl + "/exception", DelegationTokenFetcher.renewDelegationToken(connectionFactory, new URI(
createToken(serviceUrl)); serviceUrl.toString() + "/exception"), createToken(serviceUrl));
fail("Token fetcher shouldn't be able to renew tokens using an invalid" fail("Token fetcher shouldn't be able to renew tokens using an invalid"
+ " NN URL"); + " NN URL");
} catch (IOException ex) { } catch (IOException ex) {
@ -159,13 +166,14 @@ public class TestDelegationTokenRemoteFetcher {
} }
/** /**
*
* *
*/ */
@Test @Test
public void testCancelTokenFromHttp() throws IOException { public void testCancelTokenFromHttp() throws IOException,
AuthenticationException {
bootstrap = startHttpServer(httpPort, testToken, serviceUrl); bootstrap = startHttpServer(httpPort, testToken, serviceUrl);
DelegationTokenFetcher.cancelDelegationToken(serviceUrl, testToken); DelegationTokenFetcher.cancelDelegationToken(connectionFactory, serviceUrl,
testToken);
if (assertionError != null) if (assertionError != null)
throw assertionError; throw assertionError;
} }
@ -174,11 +182,12 @@ public class TestDelegationTokenRemoteFetcher {
* Call renew token using http server return new expiration time * Call renew token using http server return new expiration time
*/ */
@Test @Test
public void testRenewTokenFromHttp() throws IOException { public void testRenewTokenFromHttp() throws IOException,
NumberFormatException, AuthenticationException {
bootstrap = startHttpServer(httpPort, testToken, serviceUrl); bootstrap = startHttpServer(httpPort, testToken, serviceUrl);
assertTrue("testRenewTokenFromHttp error", assertTrue("testRenewTokenFromHttp error",
Long.valueOf(EXP_DATE) == DelegationTokenFetcher.renewDelegationToken( Long.valueOf(EXP_DATE) == DelegationTokenFetcher.renewDelegationToken(
serviceUrl, testToken)); connectionFactory, serviceUrl, testToken));
if (assertionError != null) if (assertionError != null)
throw assertionError; throw assertionError;
} }
@ -204,11 +213,11 @@ public class TestDelegationTokenRemoteFetcher {
throw assertionError; throw assertionError;
} }
private static Token<DelegationTokenIdentifier> createToken(String serviceUri) { private static Token<DelegationTokenIdentifier> createToken(URI serviceUri) {
byte[] pw = "hadoop".getBytes(); byte[] pw = "hadoop".getBytes();
byte[] ident = new DelegationTokenIdentifier(new Text("owner"), new Text( byte[] ident = new DelegationTokenIdentifier(new Text("owner"), new Text(
"renewer"), new Text("realuser")).getBytes(); "renewer"), new Text("realuser")).getBytes();
Text service = new Text(serviceUri); Text service = new Text(serviceUri.toString());
return new Token<DelegationTokenIdentifier>(ident, pw, return new Token<DelegationTokenIdentifier>(ident, pw,
HftpFileSystem.TOKEN_KIND, service); HftpFileSystem.TOKEN_KIND, service);
} }
@ -301,8 +310,15 @@ public class TestDelegationTokenRemoteFetcher {
public void messageReceived(ChannelHandlerContext ctx, final MessageEvent e) public void messageReceived(ChannelHandlerContext ctx, final MessageEvent e)
throws Exception { throws Exception {
HttpRequest request = (HttpRequest) e.getMessage(); HttpRequest request = (HttpRequest) e.getMessage();
if (request.getMethod() != GET) {
return; if (request.getMethod() == HttpMethod.OPTIONS) {
// Mimic SPNEGO authentication
HttpResponse response = new DefaultHttpResponse(HTTP_1_1,
HttpResponseStatus.OK);
response.addHeader("Set-Cookie", "hadoop-auth=1234");
e.getChannel().write(response).addListener(ChannelFutureListener.CLOSE);
} else if (request.getMethod() != GET) {
e.getChannel().close();
} }
UnmodifiableIterator<Map.Entry<String, Handler>> iter = routes.entrySet() UnmodifiableIterator<Map.Entry<String, Handler>> iter = routes.entrySet()
.iterator(); .iterator();
@ -338,7 +354,7 @@ public class TestDelegationTokenRemoteFetcher {
} }
private ServerBootstrap startHttpServer(int port, private ServerBootstrap startHttpServer(int port,
final Token<DelegationTokenIdentifier> token, final String url) { final Token<DelegationTokenIdentifier> token, final URI url) {
ServerBootstrap bootstrap = new ServerBootstrap( ServerBootstrap bootstrap = new ServerBootstrap(
new NioServerSocketChannelFactory(Executors.newCachedThreadPool(), new NioServerSocketChannelFactory(Executors.newCachedThreadPool(),
Executors.newCachedThreadPool())); Executors.newCachedThreadPool()));
@ -348,7 +364,7 @@ public class TestDelegationTokenRemoteFetcher {
public ChannelPipeline getPipeline() throws Exception { public ChannelPipeline getPipeline() throws Exception {
return Channels.pipeline(new HttpRequestDecoder(), return Channels.pipeline(new HttpRequestDecoder(),
new HttpChunkAggregator(65536), new HttpResponseEncoder(), new HttpChunkAggregator(65536), new HttpResponseEncoder(),
new CredentialsLogicHandler(token, url)); new CredentialsLogicHandler(token, url.toString()));
} }
}); });
bootstrap.bind(new InetSocketAddress("localhost", port)); bootstrap.bind(new InetSocketAddress("localhost", port));