parent
26b08e002f
commit
f4d6fee96b
|
@ -23,6 +23,7 @@ import org.apache.hadoop.conf.Configuration;
|
|||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.DFSUtil;
|
||||
import org.apache.hadoop.hdfs.server.common.JspHelper;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NameNodeHttpServer;
|
||||
import org.apache.hadoop.http.HttpServer2;
|
||||
import org.apache.hadoop.service.AbstractService;
|
||||
|
||||
|
@ -86,6 +87,9 @@ public class RouterHttpServer extends AbstractService {
|
|||
|
||||
this.httpServer = builder.build();
|
||||
|
||||
NameNodeHttpServer.initWebHdfs(conf, httpAddress.getHostName(), httpServer,
|
||||
RouterWebHdfsMethods.class.getPackage().getName());
|
||||
|
||||
this.httpServer.setAttribute(NAMENODE_ATTRIBUTE_KEY, this.router);
|
||||
this.httpServer.setAttribute(JspHelper.CURRENT_CONF, this.conf);
|
||||
setupServlets(this.httpServer, this.conf);
|
||||
|
|
|
@ -562,7 +562,7 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
|
|||
* @return The remote location for this file.
|
||||
* @throws IOException If the file has no creation location.
|
||||
*/
|
||||
private RemoteLocation getCreateLocation(final String src)
|
||||
protected RemoteLocation getCreateLocation(final String src)
|
||||
throws IOException {
|
||||
|
||||
final List<RemoteLocation> locations = getLocationsForPath(src, true);
|
||||
|
|
|
@ -0,0 +1,655 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.federation.router;
|
||||
|
||||
import static org.apache.hadoop.util.StringUtils.getTrimmedStringCollection;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
|
||||
import org.apache.hadoop.hdfs.server.common.JspHelper;
|
||||
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
|
||||
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext;
|
||||
import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
|
||||
import org.apache.hadoop.hdfs.server.namenode.web.resources.NamenodeWebHdfsMethods;
|
||||
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
import javax.servlet.http.HttpServletResponse;
|
||||
import javax.ws.rs.Path;
|
||||
import javax.ws.rs.core.Context;
|
||||
import javax.ws.rs.core.MediaType;
|
||||
import javax.ws.rs.core.Response;
|
||||
|
||||
import com.sun.jersey.spi.container.ResourceFilters;
|
||||
import org.apache.hadoop.hdfs.web.JsonUtil;
|
||||
import org.apache.hadoop.hdfs.web.ParamFilter;
|
||||
import org.apache.hadoop.hdfs.web.URLConnectionFactory;
|
||||
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
|
||||
import org.apache.hadoop.hdfs.web.resources.AccessTimeParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.AclPermissionParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.BlockSizeParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.BufferSizeParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.ConcatSourcesParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.CreateFlagParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.CreateParentParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.DelegationParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.DestinationParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.DoAsParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.ExcludeDatanodesParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.FsActionParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.GetOpParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.GroupParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.HttpOpParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.LengthParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.ModificationTimeParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.NewLengthParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.NoRedirectParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.OffsetParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.OldSnapshotNameParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.OverwriteParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.OwnerParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.Param;
|
||||
import org.apache.hadoop.hdfs.web.resources.PermissionParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.PostOpParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.PutOpParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.RenameOptionSetParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.RenewerParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.ReplicationParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.SnapshotNameParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.StartAfterParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.StoragePolicyParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.TokenArgumentParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.TokenKindParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.TokenServiceParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.UnmaskedPermissionParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.UriFsPathParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.UserParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.XAttrEncodingParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.XAttrNameParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.XAttrSetFlagParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.XAttrValueParam;
|
||||
import org.apache.hadoop.ipc.ExternalCall;
|
||||
import org.apache.hadoop.ipc.RetriableException;
|
||||
import org.apache.hadoop.net.Node;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import java.net.HttpURLConnection;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.net.URL;
|
||||
import java.net.URLDecoder;
|
||||
import java.security.PrivilegedAction;
|
||||
import java.util.Collection;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Random;
|
||||
|
||||
/**
|
||||
* WebHDFS Router implementation. This is an extension of
|
||||
* {@link NamenodeWebHdfsMethods}, and tries to reuse as much as possible.
|
||||
*/
|
||||
@Path("")
|
||||
@ResourceFilters(ParamFilter.class)
|
||||
public class RouterWebHdfsMethods extends NamenodeWebHdfsMethods {
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(RouterWebHdfsMethods.class);
|
||||
|
||||
private static final ThreadLocal<String> REMOTE_ADDRESS =
|
||||
new ThreadLocal<String>();
|
||||
|
||||
private @Context HttpServletRequest request;
|
||||
private String method;
|
||||
private String query;
|
||||
private String reqPath;
|
||||
|
||||
public RouterWebHdfsMethods(@Context HttpServletRequest request) {
|
||||
super(request);
|
||||
this.method = request.getMethod();
|
||||
this.query = request.getQueryString();
|
||||
this.reqPath = request.getServletPath();
|
||||
REMOTE_ADDRESS.set(JspHelper.getRemoteAddr(request));
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void init(final UserGroupInformation ugi,
|
||||
final DelegationParam delegation,
|
||||
final UserParam username, final DoAsParam doAsUser,
|
||||
final UriFsPathParam path, final HttpOpParam<?> op,
|
||||
final Param<?, ?>... parameters) {
|
||||
super.init(ugi, delegation, username, doAsUser, path, op, parameters);
|
||||
|
||||
REMOTE_ADDRESS.set(JspHelper.getRemoteAddr(request));
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ClientProtocol getRpcClientProtocol() throws IOException {
|
||||
final Router router = getRouter();
|
||||
final RouterRpcServer routerRpcServer = router.getRpcServer();
|
||||
if (routerRpcServer == null) {
|
||||
throw new RetriableException("Router is in startup mode");
|
||||
}
|
||||
return routerRpcServer;
|
||||
}
|
||||
|
||||
private void reset() {
|
||||
REMOTE_ADDRESS.set(null);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String getRemoteAddr() {
|
||||
return REMOTE_ADDRESS.get();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void queueExternalCall(ExternalCall call)
|
||||
throws IOException, InterruptedException {
|
||||
getRouter().getRpcServer().getServer().queueCall(call);
|
||||
}
|
||||
|
||||
private Router getRouter() {
|
||||
return (Router)getContext().getAttribute("name.node");
|
||||
}
|
||||
|
||||
private static RouterRpcServer getRPCServer(final Router router)
|
||||
throws IOException {
|
||||
final RouterRpcServer routerRpcServer = router.getRpcServer();
|
||||
if (routerRpcServer == null) {
|
||||
throw new RetriableException("Router is in startup mode");
|
||||
}
|
||||
return routerRpcServer;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Response put(
|
||||
final UserGroupInformation ugi,
|
||||
final DelegationParam delegation,
|
||||
final UserParam username,
|
||||
final DoAsParam doAsUser,
|
||||
final String fullpath,
|
||||
final PutOpParam op,
|
||||
final DestinationParam destination,
|
||||
final OwnerParam owner,
|
||||
final GroupParam group,
|
||||
final PermissionParam permission,
|
||||
final UnmaskedPermissionParam unmaskedPermission,
|
||||
final OverwriteParam overwrite,
|
||||
final BufferSizeParam bufferSize,
|
||||
final ReplicationParam replication,
|
||||
final BlockSizeParam blockSize,
|
||||
final ModificationTimeParam modificationTime,
|
||||
final AccessTimeParam accessTime,
|
||||
final RenameOptionSetParam renameOptions,
|
||||
final CreateParentParam createParent,
|
||||
final TokenArgumentParam delegationTokenArgument,
|
||||
final AclPermissionParam aclPermission,
|
||||
final XAttrNameParam xattrName,
|
||||
final XAttrValueParam xattrValue,
|
||||
final XAttrSetFlagParam xattrSetFlag,
|
||||
final SnapshotNameParam snapshotName,
|
||||
final OldSnapshotNameParam oldSnapshotName,
|
||||
final ExcludeDatanodesParam exclDatanodes,
|
||||
final CreateFlagParam createFlagParam,
|
||||
final NoRedirectParam noredirectParam,
|
||||
final StoragePolicyParam policyName
|
||||
) throws IOException, URISyntaxException {
|
||||
|
||||
switch(op.getValue()) {
|
||||
case CREATE:
|
||||
{
|
||||
final Router router = getRouter();
|
||||
final URI uri = redirectURI(router, fullpath);
|
||||
if (!noredirectParam.getValue()) {
|
||||
return Response.temporaryRedirect(uri)
|
||||
.type(MediaType.APPLICATION_OCTET_STREAM).build();
|
||||
} else {
|
||||
final String js = JsonUtil.toJsonString("Location", uri);
|
||||
return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
|
||||
}
|
||||
}
|
||||
case MKDIRS:
|
||||
case CREATESYMLINK:
|
||||
case RENAME:
|
||||
case SETREPLICATION:
|
||||
case SETOWNER:
|
||||
case SETPERMISSION:
|
||||
case SETTIMES:
|
||||
case RENEWDELEGATIONTOKEN:
|
||||
case CANCELDELEGATIONTOKEN:
|
||||
case MODIFYACLENTRIES:
|
||||
case REMOVEACLENTRIES:
|
||||
case REMOVEDEFAULTACL:
|
||||
case REMOVEACL:
|
||||
case SETACL:
|
||||
case SETXATTR:
|
||||
case REMOVEXATTR:
|
||||
case ALLOWSNAPSHOT:
|
||||
case CREATESNAPSHOT:
|
||||
case RENAMESNAPSHOT:
|
||||
case DISALLOWSNAPSHOT:
|
||||
case SETSTORAGEPOLICY:
|
||||
{
|
||||
// Whitelist operations that can handled by NamenodeWebHdfsMethods
|
||||
return super.put(ugi, delegation, username, doAsUser, fullpath, op,
|
||||
destination, owner, group, permission, unmaskedPermission,
|
||||
overwrite, bufferSize, replication, blockSize, modificationTime,
|
||||
accessTime, renameOptions, createParent, delegationTokenArgument,
|
||||
aclPermission, xattrName, xattrValue, xattrSetFlag, snapshotName,
|
||||
oldSnapshotName, exclDatanodes, createFlagParam, noredirectParam,
|
||||
policyName);
|
||||
}
|
||||
default:
|
||||
throw new UnsupportedOperationException(op + " is not supported");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Response post(
|
||||
final UserGroupInformation ugi,
|
||||
final DelegationParam delegation,
|
||||
final UserParam username,
|
||||
final DoAsParam doAsUser,
|
||||
final String fullpath,
|
||||
final PostOpParam op,
|
||||
final ConcatSourcesParam concatSrcs,
|
||||
final BufferSizeParam bufferSize,
|
||||
final ExcludeDatanodesParam excludeDatanodes,
|
||||
final NewLengthParam newLength,
|
||||
final NoRedirectParam noRedirectParam
|
||||
) throws IOException, URISyntaxException {
|
||||
switch(op.getValue()) {
|
||||
case APPEND:
|
||||
{
|
||||
final Router router = getRouter();
|
||||
final URI uri = redirectURI(router, ugi, delegation, username,
|
||||
doAsUser, fullpath, op.getValue(), -1L,
|
||||
excludeDatanodes.getValue(), bufferSize);
|
||||
if (!noRedirectParam.getValue()) {
|
||||
return Response.temporaryRedirect(uri)
|
||||
.type(MediaType.APPLICATION_OCTET_STREAM).build();
|
||||
} else {
|
||||
final String js = JsonUtil.toJsonString("Location", uri);
|
||||
return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
|
||||
}
|
||||
}
|
||||
case CONCAT:
|
||||
case TRUNCATE:
|
||||
case UNSETSTORAGEPOLICY:
|
||||
{
|
||||
return super.post(ugi, delegation, username, doAsUser, fullpath, op,
|
||||
concatSrcs, bufferSize, excludeDatanodes, newLength,
|
||||
noRedirectParam);
|
||||
}
|
||||
default:
|
||||
throw new UnsupportedOperationException(op + " is not supported");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Response get(
|
||||
final UserGroupInformation ugi,
|
||||
final DelegationParam delegation,
|
||||
final UserParam username,
|
||||
final DoAsParam doAsUser,
|
||||
final String fullpath,
|
||||
final GetOpParam op,
|
||||
final OffsetParam offset,
|
||||
final LengthParam length,
|
||||
final RenewerParam renewer,
|
||||
final BufferSizeParam bufferSize,
|
||||
final List<XAttrNameParam> xattrNames,
|
||||
final XAttrEncodingParam xattrEncoding,
|
||||
final ExcludeDatanodesParam excludeDatanodes,
|
||||
final FsActionParam fsAction,
|
||||
final SnapshotNameParam snapshotName,
|
||||
final OldSnapshotNameParam oldSnapshotName,
|
||||
final TokenKindParam tokenKind,
|
||||
final TokenServiceParam tokenService,
|
||||
final NoRedirectParam noredirectParam,
|
||||
final StartAfterParam startAfter
|
||||
) throws IOException, URISyntaxException {
|
||||
try {
|
||||
final Router router = getRouter();
|
||||
|
||||
switch (op.getValue()) {
|
||||
case OPEN:
|
||||
{
|
||||
final URI uri = redirectURI(router, ugi, delegation, username,
|
||||
doAsUser, fullpath, op.getValue(), offset.getValue(),
|
||||
excludeDatanodes.getValue(), offset, length, bufferSize);
|
||||
if (!noredirectParam.getValue()) {
|
||||
return Response.temporaryRedirect(uri)
|
||||
.type(MediaType.APPLICATION_OCTET_STREAM).build();
|
||||
} else {
|
||||
final String js = JsonUtil.toJsonString("Location", uri);
|
||||
return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
|
||||
}
|
||||
}
|
||||
case GETFILECHECKSUM:
|
||||
{
|
||||
final URI uri = redirectURI(router, ugi, delegation, username,
|
||||
doAsUser, fullpath, op.getValue(), -1L, null);
|
||||
if (!noredirectParam.getValue()) {
|
||||
return Response.temporaryRedirect(uri)
|
||||
.type(MediaType.APPLICATION_OCTET_STREAM).build();
|
||||
} else {
|
||||
final String js = JsonUtil.toJsonString("Location", uri);
|
||||
return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
|
||||
}
|
||||
}
|
||||
case GET_BLOCK_LOCATIONS:
|
||||
case GETFILESTATUS:
|
||||
case LISTSTATUS:
|
||||
case GETCONTENTSUMMARY:
|
||||
case GETHOMEDIRECTORY:
|
||||
case GETACLSTATUS:
|
||||
case GETXATTRS:
|
||||
case LISTXATTRS:
|
||||
case CHECKACCESS:
|
||||
{
|
||||
return super.get(ugi, delegation, username, doAsUser, fullpath, op,
|
||||
offset, length, renewer, bufferSize, xattrNames, xattrEncoding,
|
||||
excludeDatanodes, fsAction, snapshotName, oldSnapshotName,
|
||||
tokenKind, tokenService, noredirectParam, startAfter);
|
||||
}
|
||||
default:
|
||||
throw new UnsupportedOperationException(op + " is not supported");
|
||||
}
|
||||
} finally {
|
||||
reset();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the redirect URI from the Namenode responsible for a path.
|
||||
* @param router Router to check.
|
||||
* @param path Path to get location for.
|
||||
* @return URI returned by the Namenode.
|
||||
* @throws IOException If it cannot get the redirect URI.
|
||||
*/
|
||||
private URI redirectURI(final Router router, final String path)
|
||||
throws IOException {
|
||||
// Forward the request to the proper Namenode
|
||||
final HttpURLConnection conn = forwardRequest(router, path);
|
||||
try {
|
||||
conn.setInstanceFollowRedirects(false);
|
||||
conn.setDoOutput(true);
|
||||
conn.connect();
|
||||
|
||||
// Read the reply from the Namenode
|
||||
int responseCode = conn.getResponseCode();
|
||||
if (responseCode != HttpServletResponse.SC_TEMPORARY_REDIRECT) {
|
||||
LOG.info("We expected a redirection from the Namenode, not {}",
|
||||
responseCode);
|
||||
return null;
|
||||
}
|
||||
|
||||
// Extract the redirect location and return it
|
||||
String redirectLocation = conn.getHeaderField("Location");
|
||||
try {
|
||||
// We modify the namenode location and the path
|
||||
redirectLocation = redirectLocation
|
||||
.replaceAll("(?<=[?&;])namenoderpcaddress=.*?(?=[&;])",
|
||||
"namenoderpcaddress=" + router.getRouterId())
|
||||
.replaceAll("(?<=[/])webhdfs/v1/.*?(?=[?])",
|
||||
"webhdfs/v1" + path);
|
||||
return new URI(redirectLocation);
|
||||
} catch (URISyntaxException e) {
|
||||
LOG.error("Cannot parse redirect location {}", redirectLocation);
|
||||
}
|
||||
} finally {
|
||||
if (conn != null) {
|
||||
conn.disconnect();
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Forwards a request to a subcluster.
|
||||
* @param router Router to check.
|
||||
* @param path Path in HDFS.
|
||||
* @return Reply from the subcluster.
|
||||
* @throws IOException
|
||||
*/
|
||||
private HttpURLConnection forwardRequest(
|
||||
final Router router, final String path) throws IOException {
|
||||
final Configuration conf =
|
||||
(Configuration)getContext().getAttribute(JspHelper.CURRENT_CONF);
|
||||
URLConnectionFactory connectionFactory =
|
||||
URLConnectionFactory.newDefaultURLConnectionFactory(conf);
|
||||
|
||||
// Find the namespace responsible for a path
|
||||
final RouterRpcServer rpcServer = getRPCServer(router);
|
||||
RemoteLocation createLoc = rpcServer.getCreateLocation(path);
|
||||
String nsId = createLoc.getNameserviceId();
|
||||
String dest = createLoc.getDest();
|
||||
ActiveNamenodeResolver nnResolver = router.getNamenodeResolver();
|
||||
List<? extends FederationNamenodeContext> namenodes =
|
||||
nnResolver.getNamenodesForNameserviceId(nsId);
|
||||
|
||||
// Go over the namenodes responsible for that namespace
|
||||
for (FederationNamenodeContext namenode : namenodes) {
|
||||
try {
|
||||
// Generate the request for the namenode
|
||||
String nnWebAddress = namenode.getWebAddress();
|
||||
String[] nnWebAddressSplit = nnWebAddress.split(":");
|
||||
String host = nnWebAddressSplit[0];
|
||||
int port = Integer.parseInt(nnWebAddressSplit[1]);
|
||||
|
||||
// Avoid double-encoding here
|
||||
query = URLDecoder.decode(query, "UTF-8");
|
||||
URI uri = new URI(getScheme(), null, host, port,
|
||||
reqPath + dest, query, null);
|
||||
URL url = uri.toURL();
|
||||
|
||||
// Send a request to the proper Namenode
|
||||
final HttpURLConnection conn =
|
||||
(HttpURLConnection)connectionFactory.openConnection(url);
|
||||
conn.setRequestMethod(method);
|
||||
|
||||
return conn;
|
||||
} catch (Exception e) {
|
||||
LOG.error("Cannot redirect request to {}", namenode, e);
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a URI to redirect an operation to.
|
||||
* @param router Router to check.
|
||||
* @param ugi User group information.
|
||||
* @param delegation Delegation token.
|
||||
* @param username User name.
|
||||
* @param doAsUser Do as user.
|
||||
* @param path Path to check.
|
||||
* @param op Operation to perform.
|
||||
* @param openOffset Offset for opening a file.
|
||||
* @param excludeDatanodes Blocks to excluded.
|
||||
* @param parameters Other parameters.
|
||||
* @return Redirection URI.
|
||||
* @throws URISyntaxException If it cannot parse the URI.
|
||||
* @throws IOException If it cannot create the URI.
|
||||
*/
|
||||
private URI redirectURI(final Router router, final UserGroupInformation ugi,
|
||||
final DelegationParam delegation, final UserParam username,
|
||||
final DoAsParam doAsUser, final String path, final HttpOpParam.Op op,
|
||||
final long openOffset, final String excludeDatanodes,
|
||||
final Param<?, ?>... parameters) throws URISyntaxException, IOException {
|
||||
final DatanodeInfo dn =
|
||||
chooseDatanode(router, path, op, openOffset, excludeDatanodes);
|
||||
|
||||
if (dn == null) {
|
||||
throw new IOException("Failed to find datanode, suggest to check cluster"
|
||||
+ " health. excludeDatanodes=" + excludeDatanodes);
|
||||
}
|
||||
|
||||
final String delegationQuery;
|
||||
if (!UserGroupInformation.isSecurityEnabled()) {
|
||||
// security disabled
|
||||
delegationQuery = Param.toSortedString("&", doAsUser, username);
|
||||
} else if (delegation.getValue() != null) {
|
||||
// client has provided a token
|
||||
delegationQuery = "&" + delegation;
|
||||
} else {
|
||||
// generate a token
|
||||
final Token<? extends TokenIdentifier> t = generateDelegationToken(
|
||||
router, ugi, request.getUserPrincipal().getName());
|
||||
delegationQuery = "&delegation=" + t.encodeToUrlString();
|
||||
}
|
||||
|
||||
final String redirectQuery = op.toQueryString() + delegationQuery
|
||||
+ "&namenoderpcaddress=" + router.getRouterId()
|
||||
+ Param.toSortedString("&", parameters);
|
||||
final String uripath = WebHdfsFileSystem.PATH_PREFIX + path;
|
||||
|
||||
int port = "http".equals(getScheme()) ? dn.getInfoPort() :
|
||||
dn.getInfoSecurePort();
|
||||
final URI uri = new URI(getScheme(), null, dn.getHostName(), port, uripath,
|
||||
redirectQuery, null);
|
||||
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("redirectURI={}", uri);
|
||||
}
|
||||
return uri;
|
||||
}
|
||||
|
||||
private DatanodeInfo chooseDatanode(final Router router,
|
||||
final String path, final HttpOpParam.Op op, final long openOffset,
|
||||
final String excludeDatanodes) throws IOException {
|
||||
// We need to get the DNs as a privileged user
|
||||
final RouterRpcServer rpcServer = getRPCServer(router);
|
||||
UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
|
||||
|
||||
DatanodeInfo[] dns = loginUser.doAs(
|
||||
new PrivilegedAction<DatanodeInfo[]>() {
|
||||
@Override
|
||||
public DatanodeInfo[] run() {
|
||||
try {
|
||||
return rpcServer.getDatanodeReport(DatanodeReportType.LIVE);
|
||||
} catch (IOException e) {
|
||||
LOG.error("Cannot get the datanodes from the RPC server", e);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
HashSet<Node> excludes = new HashSet<Node>();
|
||||
if (excludeDatanodes != null) {
|
||||
Collection<String> collection =
|
||||
getTrimmedStringCollection(excludeDatanodes);
|
||||
for (DatanodeInfo dn : dns) {
|
||||
if (collection.contains(dn.getName())) {
|
||||
excludes.add(dn);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (op == GetOpParam.Op.OPEN ||
|
||||
op == PostOpParam.Op.APPEND ||
|
||||
op == GetOpParam.Op.GETFILECHECKSUM) {
|
||||
// Choose a datanode containing a replica
|
||||
final ClientProtocol cp = getRpcClientProtocol();
|
||||
final HdfsFileStatus status = cp.getFileInfo(path);
|
||||
if (status == null) {
|
||||
throw new FileNotFoundException("File " + path + " not found.");
|
||||
}
|
||||
final long len = status.getLen();
|
||||
if (op == GetOpParam.Op.OPEN) {
|
||||
if (openOffset < 0L || (openOffset >= len && len > 0)) {
|
||||
throw new IOException("Offset=" + openOffset
|
||||
+ " out of the range [0, " + len + "); " + op + ", path=" + path);
|
||||
}
|
||||
}
|
||||
|
||||
if (len > 0) {
|
||||
final long offset = op == GetOpParam.Op.OPEN ? openOffset : len - 1;
|
||||
final LocatedBlocks locations = cp.getBlockLocations(path, offset, 1);
|
||||
final int count = locations.locatedBlockCount();
|
||||
if (count > 0) {
|
||||
LocatedBlock location0 = locations.get(0);
|
||||
return bestNode(location0.getLocations(), excludes);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return getRandomDatanode(dns, excludes);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a random Datanode from a subcluster.
|
||||
* @param dns Nodes to be chosen from.
|
||||
* @param excludes Nodes to be excluded from.
|
||||
* @return Random datanode from a particular subluster.
|
||||
*/
|
||||
private static DatanodeInfo getRandomDatanode(
|
||||
final DatanodeInfo[] dns, final HashSet<Node> excludes) {
|
||||
DatanodeInfo dn = null;
|
||||
|
||||
if (dns == null) {
|
||||
return dn;
|
||||
}
|
||||
|
||||
int numDNs = dns.length;
|
||||
int availableNodes = 0;
|
||||
if (excludes.isEmpty()) {
|
||||
availableNodes = numDNs;
|
||||
} else {
|
||||
for (DatanodeInfo di : dns) {
|
||||
if (!excludes.contains(di)) {
|
||||
availableNodes++;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Return a random one from the list
|
||||
if (availableNodes > 0) {
|
||||
while (dn == null || excludes.contains(dn)) {
|
||||
Random rnd = new Random();
|
||||
int idx = rnd.nextInt(numDNs);
|
||||
dn = dns[idx];
|
||||
}
|
||||
}
|
||||
return dn;
|
||||
}
|
||||
|
||||
/**
|
||||
* Generate the delegation tokens for this request.
|
||||
* @param router Router.
|
||||
* @param ugi User group information.
|
||||
* @param renewer Who is asking for the renewal.
|
||||
* @return The delegation tokens.
|
||||
* @throws IOException If it cannot create the tokens.
|
||||
*/
|
||||
private Token<? extends TokenIdentifier> generateDelegationToken(
|
||||
final Router router, final UserGroupInformation ugi,
|
||||
final String renewer) throws IOException {
|
||||
throw new UnsupportedOperationException("TODO Generate token for ugi=" +
|
||||
ugi + " request=" + request);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,129 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.fs.contract.router.web;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.contract.hdfs.HDFSContract;
|
||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster;
|
||||
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext;
|
||||
import org.apache.hadoop.hdfs.web.WebHdfsConstants;
|
||||
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
|
||||
import org.junit.Assert;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
|
||||
import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMENODES;
|
||||
|
||||
/**
|
||||
* The contract of Router-based Federated HDFS
|
||||
* This changes its feature set from platform for platform -the default
|
||||
* set is updated during initialization.
|
||||
*/
|
||||
public class RouterWebHDFSContract extends HDFSContract {
|
||||
|
||||
public static final Logger LOG =
|
||||
LoggerFactory.getLogger(WebHdfsFileSystem.class);
|
||||
|
||||
public static final String CONTRACT_WEBHDFS_XML = "contract/webhdfs.xml";
|
||||
private static MiniRouterDFSCluster cluster;
|
||||
|
||||
public RouterWebHDFSContract(Configuration conf) {
|
||||
super(conf);
|
||||
addConfResource(CONTRACT_WEBHDFS_XML);
|
||||
}
|
||||
|
||||
public static void createCluster() throws IOException {
|
||||
try {
|
||||
HdfsConfiguration conf = new HdfsConfiguration();
|
||||
conf.addResource(CONTRACT_HDFS_XML);
|
||||
conf.addResource(CONTRACT_WEBHDFS_XML);
|
||||
|
||||
cluster = new MiniRouterDFSCluster(true, 2);
|
||||
|
||||
// Start NNs and DNs and wait until ready
|
||||
cluster.startCluster();
|
||||
|
||||
// Start routers with only an RPC service
|
||||
cluster.startRouters();
|
||||
|
||||
// Register and verify all NNs with all routers
|
||||
cluster.registerNamenodes();
|
||||
cluster.waitNamenodeRegistration();
|
||||
|
||||
// Setup the mount table
|
||||
cluster.installMockLocations();
|
||||
|
||||
// Making one Namenodes active per nameservice
|
||||
if (cluster.isHighAvailability()) {
|
||||
for (String ns : cluster.getNameservices()) {
|
||||
cluster.switchToActive(ns, NAMENODES[0]);
|
||||
cluster.switchToStandby(ns, NAMENODES[1]);
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
cluster = null;
|
||||
throw new IOException("Cannot start federated cluster", e);
|
||||
}
|
||||
}
|
||||
|
||||
public static void destroyCluster() {
|
||||
if (cluster != null) {
|
||||
cluster.shutdown();
|
||||
cluster = null;
|
||||
}
|
||||
}
|
||||
|
||||
public static MiniDFSCluster getCluster() {
|
||||
return cluster.getCluster();
|
||||
}
|
||||
|
||||
@Override
|
||||
public FileSystem getTestFileSystem() throws IOException {
|
||||
return getFileSystem();
|
||||
}
|
||||
|
||||
public static FileSystem getFileSystem() throws IOException {
|
||||
//assumes cluster is not null
|
||||
Assert.assertNotNull("cluster not created", cluster);
|
||||
|
||||
// Create a connection to WebHDFS
|
||||
try {
|
||||
RouterContext router = cluster.getRandomRouter();
|
||||
String uriStr =
|
||||
WebHdfsConstants.WEBHDFS_SCHEME + "://" + router.getHttpAddress();
|
||||
URI uri = new URI(uriStr);
|
||||
Configuration conf = new HdfsConfiguration();
|
||||
return FileSystem.get(uri, conf);
|
||||
} catch (URISyntaxException e) {
|
||||
LOG.error("Cannot create URI for the WebHDFS filesystem", e);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getScheme() {
|
||||
return WebHdfsConstants.WEBHDFS_SCHEME;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,45 @@
|
|||
/*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License. See accompanying LICENSE file.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.contract.router.web;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.contract.AbstractContractAppendTest;
|
||||
import org.apache.hadoop.fs.contract.AbstractFSContract;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* Test append operations on a Router WebHDFS FS.
|
||||
*/
|
||||
public class TestRouterWebHDFSContractAppend
|
||||
extends AbstractContractAppendTest {
|
||||
|
||||
@BeforeClass
|
||||
public static void createCluster() throws IOException {
|
||||
RouterWebHDFSContract.createCluster();
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void teardownCluster() throws IOException {
|
||||
RouterWebHDFSContract.destroyCluster();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected AbstractFSContract createContract(Configuration conf) {
|
||||
return new RouterWebHDFSContract(conf);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,52 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.contract.router.web;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.contract.AbstractContractConcatTest;
|
||||
import org.apache.hadoop.fs.contract.AbstractFSContract;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* Test concat operations on a Router WebHDFS FS.
|
||||
*/
|
||||
public class TestRouterWebHDFSContractConcat
|
||||
extends AbstractContractConcatTest {
|
||||
|
||||
@BeforeClass
|
||||
public static void createCluster() throws IOException {
|
||||
RouterWebHDFSContract.createCluster();
|
||||
// perform a simple operation on the cluster to verify it is up
|
||||
RouterWebHDFSContract.getFileSystem().getDefaultBlockSize(new Path("/"));
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void teardownCluster() throws IOException {
|
||||
RouterWebHDFSContract.destroyCluster();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected AbstractFSContract createContract(Configuration conf) {
|
||||
return new RouterWebHDFSContract(conf);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,49 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.contract.router.web;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.contract.AbstractContractCreateTest;
|
||||
import org.apache.hadoop.fs.contract.AbstractFSContract;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* Test create operations on a Router WebHDFS FS.
|
||||
*/
|
||||
public class TestRouterWebHDFSContractCreate
|
||||
extends AbstractContractCreateTest {
|
||||
|
||||
@BeforeClass
|
||||
public static void createCluster() throws IOException {
|
||||
RouterWebHDFSContract.createCluster();
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void teardownCluster() throws IOException {
|
||||
RouterWebHDFSContract.destroyCluster();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected AbstractFSContract createContract(Configuration conf) {
|
||||
return new RouterWebHDFSContract(conf);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,49 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.contract.router.web;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.contract.AbstractContractDeleteTest;
|
||||
import org.apache.hadoop.fs.contract.AbstractFSContract;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* Test delete operations on a Router WebHDFS FS.
|
||||
*/
|
||||
public class TestRouterWebHDFSContractDelete
|
||||
extends AbstractContractDeleteTest {
|
||||
|
||||
@BeforeClass
|
||||
public static void createCluster() throws IOException {
|
||||
RouterWebHDFSContract.createCluster();
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void teardownCluster() throws IOException {
|
||||
RouterWebHDFSContract.destroyCluster();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected AbstractFSContract createContract(Configuration conf) {
|
||||
return new RouterWebHDFSContract(conf);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,48 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.contract.router.web;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.contract.AbstractContractMkdirTest;
|
||||
import org.apache.hadoop.fs.contract.AbstractFSContract;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* Test dir operations on a Router WebHDFS FS.
|
||||
*/
|
||||
public class TestRouterWebHDFSContractMkdir extends AbstractContractMkdirTest {
|
||||
|
||||
@BeforeClass
|
||||
public static void createCluster() throws IOException {
|
||||
RouterWebHDFSContract.createCluster();
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void teardownCluster() throws IOException {
|
||||
RouterWebHDFSContract.destroyCluster();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected AbstractFSContract createContract(Configuration conf) {
|
||||
return new RouterWebHDFSContract(conf);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,63 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.contract.router.web;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.contract.AbstractContractOpenTest;
|
||||
import org.apache.hadoop.fs.contract.AbstractFSContract;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* Test open operations on a Router WebHDFS FS.
|
||||
*/
|
||||
public class TestRouterWebHDFSContractOpen extends AbstractContractOpenTest {
|
||||
|
||||
@BeforeClass
|
||||
public static void createCluster() throws IOException {
|
||||
RouterWebHDFSContract.createCluster();
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void teardownCluster() throws IOException {
|
||||
RouterWebHDFSContract.destroyCluster();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected AbstractFSContract createContract(Configuration conf) {
|
||||
return new RouterWebHDFSContract(conf);
|
||||
}
|
||||
|
||||
@Override
|
||||
@Test
|
||||
public void testOpenReadDir() throws Throwable {
|
||||
// WebHDFS itself allows open read on directory, we may need to
|
||||
// fix this first before make this test work
|
||||
}
|
||||
|
||||
@Override
|
||||
@Test
|
||||
public void testOpenReadDirWithChild() throws Throwable {
|
||||
// WebHDFS itself allows open read on directory, we may need to
|
||||
// fix this first before make this test work
|
||||
}
|
||||
}
|
|
@ -0,0 +1,49 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.contract.router.web;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.contract.AbstractContractRenameTest;
|
||||
import org.apache.hadoop.fs.contract.AbstractFSContract;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* Test rename operations on a Router WebHDFS FS.
|
||||
*/
|
||||
public class TestRouterWebHDFSContractRename
|
||||
extends AbstractContractRenameTest {
|
||||
|
||||
@BeforeClass
|
||||
public static void createCluster() throws IOException {
|
||||
RouterWebHDFSContract.createCluster();
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void teardownCluster() throws IOException {
|
||||
RouterWebHDFSContract.destroyCluster();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected AbstractFSContract createContract(Configuration conf) {
|
||||
return new RouterWebHDFSContract(conf);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,64 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.contract.router.web;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.contract.AbstractContractRootDirectoryTest;
|
||||
import org.apache.hadoop.fs.contract.AbstractFSContract;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* Test dir operations on a Router WebHDFS FS.
|
||||
*/
|
||||
public class TestRouterWebHDFSContractRootDirectory extends
|
||||
AbstractContractRootDirectoryTest {
|
||||
|
||||
@BeforeClass
|
||||
public static void createCluster() throws IOException {
|
||||
RouterWebHDFSContract.createCluster();
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void teardownCluster() throws IOException {
|
||||
RouterWebHDFSContract.destroyCluster();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected AbstractFSContract createContract(Configuration conf) {
|
||||
return new RouterWebHDFSContract(conf);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void testListEmptyRootDirectory() throws IOException {
|
||||
// It doesn't apply because we still have the mount points here
|
||||
}
|
||||
|
||||
@Override
|
||||
public void testRmEmptyRootDirNonRecursive() throws IOException {
|
||||
// It doesn't apply because we still have the mount points here
|
||||
}
|
||||
|
||||
@Override
|
||||
public void testRecursiveRootListing() throws IOException {
|
||||
// It doesn't apply because we still have the mount points here
|
||||
}
|
||||
}
|
|
@ -0,0 +1,63 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.contract.router.web;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.contract.AbstractContractSeekTest;
|
||||
import org.apache.hadoop.fs.contract.AbstractFSContract;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* Test seek operations on a Router WebHDFS FS.
|
||||
*/
|
||||
public class TestRouterWebHDFSContractSeek extends AbstractContractSeekTest {
|
||||
|
||||
@BeforeClass
|
||||
public static void createCluster() throws IOException {
|
||||
RouterWebHDFSContract.createCluster();
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void teardownCluster() throws IOException {
|
||||
RouterWebHDFSContract.destroyCluster();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected AbstractFSContract createContract(Configuration conf) {
|
||||
return new RouterWebHDFSContract(conf);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void testNegativeSeek() throws Throwable {
|
||||
System.out.println("Not supported");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void testSeekReadClosedFile() throws Throwable {
|
||||
System.out.println("Not supported");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void testSeekPastEndOfFileThenReseekAndRead() throws Throwable {
|
||||
System.out.println("Not supported");
|
||||
}
|
||||
}
|
|
@ -0,0 +1,22 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
/**
|
||||
* Test the WebHDFS contract.
|
||||
*/
|
||||
package org.apache.hadoop.fs.contract.router.web;
|
|
@ -181,6 +181,11 @@ public class MiniRouterDFSCluster {
|
|||
return this.fileContext;
|
||||
}
|
||||
|
||||
public String getHttpAddress() {
|
||||
InetSocketAddress httpAddress = router.getHttpServerAddress();
|
||||
return NetUtils.getHostPortString(httpAddress);
|
||||
}
|
||||
|
||||
public void initRouter() throws URISyntaxException {
|
||||
// Store the bound points for the router interfaces
|
||||
InetSocketAddress rpcAddress = router.getRpcServerAddress();
|
||||
|
|
|
@ -0,0 +1,26 @@
|
|||
<!--
|
||||
~ Licensed to the Apache Software Foundation (ASF) under one
|
||||
~ or more contributor license agreements. See the NOTICE file
|
||||
~ distributed with this work for additional information
|
||||
~ regarding copyright ownership. The ASF licenses this file
|
||||
~ to you under the Apache License, Version 2.0 (the
|
||||
~ "License"); you may not use this file except in compliance
|
||||
~ with the License. You may obtain a copy of the License at
|
||||
~
|
||||
~ http://www.apache.org/licenses/LICENSE-2.0
|
||||
~
|
||||
~ Unless required by applicable law or agreed to in writing, software
|
||||
~ distributed under the License is distributed on an "AS IS" BASIS,
|
||||
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
~ See the License for the specific language governing permissions and
|
||||
~ limitations under the License.
|
||||
-->
|
||||
|
||||
<configuration>
|
||||
|
||||
<property>
|
||||
<name>fs.contract.supports-strict-exceptions</name>
|
||||
<value>false</value>
|
||||
</property>
|
||||
|
||||
</configuration>
|
|
@ -76,7 +76,9 @@ public class NameNodeHttpServer {
|
|||
this.bindAddress = bindAddress;
|
||||
}
|
||||
|
||||
private void initWebHdfs(Configuration conf) throws IOException {
|
||||
public static void initWebHdfs(Configuration conf, String hostname,
|
||||
HttpServer2 httpServer2, String jerseyResourcePackage)
|
||||
throws IOException {
|
||||
// set user pattern based on configuration file
|
||||
UserParam.setUserPattern(conf.get(
|
||||
HdfsClientConfigKeys.DFS_WEBHDFS_USER_PATTERN_KEY,
|
||||
|
@ -92,8 +94,8 @@ public class NameNodeHttpServer {
|
|||
final String name = className;
|
||||
|
||||
final String pathSpec = WebHdfsFileSystem.PATH_PREFIX + "/*";
|
||||
Map<String, String> params = getAuthFilterParams(conf);
|
||||
HttpServer2.defineFilter(httpServer.getWebAppContext(), name, className,
|
||||
Map<String, String> params = getAuthFilterParams(conf, hostname);
|
||||
HttpServer2.defineFilter(httpServer2.getWebAppContext(), name, className,
|
||||
params, new String[] { pathSpec });
|
||||
HttpServer2.LOG.info("Added filter '" + name + "' (class=" + className
|
||||
+ ")");
|
||||
|
@ -104,13 +106,14 @@ public class NameNodeHttpServer {
|
|||
Map<String, String> restCsrfParams = RestCsrfPreventionFilter
|
||||
.getFilterParams(conf, "dfs.webhdfs.rest-csrf.");
|
||||
String restCsrfClassName = RestCsrfPreventionFilter.class.getName();
|
||||
HttpServer2.defineFilter(httpServer.getWebAppContext(), restCsrfClassName,
|
||||
restCsrfClassName, restCsrfParams, new String[] {pathSpec});
|
||||
HttpServer2.defineFilter(httpServer2.getWebAppContext(),
|
||||
restCsrfClassName, restCsrfClassName, restCsrfParams,
|
||||
new String[] {pathSpec});
|
||||
}
|
||||
|
||||
// add webhdfs packages
|
||||
httpServer.addJerseyResourcePackage(NamenodeWebHdfsMethods.class
|
||||
.getPackage().getName() + ";" + Param.class.getPackage().getName(),
|
||||
httpServer2.addJerseyResourcePackage(
|
||||
jerseyResourcePackage + ";" + Param.class.getPackage().getName(),
|
||||
pathSpec);
|
||||
}
|
||||
|
||||
|
@ -165,7 +168,8 @@ public class NameNodeHttpServer {
|
|||
datanodeSslPort.getPort());
|
||||
}
|
||||
|
||||
initWebHdfs(conf);
|
||||
initWebHdfs(conf, bindAddress.getHostName(), httpServer,
|
||||
NamenodeWebHdfsMethods.class.getPackage().getName());
|
||||
|
||||
httpServer.setAttribute(NAMENODE_ATTRIBUTE_KEY, nn);
|
||||
httpServer.setAttribute(JspHelper.CURRENT_CONF, conf);
|
||||
|
@ -186,8 +190,8 @@ public class NameNodeHttpServer {
|
|||
}
|
||||
}
|
||||
|
||||
private Map<String, String> getAuthFilterParams(Configuration conf)
|
||||
throws IOException {
|
||||
private static Map<String, String> getAuthFilterParams(Configuration conf,
|
||||
String hostname) throws IOException {
|
||||
Map<String, String> params = new HashMap<String, String>();
|
||||
// Select configs beginning with 'dfs.web.authentication.'
|
||||
Iterator<Map.Entry<String, String>> iterator = conf.iterator();
|
||||
|
@ -203,8 +207,7 @@ public class NameNodeHttpServer {
|
|||
params
|
||||
.put(
|
||||
DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY,
|
||||
SecurityUtil.getServerPrincipal(principalInConf,
|
||||
bindAddress.getHostName()));
|
||||
SecurityUtil.getServerPrincipal(principalInConf, hostname));
|
||||
} else if (UserGroupInformation.isSecurityEnabled()) {
|
||||
HttpServer2.LOG.error(
|
||||
"WebHDFS and security are enabled, but configuration property '" +
|
||||
|
|
|
@ -141,7 +141,7 @@ public class NamenodeWebHdfsMethods {
|
|||
Boolean.valueOf(request.getHeader(WebHdfsFileSystem.EZ_HEADER));
|
||||
}
|
||||
|
||||
private void init(final UserGroupInformation ugi,
|
||||
protected void init(final UserGroupInformation ugi,
|
||||
final DelegationParam delegation,
|
||||
final UserParam username, final DoAsParam doAsUser,
|
||||
final UriFsPathParam path, final HttpOpParam<?> op,
|
||||
|
@ -182,6 +182,14 @@ public class NamenodeWebHdfsMethods {
|
|||
return cp;
|
||||
}
|
||||
|
||||
protected String getScheme() {
|
||||
return scheme;
|
||||
}
|
||||
|
||||
protected ServletContext getContext() {
|
||||
return context;
|
||||
}
|
||||
|
||||
private <T> T doAs(final UserGroupInformation ugi,
|
||||
final PrivilegedExceptionAction<T> action)
|
||||
throws IOException, InterruptedException {
|
||||
|
@ -204,7 +212,7 @@ public class NamenodeWebHdfsMethods {
|
|||
}
|
||||
@Override
|
||||
public String getHostAddress() {
|
||||
return remoteAddr;
|
||||
return getRemoteAddr();
|
||||
}
|
||||
@Override
|
||||
public InetAddress getHostInetAddress() {
|
||||
|
@ -215,8 +223,8 @@ public class NamenodeWebHdfsMethods {
|
|||
}
|
||||
}
|
||||
};
|
||||
final NameNode namenode = (NameNode)context.getAttribute("name.node");
|
||||
namenode.queueExternalCall(call);
|
||||
|
||||
queueExternalCall(call);
|
||||
T result = null;
|
||||
try {
|
||||
result = call.get();
|
||||
|
@ -233,6 +241,16 @@ public class NamenodeWebHdfsMethods {
|
|||
return result;
|
||||
}
|
||||
|
||||
protected String getRemoteAddr() {
|
||||
return remoteAddr;
|
||||
}
|
||||
|
||||
protected void queueExternalCall(ExternalCall call)
|
||||
throws IOException, InterruptedException {
|
||||
final NameNode namenode = (NameNode)context.getAttribute("name.node");
|
||||
namenode.queueExternalCall(call);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
static DatanodeInfo chooseDatanode(final NameNode namenode,
|
||||
final String path, final HttpOpParam.Op op, final long openOffset,
|
||||
|
@ -304,7 +322,7 @@ public class NamenodeWebHdfsMethods {
|
|||
* sorted based on availability and network distances, thus it is sufficient
|
||||
* to return the first element of the node here.
|
||||
*/
|
||||
private static DatanodeInfo bestNode(DatanodeInfo[] nodes,
|
||||
protected static DatanodeInfo bestNode(DatanodeInfo[] nodes,
|
||||
HashSet<Node> excludes) throws IOException {
|
||||
for (DatanodeInfo dn: nodes) {
|
||||
if (false == dn.isDecommissioned() && false == excludes.contains(dn)) {
|
||||
|
@ -468,7 +486,7 @@ public class NamenodeWebHdfsMethods {
|
|||
|
||||
/** Validate all required params. */
|
||||
@SuppressWarnings("rawtypes")
|
||||
private void validateOpParams(HttpOpParam<?> op, Param... params) {
|
||||
protected void validateOpParams(HttpOpParam<?> op, Param... params) {
|
||||
for (Param param : params) {
|
||||
if (param.getValue() == null || param.getValueString() == null || param
|
||||
.getValueString().isEmpty()) {
|
||||
|
@ -568,7 +586,7 @@ public class NamenodeWebHdfsMethods {
|
|||
});
|
||||
}
|
||||
|
||||
private Response put(
|
||||
protected Response put(
|
||||
final UserGroupInformation ugi,
|
||||
final DelegationParam delegation,
|
||||
final UserParam username,
|
||||
|
@ -600,14 +618,13 @@ public class NamenodeWebHdfsMethods {
|
|||
final NoRedirectParam noredirectParam,
|
||||
final StoragePolicyParam policyName
|
||||
) throws IOException, URISyntaxException {
|
||||
|
||||
final Configuration conf = (Configuration)context.getAttribute(JspHelper.CURRENT_CONF);
|
||||
final NameNode namenode = (NameNode)context.getAttribute("name.node");
|
||||
final ClientProtocol cp = getRpcClientProtocol();
|
||||
|
||||
switch(op.getValue()) {
|
||||
case CREATE:
|
||||
{
|
||||
final NameNode namenode = (NameNode)context.getAttribute("name.node");
|
||||
final URI uri = redirectURI(null, namenode, ugi, delegation, username,
|
||||
doAsUser, fullpath, op.getValue(), -1L, blockSize.getValue(conf),
|
||||
exclDatanodes.getValue(), permission, unmaskedPermission,
|
||||
|
@ -838,7 +855,7 @@ public class NamenodeWebHdfsMethods {
|
|||
});
|
||||
}
|
||||
|
||||
private Response post(
|
||||
protected Response post(
|
||||
final UserGroupInformation ugi,
|
||||
final DelegationParam delegation,
|
||||
final UserParam username,
|
||||
|
@ -1010,7 +1027,7 @@ public class NamenodeWebHdfsMethods {
|
|||
return encodedValue;
|
||||
}
|
||||
|
||||
private Response get(
|
||||
protected Response get(
|
||||
final UserGroupInformation ugi,
|
||||
final DelegationParam delegation,
|
||||
final UserParam username,
|
||||
|
@ -1334,7 +1351,7 @@ public class NamenodeWebHdfsMethods {
|
|||
});
|
||||
}
|
||||
|
||||
private Response delete(
|
||||
protected Response delete(
|
||||
final UserGroupInformation ugi,
|
||||
final DelegationParam delegation,
|
||||
final UserParam username,
|
||||
|
|
Loading…
Reference in New Issue