HDFS-5570. Addendum commit for r1584100.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1584174 13f79535-47bb-0310-9956-ffa450edef68
Author: Haohui Mai
Date:   2014-04-02 20:53:01 +00:00
Parent: fb1d7fb596
Commit: 5b3481a750
20 changed files with 93 additions and 3790 deletions


@@ -1,83 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import javax.servlet.ServletContext;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
/**
* Cancel delegation tokens over http for use in hftp.
*/
@SuppressWarnings("serial")
public class CancelDelegationTokenServlet extends DfsServlet {
private static final Log LOG = LogFactory.getLog(CancelDelegationTokenServlet.class);
public static final String PATH_SPEC = "/cancelDelegationToken";
public static final String TOKEN = "token";
@Override
protected void doGet(final HttpServletRequest req, final HttpServletResponse resp)
throws ServletException, IOException {
final UserGroupInformation ugi;
final ServletContext context = getServletContext();
final Configuration conf = NameNodeHttpServer.getConfFromContext(context);
try {
ugi = getUGI(req, conf);
} catch(IOException ioe) {
LOG.info("Request for token received with no authentication from "
+ req.getRemoteAddr(), ioe);
resp.sendError(HttpServletResponse.SC_FORBIDDEN,
"Unable to identify or authenticate user");
return;
}
final NameNode nn = NameNodeHttpServer.getNameNodeFromContext(
context);
String tokenString = req.getParameter(TOKEN);
if (tokenString == null) {
resp.sendError(HttpServletResponse.SC_MULTIPLE_CHOICES,
"Token to cancel not specified");
}
final Token<DelegationTokenIdentifier> token =
new Token<DelegationTokenIdentifier>();
token.decodeFromUrlString(tokenString);
try {
ugi.doAs(new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws Exception {
nn.getRpcServer().cancelDelegationToken(token);
return null;
}
});
} catch(Exception e) {
LOG.info("Exception while cancelling token. Re-throwing. ", e);
resp.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR,
e.getMessage());
}
}
}
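
For context, a minimal client-side sketch of the HTTP contract this servlet implements (not part of this change): the delegation token travels URL-encoded in the "token" query parameter, and the servlet answers 200 on success, 403 when the caller cannot be authenticated, or 500 if cancellation fails. The namenode address below is a placeholder, and the example class name is hypothetical.

import java.net.HttpURLConnection;
import java.net.URL;

// args[0] = namenode HTTP address, e.g. http://nn.example.com:50070 (placeholder)
// args[1] = delegation token already encoded with Token#encodeToUrlString()
public class CancelDelegationTokenExample {
  public static void main(String[] args) throws Exception {
    URL url = new URL(args[0] + "/cancelDelegationToken?token=" + args[1]);
    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
    conn.setRequestMethod("GET");
    // 200 on success; 403 if the caller cannot be authenticated;
    // 500 if the namenode fails to cancel the token.
    System.out.println("HTTP status: " + conn.getResponseCode());
    conn.disconnect();
  }
}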


@@ -1,84 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
import java.io.IOException;
import java.io.PrintWriter;
import java.security.PrivilegedExceptionAction;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.server.common.JspHelper;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.ServletUtil;
import org.znerd.xmlenc.XMLOutputter;
/** Servlet that returns the content summary of a path as XML. */
@InterfaceAudience.Private
public class ContentSummaryServlet extends DfsServlet {
/** For java.io.Serializable */
private static final long serialVersionUID = 1L;
@Override
public void doGet(final HttpServletRequest request,
final HttpServletResponse response) throws ServletException, IOException {
final Configuration conf =
(Configuration) getServletContext().getAttribute(JspHelper.CURRENT_CONF);
final UserGroupInformation ugi = getUGI(request, conf);
try {
ugi.doAs(new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws Exception {
final String path = ServletUtil.getDecodedPath(request, "/contentSummary");
final PrintWriter out = response.getWriter();
final XMLOutputter xml = new XMLOutputter(out, "UTF-8");
xml.declaration();
try {
//get content summary
final ClientProtocol nnproxy = createNameNodeProxy();
final ContentSummary cs = nnproxy.getContentSummary(path);
//write xml
xml.startTag(ContentSummary.class.getName());
if (cs != null) {
xml.attribute("length" , "" + cs.getLength());
xml.attribute("fileCount" , "" + cs.getFileCount());
xml.attribute("directoryCount", "" + cs.getDirectoryCount());
xml.attribute("quota" , "" + cs.getQuota());
xml.attribute("spaceConsumed" , "" + cs.getSpaceConsumed());
xml.attribute("spaceQuota" , "" + cs.getSpaceQuota());
}
xml.endTag();
} catch(IOException ioe) {
writeXml(ioe, path, xml);
}
xml.endDocument();
return null;
}
});
} catch (InterruptedException e) {
throw new IOException(e);
}
}
}


@@ -1,134 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
import java.io.IOException;
import java.io.PrintWriter;
import java.net.URL;
import javax.servlet.ServletContext;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.server.common.JspHelper;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DatanodeJspHelper;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.ServletUtil;
import org.znerd.xmlenc.XMLOutputter;
/** Servlets for file checksum */
@InterfaceAudience.Private
public class FileChecksumServlets {
/** Redirect file checksum queries to an appropriate datanode. */
@InterfaceAudience.Private
public static class RedirectServlet extends DfsServlet {
/** For java.io.Serializable */
private static final long serialVersionUID = 1L;
/** Create a redirection URL */
private URL createRedirectURL(UserGroupInformation ugi, DatanodeID host,
HttpServletRequest request, NameNode nn)
throws IOException {
final String hostname = host instanceof DatanodeInfo
? host.getHostName() : host.getIpAddr();
final String scheme = request.getScheme();
int port = host.getInfoPort();
if ("https".equals(scheme)) {
final Integer portObject = (Integer) getServletContext().getAttribute(
DFSConfigKeys.DFS_DATANODE_HTTPS_PORT_KEY);
if (portObject != null) {
port = portObject;
}
}
final String encodedPath = ServletUtil.getRawPath(request, "/fileChecksum");
String dtParam = "";
if (UserGroupInformation.isSecurityEnabled()) {
String tokenString = ugi.getTokens().iterator().next().encodeToUrlString();
dtParam = JspHelper.getDelegationTokenUrlParam(tokenString);
}
String addr = nn.getNameNodeAddressHostPortString();
String addrParam = JspHelper.getUrlParam(JspHelper.NAMENODE_ADDRESS, addr);
return new URL(scheme, hostname, port,
"/getFileChecksum" + encodedPath + '?' +
"ugi=" + ServletUtil.encodeQueryValue(ugi.getShortUserName()) +
dtParam + addrParam);
}
@Override
public void doGet(HttpServletRequest request, HttpServletResponse response
) throws ServletException, IOException {
final ServletContext context = getServletContext();
final Configuration conf = NameNodeHttpServer.getConfFromContext(context);
final UserGroupInformation ugi = getUGI(request, conf);
final NameNode namenode = NameNodeHttpServer.getNameNodeFromContext(
context);
final DatanodeID datanode = NamenodeJspHelper.getRandomDatanode(namenode);
try {
response.sendRedirect(
createRedirectURL(ugi, datanode, request, namenode).toString());
} catch (IOException e) {
response.sendError(400, e.getMessage());
}
}
}
/** Get FileChecksum */
@InterfaceAudience.Private
public static class GetServlet extends DfsServlet {
/** For java.io.Serializable */
private static final long serialVersionUID = 1L;
@Override
public void doGet(HttpServletRequest request, HttpServletResponse response
) throws ServletException, IOException {
final PrintWriter out = response.getWriter();
final String path = ServletUtil.getDecodedPath(request, "/getFileChecksum");
final XMLOutputter xml = new XMLOutputter(out, "UTF-8");
xml.declaration();
final ServletContext context = getServletContext();
final DataNode datanode = (DataNode) context.getAttribute("datanode");
final Configuration conf =
new HdfsConfiguration(datanode.getConf());
try {
final DFSClient dfs = DatanodeJspHelper.getDFSClient(request,
datanode, conf, getUGI(request, conf));
final MD5MD5CRC32FileChecksum checksum = dfs.getFileChecksum(path);
MD5MD5CRC32FileChecksum.write(xml, checksum);
} catch(IOException ioe) {
writeXml(ioe, path, xml);
} catch (InterruptedException e) {
writeXml(e, path, xml);
}
xml.endDocument();
}
}
}
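
A minimal sketch (not part of this change) of the two-step checksum fetch these servlets implement: the namenode's /fileChecksum redirects the caller to /getFileChecksum on a datanode, which answers with the MD5-of-MD5-of-CRC32 checksum serialized as XML. Host, port, path, and the ugi value are placeholders; the class name is hypothetical.

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class FileChecksumExample {
  public static void main(String[] args) throws Exception {
    HttpURLConnection.setFollowRedirects(true); // follow the namenode's redirect to a datanode
    URL url = new URL("http://nn.example.com:50070/fileChecksum/user/alice/data.txt?ugi=alice");
    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
    try (BufferedReader r = new BufferedReader(
        new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {
      String line;
      while ((line = r.readLine()) != null) {
        System.out.println(line); // XML describing the file checksum
      }
    }
    conn.disconnect();
  }
}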


@@ -1,144 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
import java.io.IOException;
import java.net.URL;
import java.security.PrivilegedExceptionAction;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.common.JspHelper;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.ServletUtil;
/** Redirect queries about the hosted filesystem to an appropriate datanode.
* @see org.apache.hadoop.hdfs.web.HftpFileSystem
*/
@InterfaceAudience.Private
public class FileDataServlet extends DfsServlet {
/** For java.io.Serializable */
private static final long serialVersionUID = 1L;
/** Create a redirection URL */
private URL createRedirectURL(String path, String encodedPath, HdfsFileStatus status,
UserGroupInformation ugi, ClientProtocol nnproxy, HttpServletRequest request, String dt)
throws IOException {
String scheme = request.getScheme();
final LocatedBlocks blks = nnproxy.getBlockLocations(
status.getFullPath(new Path(path)).toUri().getPath(), 0, 1);
final Configuration conf = NameNodeHttpServer.getConfFromContext(
getServletContext());
final DatanodeID host = pickSrcDatanode(blks, status, conf);
final String hostname;
if (host instanceof DatanodeInfo) {
hostname = host.getHostName();
} else {
hostname = host.getIpAddr();
}
int port = "https".equals(scheme) ? host.getInfoSecurePort() : host
.getInfoPort();
String dtParam = "";
if (dt != null) {
dtParam = JspHelper.getDelegationTokenUrlParam(dt);
}
// Add namenode address to the url params
NameNode nn = NameNodeHttpServer.getNameNodeFromContext(
getServletContext());
String addr = nn.getNameNodeAddressHostPortString();
String addrParam = JspHelper.getUrlParam(JspHelper.NAMENODE_ADDRESS, addr);
return new URL(scheme, hostname, port,
"/streamFile" + encodedPath + '?' +
"ugi=" + ServletUtil.encodeQueryValue(ugi.getShortUserName()) +
dtParam + addrParam);
}
/** Select a datanode to service this request.
* Currently, this looks at no more than the first five blocks of a file,
* selecting a datanode randomly from the most represented.
* @param conf
*/
private DatanodeID pickSrcDatanode(LocatedBlocks blks, HdfsFileStatus i,
Configuration conf) throws IOException {
if (i.getLen() == 0 || blks.getLocatedBlocks().size() <= 0) {
// pick a random datanode
NameNode nn = NameNodeHttpServer.getNameNodeFromContext(
getServletContext());
return NamenodeJspHelper.getRandomDatanode(nn);
}
return JspHelper.bestNode(blks, conf);
}
/**
* Service a GET request as described below.
* Request:
* {@code
* GET http://<nn>:<port>/data[/<path>] HTTP/1.1
* }
*/
@Override
public void doGet(final HttpServletRequest request,
final HttpServletResponse response)
throws IOException {
final Configuration conf = NameNodeHttpServer.getConfFromContext(
getServletContext());
final UserGroupInformation ugi = getUGI(request, conf);
try {
ugi.doAs(new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws IOException {
ClientProtocol nn = createNameNodeProxy();
final String path = ServletUtil.getDecodedPath(request, "/data");
final String encodedPath = ServletUtil.getRawPath(request, "/data");
String delegationToken = request
.getParameter(JspHelper.DELEGATION_PARAMETER_NAME);
HdfsFileStatus info = nn.getFileInfo(path);
if (info != null && !info.isDir()) {
response.sendRedirect(createRedirectURL(path, encodedPath,
info, ugi, nn, request, delegationToken).toString());
} else if (info == null) {
response.sendError(400, "File not found " + path);
} else {
response.sendError(400, path + ": is a directory");
}
return null;
}
});
} catch (IOException e) {
response.sendError(400, e.getMessage());
} catch (InterruptedException e) {
response.sendError(400, e.getMessage());
}
}
}
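
A minimal sketch (not part of this change) of the /data entry point described in the doGet javadoc above: for a regular file the namenode replies with a redirect to /streamFile on a datanode holding the data. Host, port, path, and the ugi value are placeholders; the class name is hypothetical.

import java.net.HttpURLConnection;
import java.net.URL;

public class FileDataRedirectExample {
  public static void main(String[] args) throws Exception {
    URL url = new URL("http://nn.example.com:50070/data/user/alice/data.txt?ugi=alice");
    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
    conn.setInstanceFollowRedirects(false); // stop at the redirect so its target is visible
    System.out.println(conn.getResponseCode() + " -> " + conn.getHeaderField("Location"));
    conn.disconnect();
  }
}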


@@ -1,86 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
import java.io.DataOutputStream;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import javax.servlet.ServletContext;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
/**
* Serve delegation tokens over http for use in hftp.
*/
@SuppressWarnings("serial")
public class GetDelegationTokenServlet extends DfsServlet {
private static final Log LOG = LogFactory.getLog(GetDelegationTokenServlet.class);
public static final String PATH_SPEC = "/getDelegationToken";
public static final String RENEWER = "renewer";
@Override
protected void doGet(final HttpServletRequest req, final HttpServletResponse resp)
throws ServletException, IOException {
final UserGroupInformation ugi;
final ServletContext context = getServletContext();
final Configuration conf = NameNodeHttpServer.getConfFromContext(context);
try {
ugi = getUGI(req, conf);
} catch(IOException ioe) {
LOG.info("Request for token received with no authentication from "
+ req.getRemoteAddr(), ioe);
resp.sendError(HttpServletResponse.SC_FORBIDDEN,
"Unable to identify or authenticate user");
return;
}
LOG.info("Sending token: {" + ugi.getUserName() + "," + req.getRemoteAddr() +"}");
final NameNode nn = NameNodeHttpServer.getNameNodeFromContext(context);
String renewer = req.getParameter(RENEWER);
final String renewerFinal = (renewer == null) ?
req.getUserPrincipal().getName() : renewer;
DataOutputStream dos = null;
try {
dos = new DataOutputStream(resp.getOutputStream());
final DataOutputStream dosFinal = dos; // for doAs block
ugi.doAs(new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws IOException {
final Credentials ts = DelegationTokenSecretManager.createCredentials(
nn, ugi, renewerFinal);
ts.write(dosFinal);
return null;
}
});
} catch(Exception e) {
LOG.info("Exception while sending token. Re-throwing ", e);
resp.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
} finally {
if(dos != null) dos.close();
}
}
}
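
A minimal sketch (not part of this change) of the client side of this servlet: the response body is a Credentials object serialized with Credentials#write, so it can be read back with Credentials#readFields. Host, port, and the renewer value are placeholders, the class name is hypothetical, and on a secure cluster the request must also carry SPNEGO authentication or it is rejected with 403.

import java.io.DataInputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import org.apache.hadoop.security.Credentials;

public class GetDelegationTokenExample {
  public static void main(String[] args) throws Exception {
    URL url = new URL("http://nn.example.com:50070/getDelegationToken?renewer=mapred");
    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
    Credentials creds = new Credentials();
    try (DataInputStream in = new DataInputStream(conn.getInputStream())) {
      creds.readFields(in); // mirrors the ts.write(dos) call in the servlet
    }
    System.out.println("Tokens received: " + creds.numberOfTokens());
    conn.disconnect();
  }
}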


@@ -1,218 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.server.common.JspHelper;
import org.apache.hadoop.hdfs.web.HftpFileSystem;
import org.apache.hadoop.util.ServletUtil;
import org.apache.hadoop.util.VersionInfo;
import org.znerd.xmlenc.*;
import java.io.IOException;
import java.io.PrintWriter;
import java.security.PrivilegedExceptionAction;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Stack;
import java.util.regex.Pattern;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
/**
* Obtain meta-information about a filesystem.
* @see org.apache.hadoop.hdfs.web.HftpFileSystem
*/
@InterfaceAudience.Private
public class ListPathsServlet extends DfsServlet {
/** For java.io.Serializable */
private static final long serialVersionUID = 1L;
public static final ThreadLocal<SimpleDateFormat> df =
new ThreadLocal<SimpleDateFormat>() {
@Override
protected SimpleDateFormat initialValue() {
return HftpFileSystem.getDateFormat();
}
};
/**
* Write a node to output.
* Node information includes path, modification, permission, owner and group.
* For files, it also includes size, replication and block-size.
*/
static void writeInfo(final Path fullpath, final HdfsFileStatus i,
final XMLOutputter doc) throws IOException {
final SimpleDateFormat ldf = df.get();
doc.startTag(i.isDir() ? "directory" : "file");
doc.attribute("path", fullpath.toUri().getPath());
doc.attribute("modified", ldf.format(new Date(i.getModificationTime())));
doc.attribute("accesstime", ldf.format(new Date(i.getAccessTime())));
if (!i.isDir()) {
doc.attribute("size", String.valueOf(i.getLen()));
doc.attribute("replication", String.valueOf(i.getReplication()));
doc.attribute("blocksize", String.valueOf(i.getBlockSize()));
}
doc.attribute("permission", (i.isDir()? "d": "-") + i.getPermission());
doc.attribute("owner", i.getOwner());
doc.attribute("group", i.getGroup());
doc.endTag();
}
/**
* Build a map from the query string, setting values and defaults.
*/
protected Map<String,String> buildRoot(HttpServletRequest request,
XMLOutputter doc) {
final String path = ServletUtil.getDecodedPath(request, "/listPaths");
final String exclude = request.getParameter("exclude") != null
? request.getParameter("exclude") : "";
final String filter = request.getParameter("filter") != null
? request.getParameter("filter") : ".*";
final boolean recur = request.getParameter("recursive") != null
&& "yes".equals(request.getParameter("recursive"));
Map<String, String> root = new HashMap<String, String>();
root.put("path", path);
root.put("recursive", recur ? "yes" : "no");
root.put("filter", filter);
root.put("exclude", exclude);
root.put("time", df.get().format(new Date()));
root.put("version", VersionInfo.getVersion());
return root;
}
/**
* Service a GET request as described below.
* Request:
* {@code
* GET http://<nn>:<port>/listPaths[/<path>][<?option>[&option]*] HTTP/1.1
* }
*
* Where <i>option</i> (default) in:
* recursive (&quot;no&quot;)
* filter (&quot;.*&quot;)
* exclude (&quot;\..*\.crc&quot;)
*
* Response: A flat list of files/directories in the following format:
* {@code
* <listing path="..." recursive="(yes|no)" filter="..."
* time="yyyy-MM-dd hh:mm:ss UTC" version="...">
* <directory path="..." modified="yyyy-MM-dd hh:mm:ss"/>
* <file path="..." modified="yyyy-MM-dd'T'hh:mm:ssZ" accesstime="yyyy-MM-dd'T'hh:mm:ssZ"
* blocksize="..."
* replication="..." size="..."/>
* </listing>
* }
*/
@Override
public void doGet(HttpServletRequest request, HttpServletResponse response)
throws ServletException, IOException {
final PrintWriter out = response.getWriter();
final XMLOutputter doc = new XMLOutputter(out, "UTF-8");
final Map<String, String> root = buildRoot(request, doc);
final String path = root.get("path");
final String filePath = ServletUtil.getDecodedPath(request, "/listPaths");
try {
final boolean recur = "yes".equals(root.get("recursive"));
final Pattern filter = Pattern.compile(root.get("filter"));
final Pattern exclude = Pattern.compile(root.get("exclude"));
final Configuration conf =
(Configuration) getServletContext().getAttribute(JspHelper.CURRENT_CONF);
getUGI(request, conf).doAs(new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws IOException {
ClientProtocol nn = createNameNodeProxy();
doc.declaration();
doc.startTag("listing");
for (Map.Entry<String, String> m : root.entrySet()) {
doc.attribute(m.getKey(), m.getValue());
}
HdfsFileStatus base = nn.getFileInfo(filePath);
if ((base != null) && base.isDir()) {
writeInfo(base.getFullPath(new Path(path)), base, doc);
}
Stack<String> pathstack = new Stack<String>();
pathstack.push(path);
while (!pathstack.empty()) {
String p = pathstack.pop();
try {
byte[] lastReturnedName = HdfsFileStatus.EMPTY_NAME;
DirectoryListing thisListing;
do {
assert lastReturnedName != null;
thisListing = nn.getListing(p, lastReturnedName, false);
if (thisListing == null) {
if (lastReturnedName.length == 0) {
LOG
.warn("ListPathsServlet - Path " + p
+ " does not exist");
}
break;
}
HdfsFileStatus[] listing = thisListing.getPartialListing();
for (HdfsFileStatus i : listing) {
final Path fullpath = i.getFullPath(new Path(p));
final String localName = fullpath.getName();
if (exclude.matcher(localName).matches()
|| !filter.matcher(localName).matches()) {
continue;
}
if (recur && i.isDir()) {
pathstack.push(new Path(p, localName).toUri().getPath());
}
writeInfo(fullpath, i, doc);
}
lastReturnedName = thisListing.getLastName();
} while (thisListing.hasMore());
} catch (IOException re) {
writeXml(re, p, doc);
}
}
return null;
}
});
} catch(IOException ioe) {
writeXml(ioe, path, doc);
} catch (InterruptedException e) {
LOG.warn("ListPathServlet encountered InterruptedException", e);
response.sendError(400, e.getMessage());
} finally {
if (doc != null) {
doc.endDocument();
}
if (out != null) {
out.close();
}
}
}
}
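
A minimal sketch (not part of this change) of a raw /listPaths request, matching the request and response format documented in the doGet javadoc above; HftpFileSystem builds the same URL internally when listing directories. Host, port, path, and the ugi value are placeholders, and the class name is hypothetical.

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class ListPathsExample {
  public static void main(String[] args) throws Exception {
    URL url = new URL("http://nn.example.com:50070/listPaths/user/alice"
        + "?ugi=alice&recursive=yes&filter=.*&exclude=");
    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
    try (BufferedReader r = new BufferedReader(
        new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {
      String line;
      while ((line = r.readLine()) != null) {
        System.out.println(line); // <listing ...> element with <file/> and <directory/> entries
      }
    }
    conn.disconnect();
  }
}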


@@ -1,92 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.security.PrivilegedExceptionAction;
import javax.servlet.ServletContext;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import com.google.common.base.Charsets;
/**
* Renew delegation tokens over http for use in hftp.
*/
@SuppressWarnings("serial")
public class RenewDelegationTokenServlet extends DfsServlet {
private static final Log LOG = LogFactory.getLog(RenewDelegationTokenServlet.class);
public static final String PATH_SPEC = "/renewDelegationToken";
public static final String TOKEN = "token";
@Override
protected void doGet(final HttpServletRequest req, final HttpServletResponse resp)
throws ServletException, IOException {
final UserGroupInformation ugi;
final ServletContext context = getServletContext();
final Configuration conf = NameNodeHttpServer.getConfFromContext(context);
try {
ugi = getUGI(req, conf);
} catch(IOException ioe) {
LOG.info("Request for token received with no authentication from "
+ req.getRemoteAddr(), ioe);
resp.sendError(HttpServletResponse.SC_FORBIDDEN,
"Unable to identify or authenticate user");
return;
}
final NameNode nn = NameNodeHttpServer.getNameNodeFromContext(context);
String tokenString = req.getParameter(TOKEN);
if (tokenString == null) {
resp.sendError(HttpServletResponse.SC_MULTIPLE_CHOICES,
"Token to renew not specified");
}
final Token<DelegationTokenIdentifier> token =
new Token<DelegationTokenIdentifier>();
token.decodeFromUrlString(tokenString);
try {
long result = ugi.doAs(new PrivilegedExceptionAction<Long>() {
@Override
public Long run() throws Exception {
return nn.getRpcServer().renewDelegationToken(token);
}
});
final PrintWriter os = new PrintWriter(new OutputStreamWriter(
resp.getOutputStream(), Charsets.UTF_8));
os.println(result);
os.close();
} catch(Exception e) {
// transfer exception over the http
String exceptionClass = e.getClass().getName();
String exceptionMsg = e.getLocalizedMessage();
String strException = exceptionClass + ";" + exceptionMsg;
LOG.info("Exception while renewing token. Re-throwing. s=" + strException, e);
resp.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, strException);
}
}
}
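
A minimal sketch (not part of this change) of the client side of this servlet: unlike the cancel servlet, a successful renewal writes the new expiration time (a long, milliseconds since the epoch) as a single text line in the response body. The namenode address is a placeholder and the class name is hypothetical; args[0] is assumed to be a token already encoded with Token#encodeToUrlString().

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class RenewDelegationTokenExample {
  public static void main(String[] args) throws Exception {
    URL url = new URL("http://nn.example.com:50070/renewDelegationToken?token=" + args[0]);
    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
    try (BufferedReader r = new BufferedReader(
        new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {
      System.out.println("New expiration: " + Long.parseLong(r.readLine().trim()));
    }
    conn.disconnect();
  }
}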


@@ -1,168 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.util.Enumeration;
import java.util.List;
import javax.servlet.ServletContext;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSInputStream;
import org.apache.hadoop.hdfs.server.common.JspHelper;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DatanodeJspHelper;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.ServletUtil;
import org.mortbay.jetty.InclusiveByteRange;
@InterfaceAudience.Private
public class StreamFile extends DfsServlet {
/** for java.io.Serializable */
private static final long serialVersionUID = 1L;
public static final String CONTENT_LENGTH = "Content-Length";
/* Return a DFS client to use to make the given HTTP request */
protected DFSClient getDFSClient(HttpServletRequest request)
throws IOException, InterruptedException {
final Configuration conf =
(Configuration) getServletContext().getAttribute(JspHelper.CURRENT_CONF);
UserGroupInformation ugi = getUGI(request, conf);
final ServletContext context = getServletContext();
final DataNode datanode = (DataNode) context.getAttribute("datanode");
return DatanodeJspHelper.getDFSClient(request, datanode, conf, ugi);
}
@Override
@SuppressWarnings("unchecked")
public void doGet(HttpServletRequest request, HttpServletResponse response)
throws ServletException, IOException {
final String path = ServletUtil.getDecodedPath(request, "/streamFile");
final String rawPath = ServletUtil.getRawPath(request, "/streamFile");
final String filename = JspHelper.validatePath(path);
final String rawFilename = JspHelper.validatePath(rawPath);
if (filename == null) {
response.setContentType("text/plain");
PrintWriter out = response.getWriter();
out.print("Invalid input");
return;
}
Enumeration<String> reqRanges = request.getHeaders("Range");
if (reqRanges != null && !reqRanges.hasMoreElements()) {
reqRanges = null;
}
DFSClient dfs;
try {
dfs = getDFSClient(request);
} catch (InterruptedException e) {
response.sendError(400, e.getMessage());
return;
}
DFSInputStream in = null;
OutputStream out = null;
try {
in = dfs.open(filename);
out = response.getOutputStream();
final long fileLen = in.getFileLength();
if (reqRanges != null) {
List<InclusiveByteRange> ranges =
InclusiveByteRange.satisfiableRanges(reqRanges, fileLen);
StreamFile.sendPartialData(in, out, response, fileLen, ranges);
} else {
// No ranges, so send entire file
response.setHeader("Content-Disposition", "attachment; filename=\"" +
rawFilename + "\"");
response.setContentType("application/octet-stream");
response.setHeader(CONTENT_LENGTH, "" + fileLen);
StreamFile.copyFromOffset(in, out, 0L, fileLen);
}
in.close();
in = null;
out.close();
out = null;
dfs.close();
dfs = null;
} catch (IOException ioe) {
if (LOG.isDebugEnabled()) {
LOG.debug("response.isCommitted()=" + response.isCommitted(), ioe);
}
throw ioe;
} finally {
IOUtils.cleanup(LOG, in);
IOUtils.cleanup(LOG, out);
IOUtils.cleanup(LOG, dfs);
}
}
/**
* Send a partial content response with the given range. If there are
* no satisfiable ranges, or if multiple ranges are requested, which
* is unsupported, respond with range not satisfiable.
*
* @param in stream to read from
* @param out stream to write to
* @param response http response to use
* @param contentLength for the response header
* @param ranges to write to respond with
* @throws IOException on error sending the response
*/
static void sendPartialData(FSInputStream in,
OutputStream out,
HttpServletResponse response,
long contentLength,
List<InclusiveByteRange> ranges)
throws IOException {
if (ranges == null || ranges.size() != 1) {
response.setContentLength(0);
response.setStatus(HttpServletResponse.SC_REQUESTED_RANGE_NOT_SATISFIABLE);
response.setHeader("Content-Range",
InclusiveByteRange.to416HeaderRangeString(contentLength));
} else {
InclusiveByteRange singleSatisfiableRange = ranges.get(0);
long singleLength = singleSatisfiableRange.getSize(contentLength);
response.setStatus(HttpServletResponse.SC_PARTIAL_CONTENT);
response.setHeader("Content-Range",
singleSatisfiableRange.toHeaderRangeString(contentLength));
copyFromOffset(in, out,
singleSatisfiableRange.getFirst(contentLength),
singleLength);
}
}
/* Copy count bytes at the given offset from one stream to another */
static void copyFromOffset(FSInputStream in, OutputStream out, long offset,
long count) throws IOException {
in.seek(offset);
IOUtils.copyBytes(in, out, count, false);
}
}
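
A minimal sketch (not part of this change) of a ranged read against /streamFile, which exercises the sendPartialData path above: a single satisfiable Range header yields 206 Partial Content plus a Content-Range header, while multiple or unsatisfiable ranges yield 416. The datanode address, path, and ugi value are placeholders, and the class name is hypothetical.

import java.net.HttpURLConnection;
import java.net.URL;

public class StreamFileRangeExample {
  public static void main(String[] args) throws Exception {
    URL url = new URL("http://dn.example.com:50075/streamFile/user/alice/data.txt?ugi=alice");
    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
    conn.setRequestProperty("Range", "bytes=0-1023"); // request only the first 1 KiB
    System.out.println(conn.getResponseCode() + " " + conn.getHeaderField("Content-Range"));
    conn.disconnect();
  }
}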


@@ -1,729 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.web;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.net.ConnectException;
import java.net.HttpURLConnection;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.security.PrivilegedExceptionAction;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.TimeZone;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.DelegationTokenRenewer;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileChecksum;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.server.common.JspHelper;
import org.apache.hadoop.hdfs.tools.DelegationTokenFetcher;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ServletUtil;
import org.xml.sax.Attributes;
import org.xml.sax.InputSource;
import org.xml.sax.SAXException;
import org.xml.sax.XMLReader;
import org.xml.sax.helpers.DefaultHandler;
import org.xml.sax.helpers.XMLReaderFactory;
/**
* An implementation of a protocol for accessing filesystems over HTTP.
* The following implementation provides a limited, read-only interface
* to a filesystem over HTTP.
* @see org.apache.hadoop.hdfs.server.namenode.ListPathsServlet
* @see org.apache.hadoop.hdfs.server.namenode.FileDataServlet
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class HftpFileSystem extends FileSystem
implements DelegationTokenRenewer.Renewable, TokenAspect.TokenManagementDelegator {
public static final String SCHEME = "hftp";
static {
HttpURLConnection.setFollowRedirects(true);
}
URLConnectionFactory connectionFactory;
public static final Text TOKEN_KIND = new Text("HFTP delegation");
protected UserGroupInformation ugi;
private URI hftpURI;
protected URI nnUri;
public static final String HFTP_TIMEZONE = "UTC";
public static final String HFTP_DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ssZ";
protected TokenAspect<? extends HftpFileSystem> tokenAspect;
private Token<?> delegationToken;
private Token<?> renewToken;
protected Text tokenServiceName;
@Override
public URI getCanonicalUri() {
return super.getCanonicalUri();
}
public static final SimpleDateFormat getDateFormat() {
final SimpleDateFormat df = new SimpleDateFormat(HFTP_DATE_FORMAT);
df.setTimeZone(TimeZone.getTimeZone(HFTP_TIMEZONE));
return df;
}
protected static final ThreadLocal<SimpleDateFormat> df =
new ThreadLocal<SimpleDateFormat>() {
@Override
protected SimpleDateFormat initialValue() {
return getDateFormat();
}
};
@Override
protected int getDefaultPort() {
return DFSConfigKeys.DFS_NAMENODE_HTTP_PORT_DEFAULT;
}
/**
* We generate the address with one of the following ports, in
* order of preference.
* 1. Port from the hftp URI e.g. hftp://namenode:4000/ will return 4000.
* 2. Port configured via DFS_NAMENODE_HTTP_PORT_KEY
* 3. DFS_NAMENODE_HTTP_PORT_DEFAULT i.e. 50070.
*
* @param uri
*/
protected InetSocketAddress getNamenodeAddr(URI uri) {
// use authority so user supplied uri can override port
return NetUtils.createSocketAddr(uri.getAuthority(), getDefaultPort());
}
protected URI getNamenodeUri(URI uri) {
return DFSUtil.createUri(getUnderlyingProtocol(), getNamenodeAddr(uri));
}
/**
* See the documentation of {@link #getNamenodeAddr(URI)} for the logic
* behind selecting the canonical service name.
* @return the canonical service name of this filesystem's namenode
*/
@Override
public String getCanonicalServiceName() {
return SecurityUtil.buildTokenService(nnUri).toString();
}
@Override
protected URI canonicalizeUri(URI uri) {
return NetUtils.getCanonicalUri(uri, getDefaultPort());
}
/**
* Return the protocol scheme for the FileSystem.
* <p/>
*
* @return <code>hftp</code>
*/
@Override
public String getScheme() {
return SCHEME;
}
/**
* Initialize tokenAspect. This method is intended to be overridden by
* HsftpFileSystem.
*/
protected void initTokenAspect() {
tokenAspect = new TokenAspect<HftpFileSystem>(this, tokenServiceName, TOKEN_KIND);
}
@Override
public void initialize(final URI name, final Configuration conf)
throws IOException {
super.initialize(name, conf);
setConf(conf);
this.connectionFactory = URLConnectionFactory
.newDefaultURLConnectionFactory(conf);
this.ugi = UserGroupInformation.getCurrentUser();
this.nnUri = getNamenodeUri(name);
this.tokenServiceName = SecurityUtil.buildTokenService(nnUri);
try {
this.hftpURI = new URI(name.getScheme(), name.getAuthority(),
null, null, null);
} catch (URISyntaxException e) {
throw new IllegalArgumentException(e);
}
initTokenAspect();
if (UserGroupInformation.isSecurityEnabled()) {
tokenAspect.initDelegationToken(ugi);
}
}
@Override
public Token<?> getRenewToken() {
return renewToken;
}
/**
* Return the underlying protocol that is used to talk to the namenode.
*/
protected String getUnderlyingProtocol() {
return "http";
}
@Override
public synchronized <T extends TokenIdentifier> void setDelegationToken(Token<T> token) {
/**
* XXX The kind of the token has been changed by DelegationTokenFetcher. We
* use the token for renewal, since the reflection utilities need the value
* of the kind field to correctly renew the token.
*
* For other operations, however, the client has to send a
* HDFS_DELEGATION_KIND token over the wire so that it can talk to Hadoop
* 0.20.203 clusters. Later releases fix this problem. See HDFS-5440 for
* more details.
*/
renewToken = token;
delegationToken = new Token<T>(token);
delegationToken.setKind(DelegationTokenIdentifier.HDFS_DELEGATION_KIND);
}
@Override
public synchronized Token<?> getDelegationToken(final String renewer)
throws IOException {
try {
// Renew TGT if needed
UserGroupInformation connectUgi = ugi.getRealUser();
final String proxyUser = connectUgi == null ? null : ugi
.getShortUserName();
if (connectUgi == null) {
connectUgi = ugi;
}
return connectUgi.doAs(new PrivilegedExceptionAction<Token<?>>() {
@Override
public Token<?> run() throws IOException {
Credentials c;
try {
c = DelegationTokenFetcher.getDTfromRemote(connectionFactory,
nnUri, renewer, proxyUser);
} catch (IOException e) {
if (e.getCause() instanceof ConnectException) {
LOG.warn("Couldn't connect to " + nnUri +
", assuming security is disabled");
return null;
}
if (LOG.isDebugEnabled()) {
LOG.debug("Exception getting delegation token", e);
}
throw e;
}
for (Token<? extends TokenIdentifier> t : c.getAllTokens()) {
if(LOG.isDebugEnabled()) {
LOG.debug("Got dt for " + getUri() + ";t.service="
+t.getService());
}
return t;
}
return null;
}
});
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
@Override
public URI getUri() {
return hftpURI;
}
/**
* Return a URL pointing to given path on the namenode.
*
* @param path to obtain the URL for
* @param query string to append to the path
* @return namenode URL referring to the given path
* @throws IOException on error constructing the URL
*/
protected URL getNamenodeURL(String path, String query) throws IOException {
final URL url = new URL(getUnderlyingProtocol(), nnUri.getHost(),
nnUri.getPort(), path + '?' + query);
if (LOG.isTraceEnabled()) {
LOG.trace("url=" + url);
}
return url;
}
/**
* Get encoded UGI parameter string for a URL.
*
* @return user_shortname,group1,group2...
*/
private String getEncodedUgiParameter() {
StringBuilder ugiParameter = new StringBuilder(
ServletUtil.encodeQueryValue(ugi.getShortUserName()));
for(String g: ugi.getGroupNames()) {
ugiParameter.append(",");
ugiParameter.append(ServletUtil.encodeQueryValue(g));
}
return ugiParameter.toString();
}
/**
* Open an HTTP connection to the namenode to read file data and metadata.
* @param path The path component of the URL
* @param query The query component of the URL
*/
protected HttpURLConnection openConnection(String path, String query)
throws IOException {
query = addDelegationTokenParam(query);
final URL url = getNamenodeURL(path, query);
final HttpURLConnection connection;
connection = (HttpURLConnection)connectionFactory.openConnection(url);
connection.setRequestMethod("GET");
connection.connect();
return connection;
}
protected String addDelegationTokenParam(String query) throws IOException {
String tokenString = null;
if (UserGroupInformation.isSecurityEnabled()) {
synchronized (this) {
tokenAspect.ensureTokenInitialized();
if (delegationToken != null) {
tokenString = delegationToken.encodeToUrlString();
return (query + JspHelper.getDelegationTokenUrlParam(tokenString));
}
}
}
return query;
}
static class RangeHeaderUrlOpener extends ByteRangeInputStream.URLOpener {
private final URLConnectionFactory connFactory;
RangeHeaderUrlOpener(URLConnectionFactory connFactory, final URL url) {
super(url);
this.connFactory = connFactory;
}
protected HttpURLConnection openConnection() throws IOException {
return (HttpURLConnection)connFactory.openConnection(url);
}
/** Use HTTP Range header for specifying offset. */
@Override
protected HttpURLConnection connect(final long offset,
final boolean resolved) throws IOException {
final HttpURLConnection conn = openConnection();
conn.setRequestMethod("GET");
if (offset != 0L) {
conn.setRequestProperty("Range", "bytes=" + offset + "-");
}
conn.connect();
//Expects HTTP_OK or HTTP_PARTIAL response codes.
final int code = conn.getResponseCode();
if (offset != 0L && code != HttpURLConnection.HTTP_PARTIAL) {
throw new IOException("HTTP_PARTIAL expected, received " + code);
} else if (offset == 0L && code != HttpURLConnection.HTTP_OK) {
throw new IOException("HTTP_OK expected, received " + code);
}
return conn;
}
}
static class RangeHeaderInputStream extends ByteRangeInputStream {
RangeHeaderInputStream(RangeHeaderUrlOpener o, RangeHeaderUrlOpener r) {
super(o, r);
}
RangeHeaderInputStream(URLConnectionFactory connFactory, final URL url) {
this(new RangeHeaderUrlOpener(connFactory, url),
new RangeHeaderUrlOpener(connFactory, null));
}
@Override
protected URL getResolvedUrl(final HttpURLConnection connection) {
return connection.getURL();
}
}
@Override
public FSDataInputStream open(Path f, int buffersize) throws IOException {
f = f.makeQualified(getUri(), getWorkingDirectory());
String path = "/data" + ServletUtil.encodePath(f.toUri().getPath());
String query = addDelegationTokenParam("ugi=" + getEncodedUgiParameter());
URL u = getNamenodeURL(path, query);
return new FSDataInputStream(new RangeHeaderInputStream(connectionFactory, u));
}
@Override
public void close() throws IOException {
super.close();
tokenAspect.removeRenewAction();
}
/** Class to parse and store a listing reply from the server. */
class LsParser extends DefaultHandler {
final ArrayList<FileStatus> fslist = new ArrayList<FileStatus>();
@Override
public void startElement(String ns, String localname, String qname,
Attributes attrs) throws SAXException {
if ("listing".equals(qname)) return;
if (!"file".equals(qname) && !"directory".equals(qname)) {
if (RemoteException.class.getSimpleName().equals(qname)) {
throw new SAXException(RemoteException.valueOf(attrs));
}
throw new SAXException("Unrecognized entry: " + qname);
}
long modif;
long atime = 0;
try {
final SimpleDateFormat ldf = df.get();
modif = ldf.parse(attrs.getValue("modified")).getTime();
String astr = attrs.getValue("accesstime");
if (astr != null) {
atime = ldf.parse(astr).getTime();
}
} catch (ParseException e) { throw new SAXException(e); }
FileStatus fs = "file".equals(qname)
? new FileStatus(
Long.parseLong(attrs.getValue("size")), false,
Short.valueOf(attrs.getValue("replication")).shortValue(),
Long.parseLong(attrs.getValue("blocksize")),
modif, atime, FsPermission.valueOf(attrs.getValue("permission")),
attrs.getValue("owner"), attrs.getValue("group"),
HftpFileSystem.this.makeQualified(
new Path(getUri().toString(), attrs.getValue("path"))))
: new FileStatus(0L, true, 0, 0L,
modif, atime, FsPermission.valueOf(attrs.getValue("permission")),
attrs.getValue("owner"), attrs.getValue("group"),
HftpFileSystem.this.makeQualified(
new Path(getUri().toString(), attrs.getValue("path"))));
fslist.add(fs);
}
private void fetchList(String path, boolean recur) throws IOException {
try {
XMLReader xr = XMLReaderFactory.createXMLReader();
xr.setContentHandler(this);
HttpURLConnection connection = openConnection(
"/listPaths" + ServletUtil.encodePath(path),
"ugi=" + getEncodedUgiParameter() + (recur ? "&recursive=yes" : ""));
InputStream resp = connection.getInputStream();
xr.parse(new InputSource(resp));
} catch(SAXException e) {
final Exception embedded = e.getException();
if (embedded != null && embedded instanceof IOException) {
throw (IOException)embedded;
}
throw new IOException("invalid xml directory content", e);
}
}
public FileStatus getFileStatus(Path f) throws IOException {
fetchList(f.toUri().getPath(), false);
if (fslist.size() == 0) {
throw new FileNotFoundException("File does not exist: " + f);
}
return fslist.get(0);
}
public FileStatus[] listStatus(Path f, boolean recur) throws IOException {
fetchList(f.toUri().getPath(), recur);
if (fslist.size() > 0 && (fslist.size() != 1 || fslist.get(0).isDirectory())) {
fslist.remove(0);
}
return fslist.toArray(new FileStatus[0]);
}
public FileStatus[] listStatus(Path f) throws IOException {
return listStatus(f, false);
}
}
@Override
public FileStatus[] listStatus(Path f) throws IOException {
LsParser lsparser = new LsParser();
return lsparser.listStatus(f);
}
@Override
public FileStatus getFileStatus(Path f) throws IOException {
LsParser lsparser = new LsParser();
return lsparser.getFileStatus(f);
}
private class ChecksumParser extends DefaultHandler {
private FileChecksum filechecksum;
@Override
public void startElement(String ns, String localname, String qname,
Attributes attrs) throws SAXException {
if (!MD5MD5CRC32FileChecksum.class.getName().equals(qname)) {
if (RemoteException.class.getSimpleName().equals(qname)) {
throw new SAXException(RemoteException.valueOf(attrs));
}
throw new SAXException("Unrecognized entry: " + qname);
}
filechecksum = MD5MD5CRC32FileChecksum.valueOf(attrs);
}
private FileChecksum getFileChecksum(String f) throws IOException {
final HttpURLConnection connection = openConnection(
"/fileChecksum" + ServletUtil.encodePath(f),
"ugi=" + getEncodedUgiParameter());
try {
final XMLReader xr = XMLReaderFactory.createXMLReader();
xr.setContentHandler(this);
xr.parse(new InputSource(connection.getInputStream()));
} catch(SAXException e) {
final Exception embedded = e.getException();
if (embedded != null && embedded instanceof IOException) {
throw (IOException)embedded;
}
throw new IOException("invalid xml directory content", e);
} finally {
connection.disconnect();
}
return filechecksum;
}
}
@Override
public FileChecksum getFileChecksum(Path f) throws IOException {
final String s = makeQualified(f).toUri().getPath();
return new ChecksumParser().getFileChecksum(s);
}
@Override
public Path getWorkingDirectory() {
return new Path("/").makeQualified(getUri(), null);
}
@Override
public void setWorkingDirectory(Path f) { }
/** This optional operation is not yet supported. */
@Override
public FSDataOutputStream append(Path f, int bufferSize,
Progressable progress) throws IOException {
throw new IOException("Not supported");
}
@Override
public FSDataOutputStream create(Path f, FsPermission permission,
boolean overwrite, int bufferSize, short replication,
long blockSize, Progressable progress) throws IOException {
throw new IOException("Not supported");
}
@Override
public boolean rename(Path src, Path dst) throws IOException {
throw new IOException("Not supported");
}
@Override
public boolean delete(Path f, boolean recursive) throws IOException {
throw new IOException("Not supported");
}
@Override
public boolean mkdirs(Path f, FsPermission permission) throws IOException {
throw new IOException("Not supported");
}
/**
* A parser for parsing {@link ContentSummary} xml.
*/
private class ContentSummaryParser extends DefaultHandler {
private ContentSummary contentsummary;
@Override
public void startElement(String ns, String localname, String qname,
Attributes attrs) throws SAXException {
if (!ContentSummary.class.getName().equals(qname)) {
if (RemoteException.class.getSimpleName().equals(qname)) {
throw new SAXException(RemoteException.valueOf(attrs));
}
throw new SAXException("Unrecognized entry: " + qname);
}
contentsummary = toContentSummary(attrs);
}
/**
* Connect to the name node and get content summary.
* @param path The path
* @return The content summary for the path.
* @throws IOException
*/
private ContentSummary getContentSummary(String path) throws IOException {
final HttpURLConnection connection = openConnection(
"/contentSummary" + ServletUtil.encodePath(path),
"ugi=" + getEncodedUgiParameter());
InputStream in = null;
try {
in = connection.getInputStream();
final XMLReader xr = XMLReaderFactory.createXMLReader();
xr.setContentHandler(this);
xr.parse(new InputSource(in));
} catch(FileNotFoundException fnfe) {
//the server may not support getContentSummary
return null;
} catch(SAXException saxe) {
final Exception embedded = saxe.getException();
if (embedded != null && embedded instanceof IOException) {
throw (IOException)embedded;
}
throw new IOException("Invalid xml format", saxe);
} finally {
if (in != null) {
in.close();
}
connection.disconnect();
}
return contentsummary;
}
}
/** Return the object represented in the attributes. */
private static ContentSummary toContentSummary(Attributes attrs
) throws SAXException {
final String length = attrs.getValue("length");
final String fileCount = attrs.getValue("fileCount");
final String directoryCount = attrs.getValue("directoryCount");
final String quota = attrs.getValue("quota");
final String spaceConsumed = attrs.getValue("spaceConsumed");
final String spaceQuota = attrs.getValue("spaceQuota");
if (length == null
|| fileCount == null
|| directoryCount == null
|| quota == null
|| spaceConsumed == null
|| spaceQuota == null) {
return null;
}
try {
return new ContentSummary(
Long.parseLong(length),
Long.parseLong(fileCount),
Long.parseLong(directoryCount),
Long.parseLong(quota),
Long.parseLong(spaceConsumed),
Long.parseLong(spaceQuota));
} catch(Exception e) {
throw new SAXException("Invalid attributes: length=" + length
+ ", fileCount=" + fileCount
+ ", directoryCount=" + directoryCount
+ ", quota=" + quota
+ ", spaceConsumed=" + spaceConsumed
+ ", spaceQuota=" + spaceQuota, e);
}
}
@Override
public ContentSummary getContentSummary(Path f) throws IOException {
final String s = makeQualified(f).toUri().getPath();
final ContentSummary cs = new ContentSummaryParser().getContentSummary(s);
return cs != null? cs: super.getContentSummary(f);
}
@SuppressWarnings("unchecked")
@Override
public long renewDelegationToken(final Token<?> token) throws IOException {
// update the kerberos credentials, if they are coming from a keytab
UserGroupInformation connectUgi = ugi.getRealUser();
if (connectUgi == null) {
connectUgi = ugi;
}
try {
return connectUgi.doAs(new PrivilegedExceptionAction<Long>() {
@Override
public Long run() throws Exception {
InetSocketAddress serviceAddr = SecurityUtil
.getTokenServiceAddr(token);
return DelegationTokenFetcher.renewDelegationToken(connectionFactory,
DFSUtil.createUri(getUnderlyingProtocol(), serviceAddr),
(Token<DelegationTokenIdentifier>) token);
}
});
} catch (InterruptedException e) {
throw new IOException(e);
}
}
@SuppressWarnings("unchecked")
@Override
public void cancelDelegationToken(final Token<?> token) throws IOException {
UserGroupInformation connectUgi = ugi.getRealUser();
if (connectUgi == null) {
connectUgi = ugi;
}
try {
connectUgi.doAs(new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws Exception {
InetSocketAddress serviceAddr = SecurityUtil
.getTokenServiceAddr(token);
DelegationTokenFetcher.cancelDelegationToken(connectionFactory,
DFSUtil.createUri(getUnderlyingProtocol(), serviceAddr),
(Token<DelegationTokenIdentifier>) token);
return null;
}
});
} catch (InterruptedException e) {
throw new IOException(e);
}
}
}

View File

@ -1,69 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.web;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.io.Text;
/**
* An implementation of a protocol for accessing filesystems over HTTPS. The
* following implementation provides a limited, read-only interface to a
* filesystem over HTTPS.
*
* @see org.apache.hadoop.hdfs.server.namenode.ListPathsServlet
* @see org.apache.hadoop.hdfs.server.namenode.FileDataServlet
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class HsftpFileSystem extends HftpFileSystem {
public static final Text TOKEN_KIND = new Text("HSFTP delegation");
public static final String SCHEME = "hsftp";
/**
* Return the protocol scheme for the FileSystem.
* <p/>
*
* @return <code>hsftp</code>
*/
@Override
public String getScheme() {
return SCHEME;
}
/**
* Return the underlying protocol that is used to talk to the namenode.
*/
@Override
protected String getUnderlyingProtocol() {
return "https";
}
@Override
protected void initTokenAspect() {
tokenAspect = new TokenAspect<HsftpFileSystem>(this, tokenServiceName,
TOKEN_KIND);
}
@Override
protected int getDefaultPort() {
return DFSConfigKeys.DFS_NAMENODE_HTTPS_PORT_DEFAULT;
}
}
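A minimal usage sketch (not part of the original sources): the host, port and path below are placeholders, and the example assumes the hsftp scheme resolves to this class as it did in releases that shipped HSFTP. Data is fetched over HTTPS because getUnderlyingProtocol() above returns "https".
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HsftpListExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // 50470 is the default NameNode HTTPS port; adjust to the remote cluster.
    FileSystem fs = FileSystem.get(
        URI.create("hsftp://namenode.example.com:50470/"), conf);
    try {
      // Listing goes through the read-only HTTPS interface.
      for (FileStatus st : fs.listStatus(new Path("/user/example"))) {
        System.out.println(st.getPath() + "\t" + st.getLen());
      }
    } finally {
      fs.close();
    }
  }
}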

View File

@ -1,58 +0,0 @@
~~ Licensed under the Apache License, Version 2.0 (the "License");
~~ you may not use this file except in compliance with the License.
~~ You may obtain a copy of the License at
~~
~~ http://www.apache.org/licenses/LICENSE-2.0
~~
~~ Unless required by applicable law or agreed to in writing, software
~~ distributed under the License is distributed on an "AS IS" BASIS,
~~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~~ See the License for the specific language governing permissions and
~~ limitations under the License. See accompanying LICENSE file.
---
HFTP Guide
---
---
${maven.build.timestamp}
HFTP Guide
%{toc|section=1|fromDepth=0}
* Introduction
HFTP is a Hadoop filesystem implementation that lets you read data from
a remote Hadoop HDFS cluster. The reads are done via HTTP, and data is
sourced from DataNodes. HFTP is a read-only filesystem, and will throw
exceptions if you try to use it to write data or modify the filesystem
state.
HFTP is primarily useful if you have multiple HDFS clusters with
different versions and you need to move data from one to another. HFTP
is wire-compatible even between different versions of HDFS. For
example, you can do things like: <<<hadoop distcp -i hftp://sourceFS:50070/src hdfs://destFS:50070/dest>>>.
Note that HFTP is read-only, so the destination must be an HDFS filesystem.
(Also, in this example, distcp should be run using the configuration of
the new filesystem.)
An extension, HSFTP, uses HTTPS by default. This means that data will
be encrypted in transit.
* Implementation
The code for HFTP lives in the Java class
<<<org.apache.hadoop.hdfs.HftpFileSystem>>>. Likewise, HSFTP is implemented
in <<<org.apache.hadoop.hdfs.HsftpFileSystem>>>.
* Configuration Options
*-----------------------:-----------------------------------+
| <<Name>> | <<Description>> |
*-----------------------:-----------------------------------+
| <<<dfs.hftp.https.port>>> | the HTTPS port on the remote cluster. If not set,
| | HFTP will fall back on <<<dfs.https.port>>>.
*-----------------------:-----------------------------------+
| <<<hdfs.service.host_ip:port>>> | Specifies the service name (for the security
| | subsystem) associated with the HFTP filesystem running at ip:port.
*-----------------------:-----------------------------------+
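The following is an illustrative sketch only (host names, port and path are placeholders): reading a file from a remote cluster through the HFTP <<<FileSystem>>> API.
+-----------------------------------------------------------------------------+
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class HftpReadExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Optional: HTTPS port of the remote cluster (see the table above).
    conf.set("dfs.hftp.https.port", "50470");
    // "sourceFS.example.com" and "/src/part-00000" are placeholders.
    FileSystem fs = FileSystem.get(
        URI.create("hftp://sourceFS.example.com:50070/"), conf);
    FSDataInputStream in = fs.open(new Path("/src/part-00000"));
    try {
      // Copy the file to stdout; HFTP is read-only, so only reads are possible.
      IOUtils.copyBytes(in, System.out, 4096, false);
    } finally {
      IOUtils.closeStream(in);
      fs.close();
    }
  }
}
+-----------------------------------------------------------------------------+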

View File

@ -1,187 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import java.io.File;
import java.io.IOException;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Random;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.FSDataset;
import org.apache.hadoop.io.IOUtils;
import org.apache.log4j.Level;
import org.junit.Assert;
import org.junit.Test;
public class TestFiHftp {
final Log LOG = FileSystem.LOG;
{
((Log4JLogger)LOG).getLogger().setLevel(Level.ALL);
}
static final short DATANODE_NUM = 1;
static final Random ran = new Random();
static final byte[] buffer = new byte[1 << 16];
static final MessageDigest md5;
static {
try {
md5 = MessageDigest.getInstance("MD5");
} catch (NoSuchAlgorithmException e) {
throw new RuntimeException(e);
}
}
private static byte[] createFile(FileSystem fs, Path name, long length,
short replication, long blocksize) throws IOException {
final FSDataOutputStream out = fs.create(name, false, 4096,
replication, blocksize);
try {
for(long n = length; n > 0; ) {
ran.nextBytes(buffer);
final int w = n < buffer.length? (int)n: buffer.length;
out.write(buffer, 0, w);
md5.update(buffer, 0, w);
n -= w;
}
} finally {
IOUtils.closeStream(out);
}
return md5.digest();
}
@Test
public void testHftpOpen() throws IOException {
final Configuration conf = new Configuration();
MiniDFSCluster cluster = null;
try {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(DATANODE_NUM).build();
cluster.waitActive();
//test with a file
//which is larger than the servlet response buffer size
{
final long blocksize = 1L << 20; // 1 MB
final long filesize = 2*blocksize + 100;
runTestHftpOpen(cluster, "/foo", blocksize, filesize);
}
//test with a small file
//which is smaller than the servlet response buffer size
{
final long blocksize = 1L << 10; // 1 KB
final long filesize = 2*blocksize + 100;
runTestHftpOpen(cluster, "/small", blocksize, filesize);
}
} finally {
if (cluster != null) {cluster.shutdown();}
}
}
/**
* A test with a 3GB file.
* It may take ~6 minutes.
*/
void largeFileTest(final MiniDFSCluster cluster) throws IOException {
final long blocksize = 128L << 20;
final long filesize = 3L << 30;
runTestHftpOpen(cluster, "/large", blocksize, filesize);
}
/**
* @param blocksize
* @param filesize must be > block size
*/
private void runTestHftpOpen(final MiniDFSCluster cluster, final String file,
final long blocksize, final long filesize) throws IOException {
//create a file
final DistributedFileSystem dfs = (DistributedFileSystem)cluster.getFileSystem();
final Path filepath = new Path(file);
final byte[] filemd5 = createFile(dfs, filepath, filesize, DATANODE_NUM,
blocksize);
DFSTestUtil.waitReplication(dfs, filepath, DATANODE_NUM);
//test hftp open and read
final HftpFileSystem hftpfs = cluster.getHftpFileSystem(0);
{
final FSDataInputStream in = hftpfs.open(filepath);
long bytesRead = 0;
try {
for(int r; (r = in.read(buffer)) != -1; ) {
bytesRead += r;
md5.update(buffer, 0, r);
}
} finally {
LOG.info("bytesRead=" + bytesRead);
in.close();
}
Assert.assertEquals(filesize, bytesRead);
Assert.assertArrayEquals(filemd5, md5.digest());
}
//delete the second block
final DFSClient client = dfs.getClient();
final LocatedBlocks locatedblocks = client.getNamenode().getBlockLocations(
file, 0, filesize);
Assert.assertEquals((filesize - 1)/blocksize + 1,
locatedblocks.locatedBlockCount());
final LocatedBlock lb = locatedblocks.get(1);
final ExtendedBlock blk = lb.getBlock();
Assert.assertEquals(blocksize, lb.getBlockSize());
final DatanodeInfo[] datanodeinfos = lb.getLocations();
Assert.assertEquals(DATANODE_NUM, datanodeinfos.length);
final DataNode dn = cluster.getDataNode(datanodeinfos[0].getIpcPort());
LOG.info("dn=" + dn + ", blk=" + blk + " (length=" + blk.getNumBytes() + ")");
final FSDataset data = (FSDataset)dn.getFSDataset();
final File blkfile = data.getBlockFile(blk);
Assert.assertTrue(blkfile.delete());
//read again by hftp, should get an exception
LOG.info("hftpfs.getUri() = " + hftpfs.getUri());
final ContentSummary cs = hftpfs.getContentSummary(filepath);
LOG.info("hftpfs.getContentSummary = " + cs);
Assert.assertEquals(filesize, cs.getLength());
final FSDataInputStream in = hftpfs.open(hftpfs.makeQualified(filepath));
long bytesRead = 0;
try {
for(int r; (r = in.read(buffer)) != -1; ) {
bytesRead += r;
}
Assert.fail();
} catch(IOException ioe) {
LOG.info("GOOD: get an exception", ioe);
} finally {
LOG.info("bytesRead=" + bytesRead);
in.close();
}
}
}

View File

@ -1,201 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.server.namenode.ListPathsServlet;
import org.apache.hadoop.hdfs.web.HftpFileSystem;
import org.apache.hadoop.security.UserGroupInformation;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
/**
* Test for {@link ListPathsServlet} that serves the URL
* http://<namenodeaddress>:<httpport>/listPaths
*
* This test does not use the servlet directly. Instead it is based on
* {@link HftpFileSystem}, which uses this servlet to implement
* {@link HftpFileSystem#listStatus(Path)} method.
*/
public class TestListPathServlet {
private static final Configuration CONF = new HdfsConfiguration();
private static MiniDFSCluster cluster;
private static FileSystem fs;
private static URI hftpURI;
private static HftpFileSystem hftpFs;
private final Random r = new Random();
private final List<String> filelist = new ArrayList<String>();
@BeforeClass
public static void setup() throws Exception {
// start a cluster with single datanode
cluster = new MiniDFSCluster.Builder(CONF).build();
cluster.waitActive();
fs = cluster.getFileSystem();
final String str = "hftp://"
+ CONF.get(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY);
hftpURI = new URI(str);
hftpFs = cluster.getHftpFileSystem(0);
}
@AfterClass
public static void teardown() {
cluster.shutdown();
}
/** create a file with a length of <code>fileLen</code> */
private void createFile(String fileName, long fileLen) throws IOException {
filelist.add(hftpURI + fileName);
final Path filePath = new Path(fileName);
DFSTestUtil.createFile(fs, filePath, fileLen, (short) 1, r.nextLong());
}
private void mkdirs(String dirName) throws IOException {
filelist.add(hftpURI + dirName);
fs.mkdirs(new Path(dirName));
}
@Test
public void testListStatus() throws Exception {
// Empty root directory
checkStatus("/");
// Root directory with files and directories
createFile("/a", 1);
createFile("/b", 1);
mkdirs("/dir");
checkFile(new Path("/a"));
checkFile(new Path("/b"));
checkStatus("/");
// A directory with files and directories
createFile("/dir/.a.crc", 1);
createFile("/dir/b", 1);
mkdirs("/dir/dir1");
checkFile(new Path("/dir/.a.crc"));
checkFile(new Path("/dir/b"));
checkStatus("/dir");
// Non existent path
checkStatus("/nonexistent");
checkStatus("/nonexistent/a");
final String username = UserGroupInformation.getCurrentUser().getShortUserName() + "1";
final HftpFileSystem hftp2 = cluster.getHftpFileSystemAs(username, CONF, 0, "somegroup");
{ //test file not found on hftp
final Path nonexistent = new Path("/nonexistent");
try {
hftp2.getFileStatus(nonexistent);
Assert.fail();
} catch(IOException ioe) {
FileSystem.LOG.info("GOOD: getting an exception", ioe);
}
}
{ //test permission error on hftp
final Path dir = new Path("/dir");
fs.setPermission(dir, new FsPermission((short)0));
try {
hftp2.getFileStatus(new Path(dir, "a"));
Assert.fail();
} catch(IOException ioe) {
FileSystem.LOG.info("GOOD: getting an exception", ioe);
}
}
}
private void checkStatus(String listdir) throws IOException {
final Path listpath = hftpFs.makeQualified(new Path(listdir));
listdir = listpath.toString();
final FileStatus[] statuslist = hftpFs.listStatus(listpath);
for (String directory : filelist) {
System.out.println("dir:" + directory);
}
for (String file : filelist) {
System.out.println("file:" + file);
}
for (FileStatus status : statuslist) {
System.out.println("status:" + status.getPath().toString() + " type "
+ (status.isDirectory() ? "directory"
: ( status.isFile() ? "file" : "symlink")));
}
for (String file : filelist) {
boolean found = false;
// Consider only file under the list path
if (!file.startsWith(listpath.toString()) ||
file.equals(listpath.toString())) {
continue;
}
for (FileStatus status : statuslist) {
if (status.getPath().toString().equals(file)) {
found = true;
break;
}
}
Assert.assertTrue("Directory/file not returned in list status " + file,
found);
}
}
private void checkFile(final Path f) throws IOException {
final Path hdfspath = fs.makeQualified(f);
final FileStatus hdfsstatus = fs.getFileStatus(hdfspath);
FileSystem.LOG.info("hdfspath=" + hdfspath);
final Path hftppath = hftpFs.makeQualified(f);
final FileStatus hftpstatus = hftpFs.getFileStatus(hftppath);
FileSystem.LOG.info("hftppath=" + hftppath);
Assert.assertEquals(hdfspath.toUri().getPath(),
hdfsstatus.getPath().toUri().getPath());
checkFileStatus(hdfsstatus, hftpstatus);
}
private static void checkFileStatus(final FileStatus expected,
final FileStatus computed) {
Assert.assertEquals(expected.getPath().toUri().getPath(),
computed.getPath().toUri().getPath());
// TODO: test will fail if the following is un-commented.
// Assert.assertEquals(expected.getAccessTime(), computed.getAccessTime());
// Assert.assertEquals(expected.getModificationTime(),
// computed.getModificationTime());
Assert.assertEquals(expected.getBlockSize(), computed.getBlockSize());
Assert.assertEquals(expected.getGroup(), computed.getGroup());
Assert.assertEquals(expected.getLen(), computed.getLen());
Assert.assertEquals(expected.getOwner(), computed.getOwner());
Assert.assertEquals(expected.getPermission(), computed.getPermission());
Assert.assertEquals(expected.getReplication(), computed.getReplication());
}
}

View File

@ -1,312 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.Enumeration;
import java.util.List;
import java.util.Vector;
import javax.servlet.ServletContext;
import javax.servlet.ServletOutputStream;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSInputStream;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.common.JspHelper;
import org.apache.hadoop.net.NetUtils;
import org.junit.Test;
import org.mockito.Mockito;
import org.mortbay.jetty.InclusiveByteRange;
/*
* Mock input stream class that always outputs the current position of the stream.
*/
class MockFSInputStream extends FSInputStream {
long currentPos = 0;
@Override
public int read() throws IOException {
return (int)(currentPos++);
}
@Override
public void close() throws IOException {
}
@Override
public void seek(long pos) throws IOException {
currentPos = pos;
}
@Override
public long getPos() throws IOException {
return currentPos;
}
@Override
public boolean seekToNewSource(long targetPos) throws IOException {
return false;
}
}
public class TestStreamFile {
private final HdfsConfiguration CONF = new HdfsConfiguration();
private final DFSClient clientMock = Mockito.mock(DFSClient.class);
private final HttpServletRequest mockHttpServletRequest =
Mockito.mock(HttpServletRequest.class);
private final HttpServletResponse mockHttpServletResponse =
Mockito.mock(HttpServletResponse.class);
private final ServletContext mockServletContext =
Mockito.mock(ServletContext.class);
final StreamFile sfile = new StreamFile() {
private static final long serialVersionUID = -5513776238875189473L;
@Override
public ServletContext getServletContext() {
return mockServletContext;
}
@Override
protected DFSClient getDFSClient(HttpServletRequest request)
throws IOException, InterruptedException {
return clientMock;
}
};
// Return an array matching the output of MockFSInputStream
private static byte[] getOutputArray(int start, int count) {
byte[] a = new byte[count];
for (int i = 0; i < count; i++) {
a[i] = (byte)(start+i);
}
return a;
}
@Test
public void testWriteTo() throws IOException {
FSInputStream fsin = new MockFSInputStream();
ByteArrayOutputStream os = new ByteArrayOutputStream();
// new int[]{s_1, c_1, s_2, c_2, ..., s_n, c_n} means to test
// reading c_i bytes starting at s_i
int[] pairs = new int[]{ 0, 10000,
50, 100,
50, 6000,
1000, 2000,
0, 1,
0, 0,
5000, 0,
};
assertTrue("Pairs array must be even", pairs.length % 2 == 0);
for (int i = 0; i < pairs.length; i+=2) {
StreamFile.copyFromOffset(fsin, os, pairs[i], pairs[i+1]);
assertArrayEquals("Reading " + pairs[i+1]
+ " bytes from offset " + pairs[i],
getOutputArray(pairs[i], pairs[i+1]),
os.toByteArray());
os.reset();
}
}
@SuppressWarnings("unchecked")
private List<InclusiveByteRange> strToRanges(String s, int contentLength) {
List<String> l = Arrays.asList(new String[]{"bytes="+s});
Enumeration<?> e = (new Vector<String>(l)).elements();
return InclusiveByteRange.satisfiableRanges(e, contentLength);
}
@Test
public void testSendPartialData() throws IOException {
FSInputStream in = new MockFSInputStream();
ByteArrayOutputStream os = new ByteArrayOutputStream();
// test if multiple ranges, then 416
{
List<InclusiveByteRange> ranges = strToRanges("0-,10-300", 500);
HttpServletResponse response = Mockito.mock(HttpServletResponse.class);
StreamFile.sendPartialData(in, os, response, 500, ranges);
// Multiple ranges should result in a 416 error
Mockito.verify(response).setStatus(416);
}
// test if no ranges, then 416
{
os.reset();
HttpServletResponse response = Mockito.mock(HttpServletResponse.class);
StreamFile.sendPartialData(in, os, response, 500, null);
// No ranges should result in a 416 error
Mockito.verify(response).setStatus(416);
}
// test if invalid single range (out of bounds), then 416
{
List<InclusiveByteRange> ranges = strToRanges("600-800", 500);
HttpServletResponse response = Mockito.mock(HttpServletResponse.class);
StreamFile.sendPartialData(in, os, response, 500, ranges);
// Single (but invalid) range should result in a 416
Mockito.verify(response).setStatus(416);
}
// test if one (valid) range, then 206
{
List<InclusiveByteRange> ranges = strToRanges("100-300", 500);
HttpServletResponse response = Mockito.mock(HttpServletResponse.class);
StreamFile.sendPartialData(in, os, response, 500, ranges);
// Single (valid) range should result in a 206
Mockito.verify(response).setStatus(206);
assertArrayEquals("Byte range from 100-300",
getOutputArray(100, 201),
os.toByteArray());
}
}
// Test for positive scenario
@Test
public void testDoGetShouldWriteTheFileContentIntoServletOutputStream()
throws Exception {
MiniDFSCluster cluster = new MiniDFSCluster.Builder(CONF).numDataNodes(1)
.build();
try {
Path testFile = createFile();
setUpForDoGetTest(cluster, testFile);
ServletOutputStreamExtn outStream = new ServletOutputStreamExtn();
Mockito.doReturn(outStream).when(mockHttpServletResponse)
.getOutputStream();
StreamFile sfile = new StreamFile() {
private static final long serialVersionUID = 7715590481809562722L;
@Override
public ServletContext getServletContext() {
return mockServletContext;
}
};
sfile.doGet(mockHttpServletRequest, mockHttpServletResponse);
assertEquals("Not writing the file data into ServletOutputStream",
outStream.getResult(), "test");
} finally {
cluster.shutdown();
}
}
// Test for cleaning the streams in exception cases also
@Test
public void testDoGetShouldCloseTheDFSInputStreamIfResponseGetOutPutStreamThrowsAnyException()
throws Exception {
MiniDFSCluster cluster = new MiniDFSCluster.Builder(CONF).numDataNodes(1)
.build();
try {
Path testFile = createFile();
setUpForDoGetTest(cluster, testFile);
Mockito.doThrow(new IOException()).when(mockHttpServletResponse)
.getOutputStream();
DFSInputStream fsMock = Mockito.mock(DFSInputStream.class);
Mockito.doReturn(fsMock).when(clientMock).open(testFile.toString());
Mockito.doReturn(Long.valueOf(4)).when(fsMock).getFileLength();
try {
sfile.doGet(mockHttpServletRequest, mockHttpServletResponse);
fail("Not throwing the IOException");
} catch (IOException e) {
Mockito.verify(clientMock, Mockito.atLeastOnce()).close();
}
} finally {
cluster.shutdown();
}
}
private void setUpForDoGetTest(MiniDFSCluster cluster, Path testFile) {
Mockito.doReturn(CONF).when(mockServletContext).getAttribute(
JspHelper.CURRENT_CONF);
Mockito.doReturn(NetUtils.getHostPortString(NameNode.getAddress(CONF)))
.when(mockHttpServletRequest).getParameter("nnaddr");
Mockito.doReturn(testFile.toString()).when(mockHttpServletRequest)
.getPathInfo();
Mockito.doReturn("/streamFile"+testFile.toString()).when(mockHttpServletRequest)
.getRequestURI();
}
static Path writeFile(FileSystem fs, Path f) throws IOException {
DataOutputStream out = fs.create(f);
try {
out.writeBytes("test");
} finally {
out.close();
}
assertTrue(fs.exists(f));
return f;
}
private Path createFile() throws IOException {
FileSystem fs = FileSystem.get(CONF);
Path testFile = new Path("/test/mkdirs/doGet");
writeFile(fs, testFile);
return testFile;
}
public static class ServletOutputStreamExtn extends ServletOutputStream {
private final StringBuffer buffer = new StringBuffer(3);
public String getResult() {
return buffer.toString();
}
@Override
public void write(int b) throws IOException {
buffer.append((char) b);
}
}
}

View File

@ -0,0 +1,93 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.tools;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.tools.FakeRenewer;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
public class TestDelegationTokenFetcher {
private Configuration conf = new Configuration();
@Rule
public TemporaryFolder f = new TemporaryFolder();
private static final String tokenFile = "token";
/**
* Try to fetch a token without an HTTP server; expect an IOException.
*/
@Test(expected = IOException.class)
public void testTokenFetchFail() throws Exception {
WebHdfsFileSystem fs = mock(WebHdfsFileSystem.class);
doThrow(new IOException()).when(fs).getDelegationToken(anyString());
Path p = new Path(f.getRoot().getAbsolutePath(), tokenFile);
DelegationTokenFetcher.saveDelegationToken(conf, fs, null, p);
}
/**
* Fetch a token through the HTTP server and verify it round-trips.
*/
@Test
public void expectedTokenIsRetrievedFromHttp() throws Exception {
final Token<DelegationTokenIdentifier> testToken = new Token<DelegationTokenIdentifier>(
"id".getBytes(), "pwd".getBytes(), FakeRenewer.KIND, new Text(
"127.0.0.1:1234"));
WebHdfsFileSystem fs = mock(WebHdfsFileSystem.class);
doReturn(testToken).when(fs).getDelegationToken(anyString());
Path p = new Path(f.getRoot().getAbsolutePath(), tokenFile);
DelegationTokenFetcher.saveDelegationToken(conf, fs, null, p);
Credentials creds = Credentials.readTokenStorageFile(p, conf);
Iterator<Token<?>> itr = creds.getAllTokens().iterator();
assertTrue("token not exist error", itr.hasNext());
Token<?> fetchedToken = itr.next();
Assert.assertArrayEquals("token wrong identifier error",
testToken.getIdentifier(), fetchedToken.getIdentifier());
Assert.assertArrayEquals("token wrong password error",
testToken.getPassword(), fetchedToken.getPassword());
DelegationTokenFetcher.renewTokens(conf, p);
Assert.assertEquals(testToken, FakeRenewer.getLastRenewed());
DelegationTokenFetcher.cancelTokens(conf, p);
Assert.assertEquals(testToken, FakeRenewer.getLastCanceled());
}
}

View File

@ -1,249 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.web;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import org.apache.hadoop.hdfs.server.namenode.StreamFile;
import org.apache.hadoop.hdfs.web.HftpFileSystem;
import org.junit.Test;
public class TestByteRangeInputStream {
public static class MockHttpURLConnection extends HttpURLConnection {
public MockHttpURLConnection(URL u) {
super(u);
}
@Override
public boolean usingProxy(){
return false;
}
@Override
public void disconnect() {
}
@Override
public void connect() {
}
@Override
public InputStream getInputStream() throws IOException {
return new ByteArrayInputStream("asdf".getBytes());
}
@Override
public URL getURL() {
URL u = null;
try {
u = new URL("http://resolvedurl/");
} catch (Exception e) {
System.out.println(e.getMessage());
}
return u;
}
@Override
public int getResponseCode() {
if (responseCode != -1) {
return responseCode;
} else {
if (getRequestProperty("Range") == null) {
return 200;
} else {
return 206;
}
}
}
public void setResponseCode(int resCode) {
responseCode = resCode;
}
@Override
public String getHeaderField(String field) {
return (field.equalsIgnoreCase(StreamFile.CONTENT_LENGTH)) ? "65535" : null;
}
}
@Test
public void testByteRange() throws IOException {
URLConnectionFactory factory = mock(URLConnectionFactory.class);
HftpFileSystem.RangeHeaderUrlOpener ospy = spy(
new HftpFileSystem.RangeHeaderUrlOpener(factory, new URL("http://test/")));
doReturn(new MockHttpURLConnection(ospy.getURL())).when(ospy)
.openConnection();
HftpFileSystem.RangeHeaderUrlOpener rspy = spy(
new HftpFileSystem.RangeHeaderUrlOpener(factory, null));
doReturn(new MockHttpURLConnection(rspy.getURL())).when(rspy)
.openConnection();
ByteRangeInputStream is = new HftpFileSystem.RangeHeaderInputStream(ospy, rspy);
assertEquals("getPos wrong", 0, is.getPos());
is.read();
assertNull("Initial call made incorrectly (Range Check)", ospy
.openConnection().getRequestProperty("Range"));
assertEquals("getPos should be 1 after reading one byte", 1, is.getPos());
is.read();
assertEquals("getPos should be 2 after reading two bytes", 2, is.getPos());
// No additional connections should have been made (no seek)
rspy.setURL(new URL("http://resolvedurl/"));
is.seek(100);
is.read();
assertEquals("Seek to 100 bytes made incorrectly (Range Check)",
"bytes=100-", rspy.openConnection().getRequestProperty("Range"));
assertEquals("getPos should be 101 after reading one byte", 101,
is.getPos());
verify(rspy, times(2)).openConnection();
is.seek(101);
is.read();
verify(rspy, times(2)).openConnection();
// Seek to 101 should not result in another request
is.seek(2500);
is.read();
assertEquals("Seek to 2500 bytes made incorrectly (Range Check)",
"bytes=2500-", rspy.openConnection().getRequestProperty("Range"));
((MockHttpURLConnection) rspy.openConnection()).setResponseCode(200);
is.seek(500);
try {
is.read();
fail("Exception should be thrown when 200 response is given "
+ "but 206 is expected");
} catch (IOException e) {
assertEquals("Should fail because incorrect response code was sent",
"HTTP_PARTIAL expected, received 200", e.getMessage());
}
((MockHttpURLConnection) rspy.openConnection()).setResponseCode(206);
is.seek(0);
try {
is.read();
fail("Exception should be thrown when 206 response is given "
+ "but 200 is expected");
} catch (IOException e) {
assertEquals("Should fail because incorrect response code was sent",
"HTTP_OK expected, received 206", e.getMessage());
}
is.close();
}
@Test
public void testPropagatedClose() throws IOException {
URLConnectionFactory factory = mock(URLConnectionFactory.class);
ByteRangeInputStream brs = spy(new HftpFileSystem.RangeHeaderInputStream(
factory, new URL("http://test/")));
InputStream mockStream = mock(InputStream.class);
doReturn(mockStream).when(brs).openInputStream();
int brisOpens = 0;
int brisCloses = 0;
int isCloses = 0;
// first open, shouldn't close underlying stream
brs.getInputStream();
verify(brs, times(++brisOpens)).openInputStream();
verify(brs, times(brisCloses)).close();
verify(mockStream, times(isCloses)).close();
// stream is open, shouldn't close underlying stream
brs.getInputStream();
verify(brs, times(brisOpens)).openInputStream();
verify(brs, times(brisCloses)).close();
verify(mockStream, times(isCloses)).close();
// seek forces a reopen, should close underlying stream
brs.seek(1);
brs.getInputStream();
verify(brs, times(++brisOpens)).openInputStream();
verify(brs, times(brisCloses)).close();
verify(mockStream, times(++isCloses)).close();
// verify that the underlying stream isn't closed after a seek
// ie. the state was correctly updated
brs.getInputStream();
verify(brs, times(brisOpens)).openInputStream();
verify(brs, times(brisCloses)).close();
verify(mockStream, times(isCloses)).close();
// seeking to same location should be a no-op
brs.seek(1);
brs.getInputStream();
verify(brs, times(brisOpens)).openInputStream();
verify(brs, times(brisCloses)).close();
verify(mockStream, times(isCloses)).close();
// close should of course close
brs.close();
verify(brs, times(++brisCloses)).close();
verify(mockStream, times(++isCloses)).close();
// it's already closed, underlying stream should not close
brs.close();
verify(brs, times(++brisCloses)).close();
verify(mockStream, times(isCloses)).close();
// it's closed, don't reopen it
boolean errored = false;
try {
brs.getInputStream();
} catch (IOException e) {
errored = true;
assertEquals("Stream closed", e.getMessage());
} finally {
assertTrue("Read a closed steam", errored);
}
verify(brs, times(brisOpens)).openInputStream();
verify(brs, times(brisCloses)).close();
verify(mockStream, times(isCloses)).close();
}
}

View File

@ -1,96 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.web;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authentication.client.AuthenticationException;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.internal.util.reflection.Whitebox;
public class TestHftpDelegationToken {
/**
* Test whether HftpFileSystem maintains wire-compatibility with 0.20.203 when
* obtaining delegation tokens. See HDFS-5440 for more details.
*/
@Test
public void testTokenCompatibilityFor203() throws IOException,
URISyntaxException, AuthenticationException {
Configuration conf = new Configuration();
HftpFileSystem fs = new HftpFileSystem();
Token<?> token = new Token<TokenIdentifier>(new byte[0], new byte[0],
DelegationTokenIdentifier.HDFS_DELEGATION_KIND, new Text(
"127.0.0.1:8020"));
Credentials cred = new Credentials();
cred.addToken(HftpFileSystem.TOKEN_KIND, token);
ByteArrayOutputStream os = new ByteArrayOutputStream();
cred.write(new DataOutputStream(os));
HttpURLConnection conn = mock(HttpURLConnection.class);
doReturn(new ByteArrayInputStream(os.toByteArray())).when(conn)
.getInputStream();
doReturn(HttpURLConnection.HTTP_OK).when(conn).getResponseCode();
URLConnectionFactory factory = mock(URLConnectionFactory.class);
doReturn(conn).when(factory).openConnection(Mockito.<URL> any(),
anyBoolean());
final URI uri = new URI("hftp://127.0.0.1:8020");
fs.initialize(uri, conf);
fs.connectionFactory = factory;
UserGroupInformation ugi = UserGroupInformation.createUserForTesting("foo",
new String[] { "bar" });
TokenAspect<HftpFileSystem> tokenAspect = new TokenAspect<HftpFileSystem>(
fs, SecurityUtil.buildTokenService(uri), HftpFileSystem.TOKEN_KIND);
tokenAspect.initDelegationToken(ugi);
tokenAspect.ensureTokenInitialized();
Assert.assertSame(HftpFileSystem.TOKEN_KIND, fs.getRenewToken().getKind());
Token<?> tok = (Token<?>) Whitebox.getInternalState(fs, "delegationToken");
Assert.assertNotSame("Not making a copy of the remote token", token, tok);
Assert.assertEquals(token.getKind(), tok.getKind());
}
}

View File

@ -1,395 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.web;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.net.URLConnection;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
import org.apache.hadoop.util.ServletUtil;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
public class TestHftpFileSystem {
private static final String BASEDIR = System.getProperty("test.build.dir",
"target/test-dir") + "/" + TestHftpFileSystem.class.getSimpleName();
private static String keystoresDir;
private static String sslConfDir;
private static Configuration config = null;
private static MiniDFSCluster cluster = null;
private static String blockPoolId = null;
private static String hftpUri = null;
private FileSystem hdfs = null;
private HftpFileSystem hftpFs = null;
private static final Path[] TEST_PATHS = new Path[] {
// URI does not encode, Request#getPathInfo returns /foo
new Path("/foo;bar"),
// URI does not encode, Request#getPathInfo returns verbatim
new Path("/foo+"), new Path("/foo+bar/foo+bar"),
new Path("/foo=bar/foo=bar"), new Path("/foo,bar/foo,bar"),
new Path("/foo@bar/foo@bar"), new Path("/foo&bar/foo&bar"),
new Path("/foo$bar/foo$bar"), new Path("/foo_bar/foo_bar"),
new Path("/foo~bar/foo~bar"), new Path("/foo.bar/foo.bar"),
new Path("/foo../bar/foo../bar"), new Path("/foo.../bar/foo.../bar"),
new Path("/foo'bar/foo'bar"),
new Path("/foo#bar/foo#bar"),
new Path("/foo!bar/foo!bar"),
// HDFS file names may not contain ":"
// URI percent encodes, Request#getPathInfo decodes
new Path("/foo bar/foo bar"), new Path("/foo?bar/foo?bar"),
new Path("/foo\">bar/foo\">bar"), };
@BeforeClass
public static void setUp() throws Exception {
config = new Configuration();
cluster = new MiniDFSCluster.Builder(config).numDataNodes(2).build();
blockPoolId = cluster.getNamesystem().getBlockPoolId();
hftpUri = "hftp://"
+ config.get(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY);
File base = new File(BASEDIR);
FileUtil.fullyDelete(base);
base.mkdirs();
keystoresDir = new File(BASEDIR).getAbsolutePath();
sslConfDir = KeyStoreTestUtil.getClasspathDir(TestHftpFileSystem.class);
KeyStoreTestUtil.setupSSLConfig(keystoresDir, sslConfDir, config, false);
}
@AfterClass
public static void tearDown() throws Exception {
if (cluster != null) {
cluster.shutdown();
}
FileUtil.fullyDelete(new File(BASEDIR));
KeyStoreTestUtil.cleanupSSLConfig(keystoresDir, sslConfDir);
}
@Before
public void initFileSystems() throws IOException {
hdfs = cluster.getFileSystem();
hftpFs = (HftpFileSystem) new Path(hftpUri).getFileSystem(config);
// clear out the namespace
for (FileStatus stat : hdfs.listStatus(new Path("/"))) {
hdfs.delete(stat.getPath(), true);
}
}
@After
public void resetFileSystems() throws IOException {
FileSystem.closeAll();
}
/**
* Test file creation and access with file names that need encoding.
*/
@Test
public void testFileNameEncoding() throws IOException, URISyntaxException {
for (Path p : TEST_PATHS) {
// Create and access the path (data and streamFile servlets)
FSDataOutputStream out = hdfs.create(p, true);
out.writeBytes("0123456789");
out.close();
FSDataInputStream in = hftpFs.open(p);
assertEquals('0', in.read());
in.close();
// Check the file status matches the path. Hftp returns a FileStatus
// with the entire URI, extract the path part.
assertEquals(p, new Path(hftpFs.getFileStatus(p).getPath().toUri()
.getPath()));
// Test list status (listPath servlet)
assertEquals(1, hftpFs.listStatus(p).length);
// Test content summary (contentSummary servlet)
assertNotNull("No content summary", hftpFs.getContentSummary(p));
// Test checksums (fileChecksum and getFileChecksum servlets)
assertNotNull("No file checksum", hftpFs.getFileChecksum(p));
}
}
private void testDataNodeRedirect(Path path) throws IOException {
// Create the file
if (hdfs.exists(path)) {
hdfs.delete(path, true);
}
FSDataOutputStream out = hdfs.create(path, (short) 1);
out.writeBytes("0123456789");
out.close();
// Get the path's block location so we can determine
// if we were redirected to the right DN.
BlockLocation[] locations = hdfs.getFileBlockLocations(path, 0, 10);
String xferAddr = locations[0].getNames()[0];
// Connect to the NN to get redirected
URL u = hftpFs.getNamenodeURL(
"/data" + ServletUtil.encodePath(path.toUri().getPath()),
"ugi=userx,groupy");
HttpURLConnection conn = (HttpURLConnection) u.openConnection();
HttpURLConnection.setFollowRedirects(true);
conn.connect();
conn.getInputStream();
boolean checked = false;
// Find the datanode that has the block according to locations
// and check that the URL was redirected to this DN's info port
for (DataNode node : cluster.getDataNodes()) {
DatanodeRegistration dnR = DataNodeTestUtils.getDNRegistrationForBP(node,
blockPoolId);
if (dnR.getXferAddr().equals(xferAddr)) {
checked = true;
assertEquals(dnR.getInfoPort(), conn.getURL().getPort());
}
}
assertTrue("The test never checked that location of "
+ "the block and hftp desitnation are the same", checked);
}
/**
* Test that clients are redirected to the appropriate DN.
*/
@Test
public void testDataNodeRedirect() throws IOException {
for (Path p : TEST_PATHS) {
testDataNodeRedirect(p);
}
}
/**
* Tests getPos() functionality.
*/
@Test
public void testGetPos() throws IOException {
final Path testFile = new Path("/testfile+1");
// Write a test file.
FSDataOutputStream out = hdfs.create(testFile, true);
out.writeBytes("0123456789");
out.close();
FSDataInputStream in = hftpFs.open(testFile);
// Test read().
for (int i = 0; i < 5; ++i) {
assertEquals(i, in.getPos());
in.read();
}
// Test read(b, off, len).
assertEquals(5, in.getPos());
byte[] buffer = new byte[10];
assertEquals(2, in.read(buffer, 0, 2));
assertEquals(7, in.getPos());
// Test read(b).
int bytesRead = in.read(buffer);
assertEquals(7 + bytesRead, in.getPos());
// Test EOF.
for (int i = 0; i < 100; ++i) {
in.read();
}
assertEquals(10, in.getPos());
in.close();
}
/**
* Tests seek().
*/
@Test
public void testSeek() throws IOException {
final Path testFile = new Path("/testfile+1");
FSDataOutputStream out = hdfs.create(testFile, true);
out.writeBytes("0123456789");
out.close();
FSDataInputStream in = hftpFs.open(testFile);
in.seek(7);
assertEquals('7', in.read());
in.close();
}
@Test
public void testReadClosedStream() throws IOException {
final Path testFile = new Path("/testfile+2");
FSDataOutputStream os = hdfs.create(testFile, true);
os.writeBytes("0123456789");
os.close();
// ByteRangeInputStream delays opens until reads. Make sure it doesn't
// open a closed stream that has never been opened
FSDataInputStream in = hftpFs.open(testFile);
in.close();
checkClosedStream(in);
checkClosedStream(in.getWrappedStream());
// force the stream to connect and then close it
in = hftpFs.open(testFile);
int ch = in.read();
assertEquals('0', ch);
in.close();
checkClosedStream(in);
checkClosedStream(in.getWrappedStream());
// make sure seeking doesn't automagically reopen the stream
in.seek(4);
checkClosedStream(in);
checkClosedStream(in.getWrappedStream());
}
private void checkClosedStream(InputStream is) {
IOException ioe = null;
try {
is.read();
} catch (IOException e) {
ioe = e;
}
assertNotNull("No exception on closed read", ioe);
assertEquals("Stream closed", ioe.getMessage());
}
@Test
public void testHftpDefaultPorts() throws IOException {
Configuration conf = new Configuration();
URI uri = URI.create("hftp://localhost");
HftpFileSystem fs = (HftpFileSystem) FileSystem.get(uri, conf);
assertEquals(DFSConfigKeys.DFS_NAMENODE_HTTP_PORT_DEFAULT,
fs.getDefaultPort());
assertEquals(uri, fs.getUri());
// HFTP uses http to get the token so canonical service name should
// return the http port.
assertEquals("127.0.0.1:" + DFSConfigKeys.DFS_NAMENODE_HTTP_PORT_DEFAULT,
fs.getCanonicalServiceName());
}
@Test
public void testHftpCustomUriPortWithDefaultPorts() throws IOException {
Configuration conf = new Configuration();
URI uri = URI.create("hftp://localhost:123");
HftpFileSystem fs = (HftpFileSystem) FileSystem.get(uri, conf);
assertEquals(DFSConfigKeys.DFS_NAMENODE_HTTP_PORT_DEFAULT,
fs.getDefaultPort());
assertEquals(uri, fs.getUri());
assertEquals("127.0.0.1:123", fs.getCanonicalServiceName());
}
@Test
public void testHftpCustomUriPortWithCustomDefaultPorts() throws IOException {
Configuration conf = new Configuration();
URI uri = URI.create("hftp://localhost:789");
HftpFileSystem fs = (HftpFileSystem) FileSystem.get(uri, conf);
assertEquals(DFSConfigKeys.DFS_NAMENODE_HTTP_PORT_DEFAULT,
fs.getDefaultPort());
assertEquals(uri, fs.getUri());
assertEquals("127.0.0.1:789", fs.getCanonicalServiceName());
}
@Test
public void testTimeout() throws IOException {
Configuration conf = new Configuration();
URI uri = URI.create("hftp://localhost");
HftpFileSystem fs = (HftpFileSystem) FileSystem.get(uri, conf);
URLConnection conn = fs.connectionFactory.openConnection(new URL(
"http://localhost"));
assertEquals(URLConnectionFactory.DEFAULT_SOCKET_TIMEOUT,
conn.getConnectTimeout());
assertEquals(URLConnectionFactory.DEFAULT_SOCKET_TIMEOUT,
conn.getReadTimeout());
}
// The same default-port checks, repeated for HSFTP below.
@Test
public void testHsftpDefaultPorts() throws IOException {
Configuration conf = new Configuration();
URI uri = URI.create("hsftp://localhost");
HsftpFileSystem fs = (HsftpFileSystem) FileSystem.get(uri, conf);
assertEquals(DFSConfigKeys.DFS_NAMENODE_HTTPS_PORT_DEFAULT,
fs.getDefaultPort());
assertEquals(uri, fs.getUri());
assertEquals("127.0.0.1:" + DFSConfigKeys.DFS_NAMENODE_HTTPS_PORT_DEFAULT,
fs.getCanonicalServiceName());
}
@Test
public void testHsftpCustomUriPortWithDefaultPorts() throws IOException {
Configuration conf = new Configuration();
URI uri = URI.create("hsftp://localhost:123");
HsftpFileSystem fs = (HsftpFileSystem) FileSystem.get(uri, conf);
assertEquals(DFSConfigKeys.DFS_NAMENODE_HTTPS_PORT_DEFAULT,
fs.getDefaultPort());
assertEquals(uri, fs.getUri());
assertEquals("127.0.0.1:123", fs.getCanonicalServiceName());
}
@Test
public void testHsftpCustomUriPortWithCustomDefaultPorts() throws IOException {
Configuration conf = new Configuration();
URI uri = URI.create("hsftp://localhost:789");
HsftpFileSystem fs = (HsftpFileSystem) FileSystem.get(uri, conf);
assertEquals(DFSConfigKeys.DFS_NAMENODE_HTTPS_PORT_DEFAULT,
fs.getDefaultPort());
assertEquals(uri, fs.getUri());
assertEquals("127.0.0.1:789", fs.getCanonicalServiceName());
}
}

View File

@ -1,111 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.tools;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileSystemTestHelper;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.tools.DelegationTokenFetcher;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.Token;
import org.junit.Before;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import static org.mockito.Matchers.*;
public class TestDelegationTokenFetcher {
private DistributedFileSystem dfs;
private Configuration conf;
private URI uri;
private static final String SERVICE_VALUE = "localhost:2005";
private static final String tokenFile = "file.dta";
@Before
public void init() throws URISyntaxException, IOException {
dfs = mock(DistributedFileSystem.class);
conf = new Configuration();
uri = new URI("hdfs://" + SERVICE_VALUE);
FileSystemTestHelper.addFileSystemForTesting(uri, conf, dfs);
}
/**
* Verify that when the DelegationTokenFetcher runs, it talks to the Namenode,
* pulls out the correct user's token and successfully serializes it to disk.
*/
@Test
public void expectedTokenIsRetrievedFromDFS() throws Exception {
final byte[] ident = new DelegationTokenIdentifier(new Text("owner"),
new Text("renewer"), new Text("realuser")).getBytes();
final byte[] pw = new byte[] { 42 };
final Text service = new Text(uri.toString());
// Create a token for the fetcher to fetch, wire NN to return it when asked
// for this particular user.
final Token<DelegationTokenIdentifier> t =
new Token<DelegationTokenIdentifier>(ident, pw, FakeRenewer.KIND, service);
when(dfs.addDelegationTokens(eq((String) null), any(Credentials.class))).thenAnswer(
new Answer<Token<?>[]>() {
@Override
public Token<?>[] answer(InvocationOnMock invocation) {
Credentials creds = (Credentials)invocation.getArguments()[1];
creds.addToken(service, t);
return new Token<?>[]{t};
}
});
when(dfs.getUri()).thenReturn(uri);
FakeRenewer.reset();
FileSystem fileSys = FileSystem.getLocal(conf);
try {
DelegationTokenFetcher.main(new String[] { "-fs", uri.toString(),
tokenFile });
Path p = new Path(fileSys.getWorkingDirectory(), tokenFile);
Credentials creds = Credentials.readTokenStorageFile(p, conf);
Iterator<Token<?>> itr = creds.getAllTokens().iterator();
// make sure we got back exactly the 1 token we expected
assertTrue(itr.hasNext());
assertEquals(t, itr.next());
assertTrue(!itr.hasNext());
DelegationTokenFetcher.main(new String[] { "--print", tokenFile });
DelegationTokenFetcher.main(new String[] { "--renew", tokenFile });
assertEquals(t, FakeRenewer.lastRenewed);
FakeRenewer.reset();
DelegationTokenFetcher.main(new String[] { "--cancel", tokenFile });
assertEquals(t, FakeRenewer.lastCanceled);
} finally {
fileSys.delete(new Path(tokenFile), true);
}
}
}

View File

@ -1,374 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.tools;
import static org.jboss.netty.handler.codec.http.HttpMethod.GET;
import static org.jboss.netty.handler.codec.http.HttpResponseStatus.OK;
import static org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.Executors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.tools.DelegationTokenFetcher;
import org.apache.hadoop.hdfs.web.HftpFileSystem;
import org.apache.hadoop.hdfs.web.URLConnectionFactory;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.authentication.client.AuthenticationException;
import org.apache.hadoop.security.token.Token;
import org.apache.log4j.Logger;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
import org.jboss.netty.handler.codec.http.HttpChunkAggregator;
import org.jboss.netty.handler.codec.http.HttpHeaders;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpRequest;
import org.jboss.netty.handler.codec.http.HttpRequestDecoder;
import org.jboss.netty.handler.codec.http.HttpResponse;
import org.jboss.netty.handler.codec.http.HttpResponseEncoder;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.UnmodifiableIterator;
public class TestDelegationTokenRemoteFetcher {
private static final Logger LOG = Logger
.getLogger(TestDelegationTokenRemoteFetcher.class);
private static final String EXP_DATE = "124123512361236";
private static final String tokenFile = "http.file.dta";
private static final URLConnectionFactory connectionFactory = URLConnectionFactory.DEFAULT_SYSTEM_CONNECTION_FACTORY;
private int httpPort;
private URI serviceUrl;
private FileSystem fileSys;
private Configuration conf;
private ServerBootstrap bootstrap;
private Token<DelegationTokenIdentifier> testToken;
private volatile AssertionError assertionError;
@Before
public void init() throws Exception {
conf = new Configuration();
fileSys = FileSystem.getLocal(conf);
httpPort = NetUtils.getFreeSocketPort();
serviceUrl = new URI("http://localhost:" + httpPort);
testToken = createToken(serviceUrl);
}
@After
public void clean() throws IOException {
if (fileSys != null)
fileSys.delete(new Path(tokenFile), true);
if (bootstrap != null)
bootstrap.releaseExternalResources();
}
/**
* Fetching a token when no HTTP server is running should fail with an IOException.
*/
@Test
public void testTokenFetchFail() throws Exception {
try {
DelegationTokenFetcher.main(new String[] { "-webservice=" + serviceUrl,
tokenFile });
fail("Token fetcher shouldn't start in absense of NN");
} catch (IOException ex) {
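// Expected: nothing is listening at serviceUrl, so the fetch must fail.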
}
}
/**
* Renewing a token when no HTTP server is running should fail with an IOException.
*/
@Test
public void testTokenRenewFail() throws AuthenticationException {
try {
DelegationTokenFetcher.renewDelegationToken(connectionFactory, serviceUrl, testToken);
fail("Token fetcher shouldn't be able to renew tokens in absense of NN");
} catch (IOException ex) {
}
}
/**
* Cancelling a token when no HTTP server is running should fail with an IOException.
*/
@Test
public void expectedTokenCancelFail() throws AuthenticationException {
try {
DelegationTokenFetcher.cancelDelegationToken(connectionFactory, serviceUrl, testToken);
fail("Token fetcher shouldn't be able to cancel tokens in absense of NN");
} catch (IOException ex) {
}
}
/**
* Renewing a token against a URL that yields an HTTP error should fail with an IOException.
*/
@Test
public void expectedTokenRenewErrorHttpResponse()
throws AuthenticationException, URISyntaxException {
bootstrap = startHttpServer(httpPort, testToken, serviceUrl);
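// The mock server maps any URI containing "/exception" to a handler that replies
// 405 Method Not Allowed, so the renewal attempt below must surface an IOException.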
try {
DelegationTokenFetcher.renewDelegationToken(connectionFactory, new URI(
serviceUrl.toString() + "/exception"), createToken(serviceUrl));
fail("Token fetcher shouldn't be able to renew tokens using an invalid"
+ " NN URL");
} catch (IOException ex) {
}
if (assertionError != null)
throw assertionError;
}
/**
* Cancelling a token against the mock HTTP server should succeed without error.
*/
@Test
public void testCancelTokenFromHttp() throws IOException,
AuthenticationException {
bootstrap = startHttpServer(httpPort, testToken, serviceUrl);
DelegationTokenFetcher.cancelDelegationToken(connectionFactory, serviceUrl,
testToken);
if (assertionError != null)
throw assertionError;
}
/**
* Renewing a token against the mock HTTP server should return the new expiration time.
*/
@Test
public void testRenewTokenFromHttp() throws IOException,
NumberFormatException, AuthenticationException {
bootstrap = startHttpServer(httpPort, testToken, serviceUrl);
assertTrue("testRenewTokenFromHttp error",
Long.parseLong(EXP_DATE) == DelegationTokenFetcher.renewDelegationToken(
connectionFactory, serviceUrl, testToken));
if (assertionError != null)
throw assertionError;
}
/**
* Fetching a token from the mock HTTP server should retrieve the expected token.
*/
@Test
public void expectedTokenIsRetrievedFromHttp() throws Exception {
bootstrap = startHttpServer(httpPort, testToken, serviceUrl);
DelegationTokenFetcher.main(new String[] { "-webservice=" + serviceUrl,
tokenFile });
Path p = new Path(fileSys.getWorkingDirectory(), tokenFile);
Credentials creds = Credentials.readTokenStorageFile(p, conf);
Iterator<Token<?>> itr = creds.getAllTokens().iterator();
assertTrue("token not exist error", itr.hasNext());
Token<?> fetchedToken = itr.next();
Assert.assertArrayEquals("token wrong identifier error",
testToken.getIdentifier(), fetchedToken.getIdentifier());
Assert.assertArrayEquals("token wrong password error",
testToken.getPassword(), fetchedToken.getPassword());
if (assertionError != null)
throw assertionError;
}
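// Builds the delegation token used throughout these tests, tagged with
// HftpFileSystem.TOKEN_KIND and the mock server's address as its service.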
private static Token<DelegationTokenIdentifier> createToken(URI serviceUri) {
byte[] pw = "hadoop".getBytes();
byte[] ident = new DelegationTokenIdentifier(new Text("owner"), new Text(
"renewer"), new Text("realuser")).getBytes();
Text service = new Text(serviceUri.toString());
return new Token<DelegationTokenIdentifier>(ident, pw,
HftpFileSystem.TOKEN_KIND, service);
}
private interface Handler {
void handle(Channel channel, Token<DelegationTokenIdentifier> token,
String serviceUrl) throws IOException;
}
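// Each Handler below plays the part of one NameNode token servlet. FetchHandler
// returns the test token serialized in the Credentials token-storage format,
// the payload DelegationTokenFetcher expects when fetching over HTTP.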
private class FetchHandler implements Handler {
@Override
public void handle(Channel channel, Token<DelegationTokenIdentifier> token,
String serviceUrl) throws IOException {
Assert.assertEquals(testToken, token);
Credentials creds = new Credentials();
creds.addToken(new Text(serviceUrl), token);
DataOutputBuffer out = new DataOutputBuffer();
creds.write(out);
int fileLength = out.getData().length;
ChannelBuffer cbuffer = ChannelBuffers.buffer(fileLength);
cbuffer.writeBytes(out.getData());
HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
response.setHeader(HttpHeaders.Names.CONTENT_LENGTH,
String.valueOf(fileLength));
response.setContent(cbuffer);
channel.write(response).addListener(ChannelFutureListener.CLOSE);
}
}
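// RenewHandler answers a renewal request with the canned expiration date as plain
// text, which the client parses into a long.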
private class RenewHandler implements Handler {
@Override
public void handle(Channel channel, Token<DelegationTokenIdentifier> token,
String serviceUrl) throws IOException {
Assert.assertEquals(testToken, token);
byte[] bytes = EXP_DATE.getBytes();
ChannelBuffer cbuffer = ChannelBuffers.buffer(bytes.length);
cbuffer.writeBytes(bytes);
HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
response.setHeader(HttpHeaders.Names.CONTENT_LENGTH,
String.valueOf(bytes.length));
response.setContent(cbuffer);
channel.write(response).addListener(ChannelFutureListener.CLOSE);
}
}
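// ExceptionHandler replies with 405 Method Not Allowed so tests can exercise the
// client-side error path.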
private class ExceptionHandler implements Handler {
@Override
public void handle(Channel channel, Token<DelegationTokenIdentifier> token,
String serviceUrl) throws IOException {
Assert.assertEquals(testToken, token);
HttpResponse response = new DefaultHttpResponse(HTTP_1_1,
HttpResponseStatus.METHOD_NOT_ALLOWED);
channel.write(response).addListener(ChannelFutureListener.CLOSE);
}
}
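// CancelHandler acknowledges a cancellation with an empty 200 OK.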
private class CancelHandler implements Handler {
@Override
public void handle(Channel channel, Token<DelegationTokenIdentifier> token,
String serviceUrl) throws IOException {
Assert.assertEquals(testToken, token);
HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
channel.write(response).addListener(ChannelFutureListener.CLOSE);
}
}
private final class CredentialsLogicHandler extends
SimpleChannelUpstreamHandler {
private final Token<DelegationTokenIdentifier> token;
private final String serviceUrl;
private final ImmutableMap<String, Handler> routes = ImmutableMap.of(
"/exception", new ExceptionHandler(),
"/cancelDelegationToken", new CancelHandler(),
"/getDelegationToken", new FetchHandler() ,
"/renewDelegationToken", new RenewHandler());
public CredentialsLogicHandler(Token<DelegationTokenIdentifier> token,
String serviceUrl) {
this.token = token;
this.serviceUrl = serviceUrl;
}
@Override
public void messageReceived(ChannelHandlerContext ctx, final MessageEvent e)
throws Exception {
HttpRequest request = (HttpRequest) e.getMessage();
if (request.getMethod() == HttpMethod.OPTIONS) {
// Mimic SPNEGO authentication
HttpResponse response = new DefaultHttpResponse(HTTP_1_1,
HttpResponseStatus.OK);
response.addHeader("Set-Cookie", "hadoop-auth=1234");
e.getChannel().write(response).addListener(ChannelFutureListener.CLOSE);
} else if (request.getMethod() != GET) {
e.getChannel().close();
}
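// Dispatch to the matching handler. If a handler's assertion fails, remember the
// error so the test thread can rethrow it, and report 400 Bad Request to the client.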
UnmodifiableIterator<Map.Entry<String, Handler>> iter = routes.entrySet()
.iterator();
while (iter.hasNext()) {
Map.Entry<String, Handler> entry = iter.next();
if (request.getUri().contains(entry.getKey())) {
Handler handler = entry.getValue();
try {
handler.handle(e.getChannel(), token, serviceUrl);
} catch (AssertionError ee) {
TestDelegationTokenRemoteFetcher.this.assertionError = ee;
HttpResponse response = new DefaultHttpResponse(HTTP_1_1,
HttpResponseStatus.BAD_REQUEST);
response.setContent(ChannelBuffers.copiedBuffer(ee.getMessage(),
Charset.defaultCharset()));
e.getChannel().write(response).addListener(ChannelFutureListener.CLOSE);
}
return;
}
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
throws Exception {
Channel ch = e.getChannel();
Throwable cause = e.getCause();
if (LOG.isDebugEnabled())
LOG.debug(cause.getMessage());
ch.close().addListener(ChannelFutureListener.CLOSE);
}
}
private ServerBootstrap startHttpServer(int port,
final Token<DelegationTokenIdentifier> token, final URI url) {
ServerBootstrap bootstrap = new ServerBootstrap(
new NioServerSocketChannelFactory(Executors.newCachedThreadPool(),
Executors.newCachedThreadPool()));
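// Minimal Netty 3 HTTP pipeline: decode the request, aggregate chunked bodies,
// encode the response, then hand off to the token logic handler.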
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
@Override
public ChannelPipeline getPipeline() throws Exception {
return Channels.pipeline(new HttpRequestDecoder(),
new HttpChunkAggregator(65536), new HttpResponseEncoder(),
new CredentialsLogicHandler(token, url.toString()));
}
});
bootstrap.bind(new InetSocketAddress("localhost", port));
return bootstrap;
}
}