HDFS-12512. RBF: Add WebHDFS.

This commit is contained in:
weiy 2018-03-23 08:32:25 -07:00
parent 75fc05f369
commit 6e31a09084
18 changed files with 1368 additions and 25 deletions

View File

@ -23,6 +23,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.server.common.JspHelper;
import org.apache.hadoop.hdfs.server.namenode.NameNodeHttpServer;
import org.apache.hadoop.http.HttpServer2;
import org.apache.hadoop.service.AbstractService;
@ -86,6 +87,9 @@ public class RouterHttpServer extends AbstractService {
this.httpServer = builder.build();
NameNodeHttpServer.initWebHdfs(conf, httpAddress.getHostName(), httpServer,
RouterWebHdfsMethods.class.getPackage().getName());
this.httpServer.setAttribute(NAMENODE_ATTRIBUTE_KEY, this.router);
this.httpServer.setAttribute(JspHelper.CURRENT_CONF, this.conf);
setupServlets(this.httpServer, this.conf);

View File

@ -567,7 +567,7 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
* @return The remote location for this file.
* @throws IOException If the file has no creation location.
*/
private RemoteLocation getCreateLocation(final String src)
protected RemoteLocation getCreateLocation(final String src)
throws IOException {
final List<RemoteLocation> locations = getLocationsForPath(src, true);

View File

@ -0,0 +1,655 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.router;
import static org.apache.hadoop.util.StringUtils.getTrimmedStringCollection;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.server.common.JspHelper;
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext;
import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
import org.apache.hadoop.hdfs.server.namenode.web.resources.NamenodeWebHdfsMethods;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.Path;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import com.sun.jersey.spi.container.ResourceFilters;
import org.apache.hadoop.hdfs.web.JsonUtil;
import org.apache.hadoop.hdfs.web.ParamFilter;
import org.apache.hadoop.hdfs.web.URLConnectionFactory;
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
import org.apache.hadoop.hdfs.web.resources.AccessTimeParam;
import org.apache.hadoop.hdfs.web.resources.AclPermissionParam;
import org.apache.hadoop.hdfs.web.resources.BlockSizeParam;
import org.apache.hadoop.hdfs.web.resources.BufferSizeParam;
import org.apache.hadoop.hdfs.web.resources.ConcatSourcesParam;
import org.apache.hadoop.hdfs.web.resources.CreateFlagParam;
import org.apache.hadoop.hdfs.web.resources.CreateParentParam;
import org.apache.hadoop.hdfs.web.resources.DelegationParam;
import org.apache.hadoop.hdfs.web.resources.DestinationParam;
import org.apache.hadoop.hdfs.web.resources.DoAsParam;
import org.apache.hadoop.hdfs.web.resources.ExcludeDatanodesParam;
import org.apache.hadoop.hdfs.web.resources.FsActionParam;
import org.apache.hadoop.hdfs.web.resources.GetOpParam;
import org.apache.hadoop.hdfs.web.resources.GroupParam;
import org.apache.hadoop.hdfs.web.resources.HttpOpParam;
import org.apache.hadoop.hdfs.web.resources.LengthParam;
import org.apache.hadoop.hdfs.web.resources.ModificationTimeParam;
import org.apache.hadoop.hdfs.web.resources.NewLengthParam;
import org.apache.hadoop.hdfs.web.resources.NoRedirectParam;
import org.apache.hadoop.hdfs.web.resources.OffsetParam;
import org.apache.hadoop.hdfs.web.resources.OldSnapshotNameParam;
import org.apache.hadoop.hdfs.web.resources.OverwriteParam;
import org.apache.hadoop.hdfs.web.resources.OwnerParam;
import org.apache.hadoop.hdfs.web.resources.Param;
import org.apache.hadoop.hdfs.web.resources.PermissionParam;
import org.apache.hadoop.hdfs.web.resources.PostOpParam;
import org.apache.hadoop.hdfs.web.resources.PutOpParam;
import org.apache.hadoop.hdfs.web.resources.RenameOptionSetParam;
import org.apache.hadoop.hdfs.web.resources.RenewerParam;
import org.apache.hadoop.hdfs.web.resources.ReplicationParam;
import org.apache.hadoop.hdfs.web.resources.SnapshotNameParam;
import org.apache.hadoop.hdfs.web.resources.StartAfterParam;
import org.apache.hadoop.hdfs.web.resources.StoragePolicyParam;
import org.apache.hadoop.hdfs.web.resources.TokenArgumentParam;
import org.apache.hadoop.hdfs.web.resources.TokenKindParam;
import org.apache.hadoop.hdfs.web.resources.TokenServiceParam;
import org.apache.hadoop.hdfs.web.resources.UnmaskedPermissionParam;
import org.apache.hadoop.hdfs.web.resources.UriFsPathParam;
import org.apache.hadoop.hdfs.web.resources.UserParam;
import org.apache.hadoop.hdfs.web.resources.XAttrEncodingParam;
import org.apache.hadoop.hdfs.web.resources.XAttrNameParam;
import org.apache.hadoop.hdfs.web.resources.XAttrSetFlagParam;
import org.apache.hadoop.hdfs.web.resources.XAttrValueParam;
import org.apache.hadoop.ipc.ExternalCall;
import org.apache.hadoop.ipc.RetriableException;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.net.URLDecoder;
import java.security.PrivilegedAction;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
/**
* WebHDFS Router implementation. This is an extension of
* {@link NamenodeWebHdfsMethods}, and tries to reuse as much as possible.
*/
@Path("")
@ResourceFilters(ParamFilter.class)
public class RouterWebHdfsMethods extends NamenodeWebHdfsMethods {
private static final Logger LOG =
LoggerFactory.getLogger(RouterWebHdfsMethods.class);
private static final ThreadLocal<String> REMOTE_ADDRESS =
new ThreadLocal<String>();
private @Context HttpServletRequest request;
private String method;
private String query;
private String reqPath;
public RouterWebHdfsMethods(@Context HttpServletRequest request) {
super(request);
this.method = request.getMethod();
this.query = request.getQueryString();
this.reqPath = request.getServletPath();
REMOTE_ADDRESS.set(JspHelper.getRemoteAddr(request));
}
@Override
protected void init(final UserGroupInformation ugi,
final DelegationParam delegation,
final UserParam username, final DoAsParam doAsUser,
final UriFsPathParam path, final HttpOpParam<?> op,
final Param<?, ?>... parameters) {
super.init(ugi, delegation, username, doAsUser, path, op, parameters);
REMOTE_ADDRESS.set(JspHelper.getRemoteAddr(request));
}
@Override
protected ClientProtocol getRpcClientProtocol() throws IOException {
final Router router = getRouter();
final RouterRpcServer routerRpcServer = router.getRpcServer();
if (routerRpcServer == null) {
throw new RetriableException("Router is in startup mode");
}
return routerRpcServer;
}
private void reset() {
REMOTE_ADDRESS.set(null);
}
@Override
protected String getRemoteAddr() {
return REMOTE_ADDRESS.get();
}
@Override
protected void queueExternalCall(ExternalCall call)
throws IOException, InterruptedException {
getRouter().getRpcServer().getServer().queueCall(call);
}
private Router getRouter() {
return (Router)getContext().getAttribute("name.node");
}
private static RouterRpcServer getRPCServer(final Router router)
throws IOException {
final RouterRpcServer routerRpcServer = router.getRpcServer();
if (routerRpcServer == null) {
throw new RetriableException("Router is in startup mode");
}
return routerRpcServer;
}
@Override
protected Response put(
final UserGroupInformation ugi,
final DelegationParam delegation,
final UserParam username,
final DoAsParam doAsUser,
final String fullpath,
final PutOpParam op,
final DestinationParam destination,
final OwnerParam owner,
final GroupParam group,
final PermissionParam permission,
final UnmaskedPermissionParam unmaskedPermission,
final OverwriteParam overwrite,
final BufferSizeParam bufferSize,
final ReplicationParam replication,
final BlockSizeParam blockSize,
final ModificationTimeParam modificationTime,
final AccessTimeParam accessTime,
final RenameOptionSetParam renameOptions,
final CreateParentParam createParent,
final TokenArgumentParam delegationTokenArgument,
final AclPermissionParam aclPermission,
final XAttrNameParam xattrName,
final XAttrValueParam xattrValue,
final XAttrSetFlagParam xattrSetFlag,
final SnapshotNameParam snapshotName,
final OldSnapshotNameParam oldSnapshotName,
final ExcludeDatanodesParam exclDatanodes,
final CreateFlagParam createFlagParam,
final NoRedirectParam noredirectParam,
final StoragePolicyParam policyName
) throws IOException, URISyntaxException {
switch(op.getValue()) {
case CREATE:
{
final Router router = getRouter();
final URI uri = redirectURI(router, fullpath);
if (!noredirectParam.getValue()) {
return Response.temporaryRedirect(uri)
.type(MediaType.APPLICATION_OCTET_STREAM).build();
} else {
final String js = JsonUtil.toJsonString("Location", uri);
return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
}
}
case MKDIRS:
case CREATESYMLINK:
case RENAME:
case SETREPLICATION:
case SETOWNER:
case SETPERMISSION:
case SETTIMES:
case RENEWDELEGATIONTOKEN:
case CANCELDELEGATIONTOKEN:
case MODIFYACLENTRIES:
case REMOVEACLENTRIES:
case REMOVEDEFAULTACL:
case REMOVEACL:
case SETACL:
case SETXATTR:
case REMOVEXATTR:
case ALLOWSNAPSHOT:
case CREATESNAPSHOT:
case RENAMESNAPSHOT:
case DISALLOWSNAPSHOT:
case SETSTORAGEPOLICY:
{
// Whitelist operations that can handled by NamenodeWebHdfsMethods
return super.put(ugi, delegation, username, doAsUser, fullpath, op,
destination, owner, group, permission, unmaskedPermission,
overwrite, bufferSize, replication, blockSize, modificationTime,
accessTime, renameOptions, createParent, delegationTokenArgument,
aclPermission, xattrName, xattrValue, xattrSetFlag, snapshotName,
oldSnapshotName, exclDatanodes, createFlagParam, noredirectParam,
policyName);
}
default:
throw new UnsupportedOperationException(op + " is not supported");
}
}
@Override
protected Response post(
final UserGroupInformation ugi,
final DelegationParam delegation,
final UserParam username,
final DoAsParam doAsUser,
final String fullpath,
final PostOpParam op,
final ConcatSourcesParam concatSrcs,
final BufferSizeParam bufferSize,
final ExcludeDatanodesParam excludeDatanodes,
final NewLengthParam newLength,
final NoRedirectParam noRedirectParam
) throws IOException, URISyntaxException {
switch(op.getValue()) {
case APPEND:
{
final Router router = getRouter();
final URI uri = redirectURI(router, ugi, delegation, username,
doAsUser, fullpath, op.getValue(), -1L,
excludeDatanodes.getValue(), bufferSize);
if (!noRedirectParam.getValue()) {
return Response.temporaryRedirect(uri)
.type(MediaType.APPLICATION_OCTET_STREAM).build();
} else {
final String js = JsonUtil.toJsonString("Location", uri);
return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
}
}
case CONCAT:
case TRUNCATE:
case UNSETSTORAGEPOLICY:
{
return super.post(ugi, delegation, username, doAsUser, fullpath, op,
concatSrcs, bufferSize, excludeDatanodes, newLength,
noRedirectParam);
}
default:
throw new UnsupportedOperationException(op + " is not supported");
}
}
@Override
protected Response get(
final UserGroupInformation ugi,
final DelegationParam delegation,
final UserParam username,
final DoAsParam doAsUser,
final String fullpath,
final GetOpParam op,
final OffsetParam offset,
final LengthParam length,
final RenewerParam renewer,
final BufferSizeParam bufferSize,
final List<XAttrNameParam> xattrNames,
final XAttrEncodingParam xattrEncoding,
final ExcludeDatanodesParam excludeDatanodes,
final FsActionParam fsAction,
final SnapshotNameParam snapshotName,
final OldSnapshotNameParam oldSnapshotName,
final TokenKindParam tokenKind,
final TokenServiceParam tokenService,
final NoRedirectParam noredirectParam,
final StartAfterParam startAfter
) throws IOException, URISyntaxException {
try {
final Router router = getRouter();
switch (op.getValue()) {
case OPEN:
{
final URI uri = redirectURI(router, ugi, delegation, username,
doAsUser, fullpath, op.getValue(), offset.getValue(),
excludeDatanodes.getValue(), offset, length, bufferSize);
if (!noredirectParam.getValue()) {
return Response.temporaryRedirect(uri)
.type(MediaType.APPLICATION_OCTET_STREAM).build();
} else {
final String js = JsonUtil.toJsonString("Location", uri);
return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
}
}
case GETFILECHECKSUM:
{
final URI uri = redirectURI(router, ugi, delegation, username,
doAsUser, fullpath, op.getValue(), -1L, null);
if (!noredirectParam.getValue()) {
return Response.temporaryRedirect(uri)
.type(MediaType.APPLICATION_OCTET_STREAM).build();
} else {
final String js = JsonUtil.toJsonString("Location", uri);
return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
}
}
case GET_BLOCK_LOCATIONS:
case GETFILESTATUS:
case LISTSTATUS:
case GETCONTENTSUMMARY:
case GETHOMEDIRECTORY:
case GETACLSTATUS:
case GETXATTRS:
case LISTXATTRS:
case CHECKACCESS:
{
return super.get(ugi, delegation, username, doAsUser, fullpath, op,
offset, length, renewer, bufferSize, xattrNames, xattrEncoding,
excludeDatanodes, fsAction, snapshotName, oldSnapshotName,
tokenKind, tokenService, noredirectParam, startAfter);
}
default:
throw new UnsupportedOperationException(op + " is not supported");
}
} finally {
reset();
}
}
/**
* Get the redirect URI from the Namenode responsible for a path.
* @param router Router to check.
* @param path Path to get location for.
* @return URI returned by the Namenode.
* @throws IOException If it cannot get the redirect URI.
*/
private URI redirectURI(final Router router, final String path)
throws IOException {
// Forward the request to the proper Namenode
final HttpURLConnection conn = forwardRequest(router, path);
try {
conn.setInstanceFollowRedirects(false);
conn.setDoOutput(true);
conn.connect();
// Read the reply from the Namenode
int responseCode = conn.getResponseCode();
if (responseCode != HttpServletResponse.SC_TEMPORARY_REDIRECT) {
LOG.info("We expected a redirection from the Namenode, not {}",
responseCode);
return null;
}
// Extract the redirect location and return it
String redirectLocation = conn.getHeaderField("Location");
try {
// We modify the namenode location and the path
redirectLocation = redirectLocation
.replaceAll("(?<=[?&;])namenoderpcaddress=.*?(?=[&;])",
"namenoderpcaddress=" + router.getRouterId())
.replaceAll("(?<=[/])webhdfs/v1/.*?(?=[?])",
"webhdfs/v1" + path);
return new URI(redirectLocation);
} catch (URISyntaxException e) {
LOG.error("Cannot parse redirect location {}", redirectLocation);
}
} finally {
if (conn != null) {
conn.disconnect();
}
}
return null;
}
/**
* Forwards a request to a subcluster.
* @param router Router to check.
* @param path Path in HDFS.
* @return Reply from the subcluster.
* @throws IOException
*/
private HttpURLConnection forwardRequest(
final Router router, final String path) throws IOException {
final Configuration conf =
(Configuration)getContext().getAttribute(JspHelper.CURRENT_CONF);
URLConnectionFactory connectionFactory =
URLConnectionFactory.newDefaultURLConnectionFactory(conf);
// Find the namespace responsible for a path
final RouterRpcServer rpcServer = getRPCServer(router);
RemoteLocation createLoc = rpcServer.getCreateLocation(path);
String nsId = createLoc.getNameserviceId();
String dest = createLoc.getDest();
ActiveNamenodeResolver nnResolver = router.getNamenodeResolver();
List<? extends FederationNamenodeContext> namenodes =
nnResolver.getNamenodesForNameserviceId(nsId);
// Go over the namenodes responsible for that namespace
for (FederationNamenodeContext namenode : namenodes) {
try {
// Generate the request for the namenode
String nnWebAddress = namenode.getWebAddress();
String[] nnWebAddressSplit = nnWebAddress.split(":");
String host = nnWebAddressSplit[0];
int port = Integer.parseInt(nnWebAddressSplit[1]);
// Avoid double-encoding here
query = URLDecoder.decode(query, "UTF-8");
URI uri = new URI(getScheme(), null, host, port,
reqPath + dest, query, null);
URL url = uri.toURL();
// Send a request to the proper Namenode
final HttpURLConnection conn =
(HttpURLConnection)connectionFactory.openConnection(url);
conn.setRequestMethod(method);
return conn;
} catch (Exception e) {
LOG.error("Cannot redirect request to {}", namenode, e);
}
}
return null;
}
/**
* Get a URI to redirect an operation to.
* @param router Router to check.
* @param ugi User group information.
* @param delegation Delegation token.
* @param username User name.
* @param doAsUser Do as user.
* @param path Path to check.
* @param op Operation to perform.
* @param openOffset Offset for opening a file.
* @param excludeDatanodes Blocks to excluded.
* @param parameters Other parameters.
* @return Redirection URI.
* @throws URISyntaxException If it cannot parse the URI.
* @throws IOException If it cannot create the URI.
*/
private URI redirectURI(final Router router, final UserGroupInformation ugi,
final DelegationParam delegation, final UserParam username,
final DoAsParam doAsUser, final String path, final HttpOpParam.Op op,
final long openOffset, final String excludeDatanodes,
final Param<?, ?>... parameters) throws URISyntaxException, IOException {
final DatanodeInfo dn =
chooseDatanode(router, path, op, openOffset, excludeDatanodes);
if (dn == null) {
throw new IOException("Failed to find datanode, suggest to check cluster"
+ " health. excludeDatanodes=" + excludeDatanodes);
}
final String delegationQuery;
if (!UserGroupInformation.isSecurityEnabled()) {
// security disabled
delegationQuery = Param.toSortedString("&", doAsUser, username);
} else if (delegation.getValue() != null) {
// client has provided a token
delegationQuery = "&" + delegation;
} else {
// generate a token
final Token<? extends TokenIdentifier> t = generateDelegationToken(
router, ugi, request.getUserPrincipal().getName());
delegationQuery = "&delegation=" + t.encodeToUrlString();
}
final String redirectQuery = op.toQueryString() + delegationQuery
+ "&namenoderpcaddress=" + router.getRouterId()
+ Param.toSortedString("&", parameters);
final String uripath = WebHdfsFileSystem.PATH_PREFIX + path;
int port = "http".equals(getScheme()) ? dn.getInfoPort() :
dn.getInfoSecurePort();
final URI uri = new URI(getScheme(), null, dn.getHostName(), port, uripath,
redirectQuery, null);
if (LOG.isTraceEnabled()) {
LOG.trace("redirectURI={}", uri);
}
return uri;
}
private DatanodeInfo chooseDatanode(final Router router,
final String path, final HttpOpParam.Op op, final long openOffset,
final String excludeDatanodes) throws IOException {
// We need to get the DNs as a privileged user
final RouterRpcServer rpcServer = getRPCServer(router);
UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
DatanodeInfo[] dns = loginUser.doAs(
new PrivilegedAction<DatanodeInfo[]>() {
@Override
public DatanodeInfo[] run() {
try {
return rpcServer.getDatanodeReport(DatanodeReportType.LIVE);
} catch (IOException e) {
LOG.error("Cannot get the datanodes from the RPC server", e);
return null;
}
}
});
HashSet<Node> excludes = new HashSet<Node>();
if (excludeDatanodes != null) {
Collection<String> collection =
getTrimmedStringCollection(excludeDatanodes);
for (DatanodeInfo dn : dns) {
if (collection.contains(dn.getName())) {
excludes.add(dn);
}
}
}
if (op == GetOpParam.Op.OPEN ||
op == PostOpParam.Op.APPEND ||
op == GetOpParam.Op.GETFILECHECKSUM) {
// Choose a datanode containing a replica
final ClientProtocol cp = getRpcClientProtocol();
final HdfsFileStatus status = cp.getFileInfo(path);
if (status == null) {
throw new FileNotFoundException("File " + path + " not found.");
}
final long len = status.getLen();
if (op == GetOpParam.Op.OPEN) {
if (openOffset < 0L || (openOffset >= len && len > 0)) {
throw new IOException("Offset=" + openOffset
+ " out of the range [0, " + len + "); " + op + ", path=" + path);
}
}
if (len > 0) {
final long offset = op == GetOpParam.Op.OPEN ? openOffset : len - 1;
final LocatedBlocks locations = cp.getBlockLocations(path, offset, 1);
final int count = locations.locatedBlockCount();
if (count > 0) {
LocatedBlock location0 = locations.get(0);
return bestNode(location0.getLocations(), excludes);
}
}
}
return getRandomDatanode(dns, excludes);
}
/**
* Get a random Datanode from a subcluster.
* @param dns Nodes to be chosen from.
* @param excludes Nodes to be excluded from.
* @return Random datanode from a particular subluster.
*/
private static DatanodeInfo getRandomDatanode(
final DatanodeInfo[] dns, final HashSet<Node> excludes) {
DatanodeInfo dn = null;
if (dns == null) {
return dn;
}
int numDNs = dns.length;
int availableNodes = 0;
if (excludes.isEmpty()) {
availableNodes = numDNs;
} else {
for (DatanodeInfo di : dns) {
if (!excludes.contains(di)) {
availableNodes++;
}
}
}
// Return a random one from the list
if (availableNodes > 0) {
while (dn == null || excludes.contains(dn)) {
Random rnd = new Random();
int idx = rnd.nextInt(numDNs);
dn = dns[idx];
}
}
return dn;
}
/**
* Generate the delegation tokens for this request.
* @param router Router.
* @param ugi User group information.
* @param renewer Who is asking for the renewal.
* @return The delegation tokens.
* @throws IOException If it cannot create the tokens.
*/
private Token<? extends TokenIdentifier> generateDelegationToken(
final Router router, final UserGroupInformation ugi,
final String renewer) throws IOException {
throw new UnsupportedOperationException("TODO Generate token for ugi=" +
ugi + " request=" + request);
}
}

View File

@ -0,0 +1,129 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.contract.router.web;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.contract.hdfs.HDFSContract;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster;
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext;
import org.apache.hadoop.hdfs.web.WebHdfsConstants;
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMENODES;
/**
* The contract of Router-based Federated HDFS
* This changes its feature set from platform for platform -the default
* set is updated during initialization.
*/
public class RouterWebHDFSContract extends HDFSContract {
public static final Logger LOG =
LoggerFactory.getLogger(WebHdfsFileSystem.class);
public static final String CONTRACT_WEBHDFS_XML = "contract/webhdfs.xml";
private static MiniRouterDFSCluster cluster;
public RouterWebHDFSContract(Configuration conf) {
super(conf);
addConfResource(CONTRACT_WEBHDFS_XML);
}
public static void createCluster() throws IOException {
try {
HdfsConfiguration conf = new HdfsConfiguration();
conf.addResource(CONTRACT_HDFS_XML);
conf.addResource(CONTRACT_WEBHDFS_XML);
cluster = new MiniRouterDFSCluster(true, 2);
// Start NNs and DNs and wait until ready
cluster.startCluster();
// Start routers with only an RPC service
cluster.startRouters();
// Register and verify all NNs with all routers
cluster.registerNamenodes();
cluster.waitNamenodeRegistration();
// Setup the mount table
cluster.installMockLocations();
// Making one Namenodes active per nameservice
if (cluster.isHighAvailability()) {
for (String ns : cluster.getNameservices()) {
cluster.switchToActive(ns, NAMENODES[0]);
cluster.switchToStandby(ns, NAMENODES[1]);
}
}
} catch (Exception e) {
cluster = null;
throw new IOException("Cannot start federated cluster", e);
}
}
public static void destroyCluster() {
if (cluster != null) {
cluster.shutdown();
cluster = null;
}
}
public static MiniDFSCluster getCluster() {
return cluster.getCluster();
}
@Override
public FileSystem getTestFileSystem() throws IOException {
return getFileSystem();
}
public static FileSystem getFileSystem() throws IOException {
//assumes cluster is not null
Assert.assertNotNull("cluster not created", cluster);
// Create a connection to WebHDFS
try {
RouterContext router = cluster.getRandomRouter();
String uriStr =
WebHdfsConstants.WEBHDFS_SCHEME + "://" + router.getHttpAddress();
URI uri = new URI(uriStr);
Configuration conf = new HdfsConfiguration();
return FileSystem.get(uri, conf);
} catch (URISyntaxException e) {
LOG.error("Cannot create URI for the WebHDFS filesystem", e);
}
return null;
}
@Override
public String getScheme() {
return WebHdfsConstants.WEBHDFS_SCHEME;
}
}

View File

@ -0,0 +1,45 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License. See accompanying LICENSE file.
*/
package org.apache.hadoop.fs.contract.router.web;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.contract.AbstractContractAppendTest;
import org.apache.hadoop.fs.contract.AbstractFSContract;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import java.io.IOException;
/**
* Test append operations on a Router WebHDFS FS.
*/
public class TestRouterWebHDFSContractAppend
extends AbstractContractAppendTest {
@BeforeClass
public static void createCluster() throws IOException {
RouterWebHDFSContract.createCluster();
}
@AfterClass
public static void teardownCluster() throws IOException {
RouterWebHDFSContract.destroyCluster();
}
@Override
protected AbstractFSContract createContract(Configuration conf) {
return new RouterWebHDFSContract(conf);
}
}

View File

@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.contract.router.web;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.AbstractContractConcatTest;
import org.apache.hadoop.fs.contract.AbstractFSContract;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import java.io.IOException;
/**
* Test concat operations on a Router WebHDFS FS.
*/
public class TestRouterWebHDFSContractConcat
extends AbstractContractConcatTest {
@BeforeClass
public static void createCluster() throws IOException {
RouterWebHDFSContract.createCluster();
// perform a simple operation on the cluster to verify it is up
RouterWebHDFSContract.getFileSystem().getDefaultBlockSize(new Path("/"));
}
@AfterClass
public static void teardownCluster() throws IOException {
RouterWebHDFSContract.destroyCluster();
}
@Override
protected AbstractFSContract createContract(Configuration conf) {
return new RouterWebHDFSContract(conf);
}
}

View File

@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.contract.router.web;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.contract.AbstractContractCreateTest;
import org.apache.hadoop.fs.contract.AbstractFSContract;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import java.io.IOException;
/**
* Test create operations on a Router WebHDFS FS.
*/
public class TestRouterWebHDFSContractCreate
extends AbstractContractCreateTest {
@BeforeClass
public static void createCluster() throws IOException {
RouterWebHDFSContract.createCluster();
}
@AfterClass
public static void teardownCluster() throws IOException {
RouterWebHDFSContract.destroyCluster();
}
@Override
protected AbstractFSContract createContract(Configuration conf) {
return new RouterWebHDFSContract(conf);
}
}

View File

@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.contract.router.web;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.contract.AbstractContractDeleteTest;
import org.apache.hadoop.fs.contract.AbstractFSContract;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import java.io.IOException;
/**
* Test delete operations on a Router WebHDFS FS.
*/
public class TestRouterWebHDFSContractDelete
extends AbstractContractDeleteTest {
@BeforeClass
public static void createCluster() throws IOException {
RouterWebHDFSContract.createCluster();
}
@AfterClass
public static void teardownCluster() throws IOException {
RouterWebHDFSContract.destroyCluster();
}
@Override
protected AbstractFSContract createContract(Configuration conf) {
return new RouterWebHDFSContract(conf);
}
}

View File

@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.contract.router.web;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.contract.AbstractContractMkdirTest;
import org.apache.hadoop.fs.contract.AbstractFSContract;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import java.io.IOException;
/**
* Test dir operations on a Router WebHDFS FS.
*/
public class TestRouterWebHDFSContractMkdir extends AbstractContractMkdirTest {
@BeforeClass
public static void createCluster() throws IOException {
RouterWebHDFSContract.createCluster();
}
@AfterClass
public static void teardownCluster() throws IOException {
RouterWebHDFSContract.destroyCluster();
}
@Override
protected AbstractFSContract createContract(Configuration conf) {
return new RouterWebHDFSContract(conf);
}
}

View File

@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.contract.router.web;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.contract.AbstractContractOpenTest;
import org.apache.hadoop.fs.contract.AbstractFSContract;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.IOException;
/**
* Test open operations on a Router WebHDFS FS.
*/
public class TestRouterWebHDFSContractOpen extends AbstractContractOpenTest {
@BeforeClass
public static void createCluster() throws IOException {
RouterWebHDFSContract.createCluster();
}
@AfterClass
public static void teardownCluster() throws IOException {
RouterWebHDFSContract.destroyCluster();
}
@Override
protected AbstractFSContract createContract(Configuration conf) {
return new RouterWebHDFSContract(conf);
}
@Override
@Test
public void testOpenReadDir() throws Throwable {
// WebHDFS itself allows open read on directory, we may need to
// fix this first before make this test work
}
@Override
@Test
public void testOpenReadDirWithChild() throws Throwable {
// WebHDFS itself allows open read on directory, we may need to
// fix this first before make this test work
}
}

View File

@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.contract.router.web;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.contract.AbstractContractRenameTest;
import org.apache.hadoop.fs.contract.AbstractFSContract;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import java.io.IOException;
/**
* Test rename operations on a Router WebHDFS FS.
*/
public class TestRouterWebHDFSContractRename
extends AbstractContractRenameTest {
@BeforeClass
public static void createCluster() throws IOException {
RouterWebHDFSContract.createCluster();
}
@AfterClass
public static void teardownCluster() throws IOException {
RouterWebHDFSContract.destroyCluster();
}
@Override
protected AbstractFSContract createContract(Configuration conf) {
return new RouterWebHDFSContract(conf);
}
}

View File

@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.contract.router.web;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.contract.AbstractContractRootDirectoryTest;
import org.apache.hadoop.fs.contract.AbstractFSContract;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import java.io.IOException;
/**
* Test dir operations on a Router WebHDFS FS.
*/
public class TestRouterWebHDFSContractRootDirectory extends
AbstractContractRootDirectoryTest {
@BeforeClass
public static void createCluster() throws IOException {
RouterWebHDFSContract.createCluster();
}
@AfterClass
public static void teardownCluster() throws IOException {
RouterWebHDFSContract.destroyCluster();
}
@Override
protected AbstractFSContract createContract(Configuration conf) {
return new RouterWebHDFSContract(conf);
}
@Override
public void testListEmptyRootDirectory() throws IOException {
// It doesn't apply because we still have the mount points here
}
@Override
public void testRmEmptyRootDirNonRecursive() throws IOException {
// It doesn't apply because we still have the mount points here
}
@Override
public void testRecursiveRootListing() throws IOException {
// It doesn't apply because we still have the mount points here
}
}

View File

@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.contract.router.web;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.contract.AbstractContractSeekTest;
import org.apache.hadoop.fs.contract.AbstractFSContract;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import java.io.IOException;
/**
* Test seek operations on a Router WebHDFS FS.
*/
public class TestRouterWebHDFSContractSeek extends AbstractContractSeekTest {
@BeforeClass
public static void createCluster() throws IOException {
RouterWebHDFSContract.createCluster();
}
@AfterClass
public static void teardownCluster() throws IOException {
RouterWebHDFSContract.destroyCluster();
}
@Override
protected AbstractFSContract createContract(Configuration conf) {
return new RouterWebHDFSContract(conf);
}
@Override
public void testNegativeSeek() throws Throwable {
System.out.println("Not supported");
}
@Override
public void testSeekReadClosedFile() throws Throwable {
System.out.println("Not supported");
}
@Override
public void testSeekPastEndOfFileThenReseekAndRead() throws Throwable {
System.out.println("Not supported");
}
}

View File

@ -0,0 +1,22 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Test the WebHDFS contract.
*/
package org.apache.hadoop.fs.contract.router.web;

View File

@ -181,6 +181,11 @@ public class MiniRouterDFSCluster {
return this.fileContext;
}
public String getHttpAddress() {
InetSocketAddress httpAddress = router.getHttpServerAddress();
return NetUtils.getHostPortString(httpAddress);
}
public void initRouter() throws URISyntaxException {
// Store the bound points for the router interfaces
InetSocketAddress rpcAddress = router.getRpcServerAddress();

View File

@ -0,0 +1,26 @@
<!--
~ Licensed to the Apache Software Foundation (ASF) under one
~ or more contributor license agreements. See the NOTICE file
~ distributed with this work for additional information
~ regarding copyright ownership. The ASF licenses this file
~ to you under the Apache License, Version 2.0 (the
~ "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<configuration>
<property>
<name>fs.contract.supports-strict-exceptions</name>
<value>false</value>
</property>
</configuration>

View File

@ -76,7 +76,9 @@ public class NameNodeHttpServer {
this.bindAddress = bindAddress;
}
private void initWebHdfs(Configuration conf) throws IOException {
public static void initWebHdfs(Configuration conf, String hostname,
HttpServer2 httpServer2, String jerseyResourcePackage)
throws IOException {
// set user pattern based on configuration file
UserParam.setUserPattern(conf.get(
HdfsClientConfigKeys.DFS_WEBHDFS_USER_PATTERN_KEY,
@ -92,8 +94,8 @@ public class NameNodeHttpServer {
final String name = className;
final String pathSpec = WebHdfsFileSystem.PATH_PREFIX + "/*";
Map<String, String> params = getAuthFilterParams(conf);
HttpServer2.defineFilter(httpServer.getWebAppContext(), name, className,
Map<String, String> params = getAuthFilterParams(conf, hostname);
HttpServer2.defineFilter(httpServer2.getWebAppContext(), name, className,
params, new String[] { pathSpec });
HttpServer2.LOG.info("Added filter '" + name + "' (class=" + className
+ ")");
@ -104,13 +106,14 @@ public class NameNodeHttpServer {
Map<String, String> restCsrfParams = RestCsrfPreventionFilter
.getFilterParams(conf, "dfs.webhdfs.rest-csrf.");
String restCsrfClassName = RestCsrfPreventionFilter.class.getName();
HttpServer2.defineFilter(httpServer.getWebAppContext(), restCsrfClassName,
restCsrfClassName, restCsrfParams, new String[] {pathSpec});
HttpServer2.defineFilter(httpServer2.getWebAppContext(),
restCsrfClassName, restCsrfClassName, restCsrfParams,
new String[] {pathSpec});
}
// add webhdfs packages
httpServer.addJerseyResourcePackage(NamenodeWebHdfsMethods.class
.getPackage().getName() + ";" + Param.class.getPackage().getName(),
httpServer2.addJerseyResourcePackage(
jerseyResourcePackage + ";" + Param.class.getPackage().getName(),
pathSpec);
}
@ -165,7 +168,8 @@ public class NameNodeHttpServer {
datanodeSslPort.getPort());
}
initWebHdfs(conf);
initWebHdfs(conf, bindAddress.getHostName(), httpServer,
NamenodeWebHdfsMethods.class.getPackage().getName());
httpServer.setAttribute(NAMENODE_ATTRIBUTE_KEY, nn);
httpServer.setAttribute(JspHelper.CURRENT_CONF, conf);
@ -186,8 +190,8 @@ public class NameNodeHttpServer {
}
}
private Map<String, String> getAuthFilterParams(Configuration conf)
throws IOException {
private static Map<String, String> getAuthFilterParams(Configuration conf,
String hostname) throws IOException {
Map<String, String> params = new HashMap<String, String>();
// Select configs beginning with 'dfs.web.authentication.'
Iterator<Map.Entry<String, String>> iterator = conf.iterator();
@ -203,8 +207,7 @@ public class NameNodeHttpServer {
params
.put(
DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY,
SecurityUtil.getServerPrincipal(principalInConf,
bindAddress.getHostName()));
SecurityUtil.getServerPrincipal(principalInConf, hostname));
} else if (UserGroupInformation.isSecurityEnabled()) {
HttpServer2.LOG.error(
"WebHDFS and security are enabled, but configuration property '" +

View File

@ -143,7 +143,7 @@ public class NamenodeWebHdfsMethods {
Boolean.valueOf(request.getHeader(WebHdfsFileSystem.EZ_HEADER));
}
private void init(final UserGroupInformation ugi,
protected void init(final UserGroupInformation ugi,
final DelegationParam delegation,
final UserParam username, final DoAsParam doAsUser,
final UriFsPathParam path, final HttpOpParam<?> op,
@ -184,6 +184,14 @@ public class NamenodeWebHdfsMethods {
return cp;
}
protected String getScheme() {
return scheme;
}
protected ServletContext getContext() {
return context;
}
private <T> T doAs(final UserGroupInformation ugi,
final PrivilegedExceptionAction<T> action)
throws IOException, InterruptedException {
@ -206,7 +214,7 @@ public class NamenodeWebHdfsMethods {
}
@Override
public String getHostAddress() {
return remoteAddr;
return getRemoteAddr();
}
@Override
public InetAddress getHostInetAddress() {
@ -217,8 +225,8 @@ public class NamenodeWebHdfsMethods {
}
}
};
final NameNode namenode = (NameNode)context.getAttribute("name.node");
namenode.queueExternalCall(call);
queueExternalCall(call);
T result = null;
try {
result = call.get();
@ -235,6 +243,16 @@ public class NamenodeWebHdfsMethods {
return result;
}
protected String getRemoteAddr() {
return remoteAddr;
}
protected void queueExternalCall(ExternalCall call)
throws IOException, InterruptedException {
final NameNode namenode = (NameNode)context.getAttribute("name.node");
namenode.queueExternalCall(call);
}
@VisibleForTesting
static DatanodeInfo chooseDatanode(final NameNode namenode,
final String path, final HttpOpParam.Op op, final long openOffset,
@ -306,7 +324,7 @@ public class NamenodeWebHdfsMethods {
* sorted based on availability and network distances, thus it is sufficient
* to return the first element of the node here.
*/
private static DatanodeInfo bestNode(DatanodeInfo[] nodes,
protected static DatanodeInfo bestNode(DatanodeInfo[] nodes,
HashSet<Node> excludes) throws IOException {
for (DatanodeInfo dn: nodes) {
if (false == dn.isDecommissioned() && false == excludes.contains(dn)) {
@ -470,7 +488,7 @@ public class NamenodeWebHdfsMethods {
/** Validate all required params. */
@SuppressWarnings("rawtypes")
private void validateOpParams(HttpOpParam<?> op, Param... params) {
protected void validateOpParams(HttpOpParam<?> op, Param... params) {
for (Param param : params) {
if (param.getValue() == null || param.getValueString() == null || param
.getValueString().isEmpty()) {
@ -570,7 +588,7 @@ public class NamenodeWebHdfsMethods {
});
}
private Response put(
protected Response put(
final UserGroupInformation ugi,
final DelegationParam delegation,
final UserParam username,
@ -602,14 +620,13 @@ public class NamenodeWebHdfsMethods {
final NoRedirectParam noredirectParam,
final StoragePolicyParam policyName
) throws IOException, URISyntaxException {
final Configuration conf = (Configuration)context.getAttribute(JspHelper.CURRENT_CONF);
final NameNode namenode = (NameNode)context.getAttribute("name.node");
final ClientProtocol cp = getRpcClientProtocol();
switch(op.getValue()) {
case CREATE:
{
final NameNode namenode = (NameNode)context.getAttribute("name.node");
final URI uri = redirectURI(null, namenode, ugi, delegation, username,
doAsUser, fullpath, op.getValue(), -1L, blockSize.getValue(conf),
exclDatanodes.getValue(), permission, unmaskedPermission,
@ -840,7 +857,7 @@ public class NamenodeWebHdfsMethods {
});
}
private Response post(
protected Response post(
final UserGroupInformation ugi,
final DelegationParam delegation,
final UserParam username,
@ -1014,7 +1031,7 @@ public class NamenodeWebHdfsMethods {
return encodedValue;
}
private Response get(
protected Response get(
final UserGroupInformation ugi,
final DelegationParam delegation,
final UserParam username,
@ -1344,7 +1361,7 @@ public class NamenodeWebHdfsMethods {
});
}
private Response delete(
protected Response delete(
final UserGroupInformation ugi,
final DelegationParam delegation,
final UserParam username,