diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterHttpServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterHttpServer.java index 93a9cff20e1..21a98717271 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterHttpServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterHttpServer.java @@ -23,6 +23,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.server.common.JspHelper; +import org.apache.hadoop.hdfs.server.namenode.NameNodeHttpServer; import org.apache.hadoop.http.HttpServer2; import org.apache.hadoop.service.AbstractService; @@ -86,6 +87,9 @@ public class RouterHttpServer extends AbstractService { this.httpServer = builder.build(); + NameNodeHttpServer.initWebHdfs(conf, httpAddress.getHostName(), httpServer, + RouterWebHdfsMethods.class.getPackage().getName()); + this.httpServer.setAttribute(NAMENODE_ATTRIBUTE_KEY, this.router); this.httpServer.setAttribute(JspHelper.CURRENT_CONF, this.conf); setupServlets(this.httpServer, this.conf); diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java index 65f8b747400..3c021da72fe 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java @@ -549,7 +549,7 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol { * @return The remote location for this file. * @throws IOException If the file has no creation location. */ - private RemoteLocation getCreateLocation(final String src) + protected RemoteLocation getCreateLocation(final String src) throws IOException { final List locations = getLocationsForPath(src, true); diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterWebHdfsMethods.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterWebHdfsMethods.java new file mode 100644 index 00000000000..960fe0084ef --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterWebHdfsMethods.java @@ -0,0 +1,651 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.router; + +import static org.apache.hadoop.util.StringUtils.getTrimmedStringCollection; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.protocol.ClientProtocol; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.protocol.LocatedBlocks; +import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; +import org.apache.hadoop.hdfs.server.common.JspHelper; +import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver; +import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext; +import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation; +import org.apache.hadoop.hdfs.server.namenode.web.resources.NamenodeWebHdfsMethods; + +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import javax.ws.rs.Path; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; + +import com.sun.jersey.spi.container.ResourceFilters; +import org.apache.hadoop.hdfs.web.JsonUtil; +import org.apache.hadoop.hdfs.web.ParamFilter; +import org.apache.hadoop.hdfs.web.URLConnectionFactory; +import org.apache.hadoop.hdfs.web.WebHdfsFileSystem; +import org.apache.hadoop.hdfs.web.resources.AccessTimeParam; +import org.apache.hadoop.hdfs.web.resources.AclPermissionParam; +import org.apache.hadoop.hdfs.web.resources.BlockSizeParam; +import org.apache.hadoop.hdfs.web.resources.BufferSizeParam; +import org.apache.hadoop.hdfs.web.resources.ConcatSourcesParam; +import org.apache.hadoop.hdfs.web.resources.CreateFlagParam; +import org.apache.hadoop.hdfs.web.resources.CreateParentParam; +import org.apache.hadoop.hdfs.web.resources.DelegationParam; +import org.apache.hadoop.hdfs.web.resources.DestinationParam; +import org.apache.hadoop.hdfs.web.resources.DoAsParam; +import org.apache.hadoop.hdfs.web.resources.ExcludeDatanodesParam; +import org.apache.hadoop.hdfs.web.resources.FsActionParam; +import org.apache.hadoop.hdfs.web.resources.GetOpParam; +import org.apache.hadoop.hdfs.web.resources.GroupParam; +import org.apache.hadoop.hdfs.web.resources.HttpOpParam; +import org.apache.hadoop.hdfs.web.resources.LengthParam; +import org.apache.hadoop.hdfs.web.resources.ModificationTimeParam; +import org.apache.hadoop.hdfs.web.resources.NewLengthParam; +import org.apache.hadoop.hdfs.web.resources.NoRedirectParam; +import org.apache.hadoop.hdfs.web.resources.OffsetParam; +import org.apache.hadoop.hdfs.web.resources.OldSnapshotNameParam; +import org.apache.hadoop.hdfs.web.resources.OverwriteParam; +import org.apache.hadoop.hdfs.web.resources.OwnerParam; +import org.apache.hadoop.hdfs.web.resources.Param; +import org.apache.hadoop.hdfs.web.resources.PermissionParam; +import org.apache.hadoop.hdfs.web.resources.PostOpParam; +import org.apache.hadoop.hdfs.web.resources.PutOpParam; +import org.apache.hadoop.hdfs.web.resources.RenameOptionSetParam; +import org.apache.hadoop.hdfs.web.resources.RenewerParam; +import org.apache.hadoop.hdfs.web.resources.ReplicationParam; +import org.apache.hadoop.hdfs.web.resources.SnapshotNameParam; +import org.apache.hadoop.hdfs.web.resources.StartAfterParam; +import org.apache.hadoop.hdfs.web.resources.StoragePolicyParam; +import 
org.apache.hadoop.hdfs.web.resources.TokenArgumentParam; +import org.apache.hadoop.hdfs.web.resources.TokenKindParam; +import org.apache.hadoop.hdfs.web.resources.TokenServiceParam; +import org.apache.hadoop.hdfs.web.resources.UriFsPathParam; +import org.apache.hadoop.hdfs.web.resources.UserParam; +import org.apache.hadoop.hdfs.web.resources.XAttrEncodingParam; +import org.apache.hadoop.hdfs.web.resources.XAttrNameParam; +import org.apache.hadoop.hdfs.web.resources.XAttrSetFlagParam; +import org.apache.hadoop.hdfs.web.resources.XAttrValueParam; +import org.apache.hadoop.ipc.ExternalCall; +import org.apache.hadoop.ipc.RetriableException; +import org.apache.hadoop.net.Node; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.TokenIdentifier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.net.HttpURLConnection; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.URL; +import java.net.URLDecoder; +import java.security.PrivilegedAction; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Random; + +/** + * WebHDFS Router implementation. This is an extension of + * {@link NamenodeWebHdfsMethods}, and tries to reuse as much as possible. + */ +@Path("") +@ResourceFilters(ParamFilter.class) +public class RouterWebHdfsMethods extends NamenodeWebHdfsMethods { + private static final Logger LOG = + LoggerFactory.getLogger(RouterWebHdfsMethods.class); + + private static final ThreadLocal REMOTE_ADDRESS = + new ThreadLocal(); + + private @Context HttpServletRequest request; + private String method; + private String query; + private String reqPath; + + public RouterWebHdfsMethods(@Context HttpServletRequest request) { + super(request); + this.method = request.getMethod(); + this.query = request.getQueryString(); + this.reqPath = request.getServletPath(); + REMOTE_ADDRESS.set(JspHelper.getRemoteAddr(request)); + } + + @Override + protected void init(final UserGroupInformation ugi, + final DelegationParam delegation, + final UserParam username, final DoAsParam doAsUser, + final UriFsPathParam path, final HttpOpParam op, + final Param... 
parameters) { + super.init(ugi, delegation, username, doAsUser, path, op, parameters); + + REMOTE_ADDRESS.set(JspHelper.getRemoteAddr(request)); + } + + @Override + protected ClientProtocol getRpcClientProtocol() throws IOException { + final Router router = getRouter(); + final RouterRpcServer routerRpcServer = router.getRpcServer(); + if (routerRpcServer == null) { + throw new RetriableException("Router is in startup mode"); + } + return routerRpcServer; + } + + private void reset() { + REMOTE_ADDRESS.set(null); + } + + @Override + protected String getRemoteAddr() { + return REMOTE_ADDRESS.get(); + } + + @Override + protected void queueExternalCall(ExternalCall call) + throws IOException, InterruptedException { + getRouter().getRpcServer().getServer().queueCall(call); + } + + private Router getRouter() { + return (Router)getContext().getAttribute("name.node"); + } + + private static RouterRpcServer getRPCServer(final Router router) + throws IOException { + final RouterRpcServer routerRpcServer = router.getRpcServer(); + if (routerRpcServer == null) { + throw new RetriableException("Router is in startup mode"); + } + return routerRpcServer; + } + + @Override + protected Response put( + final UserGroupInformation ugi, + final DelegationParam delegation, + final UserParam username, + final DoAsParam doAsUser, + final String fullpath, + final PutOpParam op, + final DestinationParam destination, + final OwnerParam owner, + final GroupParam group, + final PermissionParam permission, + final OverwriteParam overwrite, + final BufferSizeParam bufferSize, + final ReplicationParam replication, + final BlockSizeParam blockSize, + final ModificationTimeParam modificationTime, + final AccessTimeParam accessTime, + final RenameOptionSetParam renameOptions, + final CreateParentParam createParent, + final TokenArgumentParam delegationTokenArgument, + final AclPermissionParam aclPermission, + final XAttrNameParam xattrName, + final XAttrValueParam xattrValue, + final XAttrSetFlagParam xattrSetFlag, + final SnapshotNameParam snapshotName, + final OldSnapshotNameParam oldSnapshotName, + final ExcludeDatanodesParam exclDatanodes, + final CreateFlagParam createFlagParam, + final NoRedirectParam noredirectParam, + final StoragePolicyParam policyName + ) throws IOException, URISyntaxException { + + switch(op.getValue()) { + case CREATE: + { + final Router router = getRouter(); + final URI uri = redirectURI(router, fullpath); + if (!noredirectParam.getValue()) { + return Response.temporaryRedirect(uri) + .type(MediaType.APPLICATION_OCTET_STREAM).build(); + } else { + final String js = JsonUtil.toJsonString("Location", uri); + return Response.ok(js).type(MediaType.APPLICATION_JSON).build(); + } + } + case MKDIRS: + case CREATESYMLINK: + case RENAME: + case SETREPLICATION: + case SETOWNER: + case SETPERMISSION: + case SETTIMES: + case RENEWDELEGATIONTOKEN: + case CANCELDELEGATIONTOKEN: + case MODIFYACLENTRIES: + case REMOVEACLENTRIES: + case REMOVEDEFAULTACL: + case REMOVEACL: + case SETACL: + case SETXATTR: + case REMOVEXATTR: + case ALLOWSNAPSHOT: + case CREATESNAPSHOT: + case RENAMESNAPSHOT: + case DISALLOWSNAPSHOT: + case SETSTORAGEPOLICY: + { + // Whitelist of operations that can be handled by NamenodeWebHdfsMethods + return super.put(ugi, delegation, username, doAsUser, fullpath, op, + destination, owner, group, permission, + overwrite, bufferSize, replication, blockSize, modificationTime, + accessTime, renameOptions, createParent, delegationTokenArgument, + aclPermission, xattrName, xattrValue, xattrSetFlag,
snapshotName, + oldSnapshotName, exclDatanodes, createFlagParam, noredirectParam, + policyName); + } + default: + throw new UnsupportedOperationException(op + " is not supported"); + } + } + + @Override + protected Response post( + final UserGroupInformation ugi, + final DelegationParam delegation, + final UserParam username, + final DoAsParam doAsUser, + final String fullpath, + final PostOpParam op, + final ConcatSourcesParam concatSrcs, + final BufferSizeParam bufferSize, + final ExcludeDatanodesParam excludeDatanodes, + final NewLengthParam newLength, + final NoRedirectParam noRedirectParam + ) throws IOException, URISyntaxException { + switch(op.getValue()) { + case APPEND: + { + final Router router = getRouter(); + final URI uri = redirectURI(router, ugi, delegation, username, + doAsUser, fullpath, op.getValue(), -1L, + excludeDatanodes.getValue(), bufferSize); + if (!noRedirectParam.getValue()) { + return Response.temporaryRedirect(uri) + .type(MediaType.APPLICATION_OCTET_STREAM).build(); + } else { + final String js = JsonUtil.toJsonString("Location", uri); + return Response.ok(js).type(MediaType.APPLICATION_JSON).build(); + } + } + case CONCAT: + case TRUNCATE: + case UNSETSTORAGEPOLICY: + { + return super.post(ugi, delegation, username, doAsUser, fullpath, op, + concatSrcs, bufferSize, excludeDatanodes, newLength, + noRedirectParam); + } + default: + throw new UnsupportedOperationException(op + " is not supported"); + } + } + + @Override + protected Response get( + final UserGroupInformation ugi, + final DelegationParam delegation, + final UserParam username, + final DoAsParam doAsUser, + final String fullpath, + final GetOpParam op, + final OffsetParam offset, + final LengthParam length, + final RenewerParam renewer, + final BufferSizeParam bufferSize, + final List xattrNames, + final XAttrEncodingParam xattrEncoding, + final ExcludeDatanodesParam excludeDatanodes, + final FsActionParam fsAction, + final TokenKindParam tokenKind, + final TokenServiceParam tokenService, + final NoRedirectParam noredirectParam, + final StartAfterParam startAfter + ) throws IOException, URISyntaxException { + try { + final Router router = getRouter(); + + switch (op.getValue()) { + case OPEN: + { + final URI uri = redirectURI(router, ugi, delegation, username, + doAsUser, fullpath, op.getValue(), offset.getValue(), + excludeDatanodes.getValue(), offset, length, bufferSize); + if (!noredirectParam.getValue()) { + return Response.temporaryRedirect(uri) + .type(MediaType.APPLICATION_OCTET_STREAM).build(); + } else { + final String js = JsonUtil.toJsonString("Location", uri); + return Response.ok(js).type(MediaType.APPLICATION_JSON).build(); + } + } + case GETFILECHECKSUM: + { + final URI uri = redirectURI(router, ugi, delegation, username, + doAsUser, fullpath, op.getValue(), -1L, null); + if (!noredirectParam.getValue()) { + return Response.temporaryRedirect(uri) + .type(MediaType.APPLICATION_OCTET_STREAM).build(); + } else { + final String js = JsonUtil.toJsonString("Location", uri); + return Response.ok(js).type(MediaType.APPLICATION_JSON).build(); + } + } + case GET_BLOCK_LOCATIONS: + case GETFILESTATUS: + case LISTSTATUS: + case GETCONTENTSUMMARY: + case GETHOMEDIRECTORY: + case GETACLSTATUS: + case GETXATTRS: + case LISTXATTRS: + case CHECKACCESS: + { + return super.get(ugi, delegation, username, doAsUser, fullpath, op, + offset, length, renewer, bufferSize, xattrNames, xattrEncoding, + excludeDatanodes, fsAction, tokenKind, tokenService, + noredirectParam, startAfter); + } + default: + throw 
new UnsupportedOperationException(op + " is not supported"); + } + } finally { + reset(); + } + } + + /** + * Get the redirect URI from the Namenode responsible for a path. + * @param router Router to check. + * @param path Path to get location for. + * @return URI returned by the Namenode. + * @throws IOException If it cannot get the redirect URI. + */ + private URI redirectURI(final Router router, final String path) + throws IOException { + // Forward the request to the proper Namenode + final HttpURLConnection conn = forwardRequest(router, path); + try { + conn.setInstanceFollowRedirects(false); + conn.setDoOutput(true); + conn.connect(); + + // Read the reply from the Namenode + int responseCode = conn.getResponseCode(); + if (responseCode != HttpServletResponse.SC_TEMPORARY_REDIRECT) { + LOG.info("We expected a redirection from the Namenode, not {}", + responseCode); + return null; + } + + // Extract the redirect location and return it + String redirectLocation = conn.getHeaderField("Location"); + try { + // We modify the namenode location and the path + redirectLocation = redirectLocation + .replaceAll("(?<=[?&;])namenoderpcaddress=.*?(?=[&;])", + "namenoderpcaddress=" + router.getRouterId()) + .replaceAll("(?<=[/])webhdfs/v1/.*?(?=[?])", + "webhdfs/v1" + path); + return new URI(redirectLocation); + } catch (URISyntaxException e) { + LOG.error("Cannot parse redirect location {}", redirectLocation); + } + } finally { + if (conn != null) { + conn.disconnect(); + } + } + return null; + } + + /** + * Forwards a request to a subcluster. + * @param router Router to check. + * @param path Path in HDFS. + * @return Reply from the subcluster. + * @throws IOException + */ + private HttpURLConnection forwardRequest( + final Router router, final String path) throws IOException { + final Configuration conf = + (Configuration)getContext().getAttribute(JspHelper.CURRENT_CONF); + URLConnectionFactory connectionFactory = + URLConnectionFactory.newDefaultURLConnectionFactory(conf); + + // Find the namespace responsible for a path + final RouterRpcServer rpcServer = getRPCServer(router); + RemoteLocation createLoc = rpcServer.getCreateLocation(path); + String nsId = createLoc.getNameserviceId(); + String dest = createLoc.getDest(); + ActiveNamenodeResolver nnResolver = router.getNamenodeResolver(); + List namenodes = + nnResolver.getNamenodesForNameserviceId(nsId); + + // Go over the namenodes responsible for that namespace + for (FederationNamenodeContext namenode : namenodes) { + try { + // Generate the request for the namenode + String nnWebAddress = namenode.getWebAddress(); + String[] nnWebAddressSplit = nnWebAddress.split(":"); + String host = nnWebAddressSplit[0]; + int port = Integer.parseInt(nnWebAddressSplit[1]); + + // Avoid double-encoding here + query = URLDecoder.decode(query, "UTF-8"); + URI uri = new URI(getScheme(), null, host, port, + reqPath + dest, query, null); + URL url = uri.toURL(); + + // Send a request to the proper Namenode + final HttpURLConnection conn = + (HttpURLConnection)connectionFactory.openConnection(url); + conn.setRequestMethod(method); + + return conn; + } catch (Exception e) { + LOG.error("Cannot redirect request to {}", namenode, e); + } + } + return null; + } + + /** + * Get a URI to redirect an operation to. + * @param router Router to check. + * @param ugi User group information. + * @param delegation Delegation token. + * @param username User name. + * @param doAsUser Do as user. + * @param path Path to check. + * @param op Operation to perform. 
+ * @param openOffset Offset for opening a file. + * @param excludeDatanodes Datanodes to exclude. + * @param parameters Other parameters. + * @return Redirection URI. + * @throws URISyntaxException If it cannot parse the URI. + * @throws IOException If it cannot create the URI. + */ + private URI redirectURI(final Router router, final UserGroupInformation ugi, + final DelegationParam delegation, final UserParam username, + final DoAsParam doAsUser, final String path, final HttpOpParam.Op op, + final long openOffset, final String excludeDatanodes, + final Param<?, ?>... parameters) throws URISyntaxException, IOException { + final DatanodeInfo dn = + chooseDatanode(router, path, op, openOffset, excludeDatanodes); + + if (dn == null) { + throw new IOException("Failed to find datanode, suggest to check cluster" + + " health. excludeDatanodes=" + excludeDatanodes); + } + + final String delegationQuery; + if (!UserGroupInformation.isSecurityEnabled()) { + // security disabled + delegationQuery = Param.toSortedString("&", doAsUser, username); + } else if (delegation.getValue() != null) { + // client has provided a token + delegationQuery = "&" + delegation; + } else { + // generate a token + final Token<? extends TokenIdentifier> t = generateDelegationToken( + router, ugi, request.getUserPrincipal().getName()); + delegationQuery = "&delegation=" + t.encodeToUrlString(); + } + + final String redirectQuery = op.toQueryString() + delegationQuery + + "&namenoderpcaddress=" + router.getRouterId() + + Param.toSortedString("&", parameters); + final String uripath = WebHdfsFileSystem.PATH_PREFIX + path; + + int port = "http".equals(getScheme()) ? dn.getInfoPort() : + dn.getInfoSecurePort(); + final URI uri = new URI(getScheme(), null, dn.getHostName(), port, uripath, + redirectQuery, null); + + if (LOG.isTraceEnabled()) { + LOG.trace("redirectURI={}", uri); + } + return uri; + } + + private DatanodeInfo chooseDatanode(final Router router, + final String path, final HttpOpParam.Op op, final long openOffset, + final String excludeDatanodes) throws IOException { + // We need to get the DNs as a privileged user + final RouterRpcServer rpcServer = getRPCServer(router); + UserGroupInformation loginUser = UserGroupInformation.getLoginUser(); + + DatanodeInfo[] dns = loginUser.doAs( + new PrivilegedAction<DatanodeInfo[]>() { + @Override + public DatanodeInfo[] run() { + try { + return rpcServer.getDatanodeReport(DatanodeReportType.LIVE); + } catch (IOException e) { + LOG.error("Cannot get the datanodes from the RPC server", e); + return null; + } + } + }); + + HashSet<Node> excludes = new HashSet<>(); + if (excludeDatanodes != null) { + Collection<String> collection = + getTrimmedStringCollection(excludeDatanodes); + for (DatanodeInfo dn : dns) { + if (collection.contains(dn.getName())) { + excludes.add(dn); + } + } + } + + if (op == GetOpParam.Op.OPEN || + op == PostOpParam.Op.APPEND || + op == GetOpParam.Op.GETFILECHECKSUM) { + // Choose a datanode containing a replica + final ClientProtocol cp = getRpcClientProtocol(); + final HdfsFileStatus status = cp.getFileInfo(path); + if (status == null) { + throw new FileNotFoundException("File " + path + " not found."); + } + final long len = status.getLen(); + if (op == GetOpParam.Op.OPEN) { + if (openOffset < 0L || (openOffset >= len && len > 0)) { + throw new IOException("Offset=" + openOffset + + " out of the range [0, " + len + "); " + op + ", path=" + path); + } + } + + if (len > 0) { + final long offset = op == GetOpParam.Op.OPEN ?
openOffset : len - 1; + final LocatedBlocks locations = cp.getBlockLocations(path, offset, 1); + final int count = locations.locatedBlockCount(); + if (count > 0) { + LocatedBlock location0 = locations.get(0); + return bestNode(location0.getLocations(), excludes); + } + } + } + + return getRandomDatanode(dns, excludes); + } + + /** + * Get a random Datanode from a subcluster. + * @param dns Nodes to be chosen from. + * @param excludes Nodes to be excluded. + * @return Random datanode from a particular subcluster. + */ + private static DatanodeInfo getRandomDatanode( + final DatanodeInfo[] dns, final HashSet<Node> excludes) { + DatanodeInfo dn = null; + + if (dns == null) { + return dn; + } + + int numDNs = dns.length; + int availableNodes = 0; + if (excludes.isEmpty()) { + availableNodes = numDNs; + } else { + for (DatanodeInfo di : dns) { + if (!excludes.contains(di)) { + availableNodes++; + } + } + } + + // Return a random one from the list + if (availableNodes > 0) { + while (dn == null || excludes.contains(dn)) { + Random rnd = new Random(); + int idx = rnd.nextInt(numDNs); + dn = dns[idx]; + } + } + return dn; + } + + /** + * Generate the delegation tokens for this request. + * @param router Router. + * @param ugi User group information. + * @param renewer Who is asking for the renewal. + * @return The delegation tokens. + * @throws IOException If it cannot create the tokens. + */ + private Token<? extends TokenIdentifier> generateDelegationToken( + final Router router, final UserGroupInformation ugi, + final String renewer) throws IOException { + throw new UnsupportedOperationException("TODO Generate token for ugi=" + + ugi + " request=" + request); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/fs/contract/router/web/RouterWebHDFSContract.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/fs/contract/router/web/RouterWebHDFSContract.java new file mode 100644 index 00000000000..2f1be52925d --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/fs/contract/router/web/RouterWebHDFSContract.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.hadoop.fs.contract.router.web; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.contract.hdfs.HDFSContract; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster; +import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext; +import org.apache.hadoop.hdfs.web.WebHdfsConstants; +import org.apache.hadoop.hdfs.web.WebHdfsFileSystem; +import org.junit.Assert; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; + +import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMENODES; + +/** + * The contract of Router-based Federated HDFS. + * This changes its feature set from platform to platform: the default + * set is updated during initialization. + */ +public class RouterWebHDFSContract extends HDFSContract { + + public static final Logger LOG = + LoggerFactory.getLogger(WebHdfsFileSystem.class); + + public static final String CONTRACT_WEBHDFS_XML = "contract/webhdfs.xml"; + private static MiniRouterDFSCluster cluster; + + public RouterWebHDFSContract(Configuration conf) { + super(conf); + addConfResource(CONTRACT_WEBHDFS_XML); + } + + public static void createCluster() throws IOException { + try { + HdfsConfiguration conf = new HdfsConfiguration(); + conf.addResource(CONTRACT_HDFS_XML); + conf.addResource(CONTRACT_WEBHDFS_XML); + + cluster = new MiniRouterDFSCluster(true, 2); + + // Start NNs and DNs and wait until ready + cluster.startCluster(); + + // Start routers with only an RPC service + cluster.startRouters(); + + // Register and verify all NNs with all routers + cluster.registerNamenodes(); + cluster.waitNamenodeRegistration(); + + // Setup the mount table + cluster.installMockLocations(); + + // Make one Namenode active per nameservice + if (cluster.isHighAvailability()) { + for (String ns : cluster.getNameservices()) { + cluster.switchToActive(ns, NAMENODES[0]); + cluster.switchToStandby(ns, NAMENODES[1]); + } + } + } catch (Exception e) { + cluster = null; + throw new IOException("Cannot start federated cluster", e); + } + } + + public static void destroyCluster() { + if (cluster != null) { + cluster.shutdown(); + cluster = null; + } + } + + public static MiniDFSCluster getCluster() { + return cluster.getCluster(); + } + + @Override + public FileSystem getTestFileSystem() throws IOException { + return getFileSystem(); + } + + public static FileSystem getFileSystem() throws IOException { + // Assumes the cluster is not null + Assert.assertNotNull("cluster not created", cluster); + + // Create a connection to WebHDFS + try { + RouterContext router = cluster.getRandomRouter(); + String uriStr = + WebHdfsConstants.WEBHDFS_SCHEME + "://" + router.getHttpAddress(); + URI uri = new URI(uriStr); + Configuration conf = new HdfsConfiguration(); + return FileSystem.get(uri, conf); + } catch (URISyntaxException e) { + LOG.error("Cannot create URI for the WebHDFS filesystem", e); + } + return null; + } + + @Override + public String getScheme() { + return WebHdfsConstants.WEBHDFS_SCHEME; + } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/fs/contract/router/web/TestRouterWebHDFSContractAppend.java
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/fs/contract/router/web/TestRouterWebHDFSContractAppend.java new file mode 100644 index 00000000000..40278c204b1 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/fs/contract/router/web/TestRouterWebHDFSContractAppend.java @@ -0,0 +1,45 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. See accompanying LICENSE file. + */ + +package org.apache.hadoop.fs.contract.router.web; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.contract.AbstractContractAppendTest; +import org.apache.hadoop.fs.contract.AbstractFSContract; +import org.junit.AfterClass; +import org.junit.BeforeClass; + +import java.io.IOException; + +/** + * Test append operations on a Router WebHDFS FS. + */ +public class TestRouterWebHDFSContractAppend + extends AbstractContractAppendTest { + + @BeforeClass + public static void createCluster() throws IOException { + RouterWebHDFSContract.createCluster(); + } + + @AfterClass + public static void teardownCluster() throws IOException { + RouterWebHDFSContract.destroyCluster(); + } + + @Override + protected AbstractFSContract createContract(Configuration conf) { + return new RouterWebHDFSContract(conf); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/fs/contract/router/web/TestRouterWebHDFSContractConcat.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/fs/contract/router/web/TestRouterWebHDFSContractConcat.java new file mode 100644 index 00000000000..b82a8e10e6c --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/fs/contract/router/web/TestRouterWebHDFSContractConcat.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.contract.router.web; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.contract.AbstractContractConcatTest; +import org.apache.hadoop.fs.contract.AbstractFSContract; +import org.junit.AfterClass; +import org.junit.BeforeClass; + +import java.io.IOException; + +/** + * Test concat operations on a Router WebHDFS FS. 
+ */ +public class TestRouterWebHDFSContractConcat + extends AbstractContractConcatTest { + + @BeforeClass + public static void createCluster() throws IOException { + RouterWebHDFSContract.createCluster(); + // perform a simple operation on the cluster to verify it is up + RouterWebHDFSContract.getFileSystem().getDefaultBlockSize(new Path("/")); + } + + @AfterClass + public static void teardownCluster() throws IOException { + RouterWebHDFSContract.destroyCluster(); + } + + @Override + protected AbstractFSContract createContract(Configuration conf) { + return new RouterWebHDFSContract(conf); + } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/fs/contract/router/web/TestRouterWebHDFSContractCreate.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/fs/contract/router/web/TestRouterWebHDFSContractCreate.java new file mode 100644 index 00000000000..ff1c610220f --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/fs/contract/router/web/TestRouterWebHDFSContractCreate.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.contract.router.web; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.contract.AbstractContractCreateTest; +import org.apache.hadoop.fs.contract.AbstractFSContract; +import org.junit.AfterClass; +import org.junit.BeforeClass; + +import java.io.IOException; + +/** + * Test create operations on a Router WebHDFS FS. + */ +public class TestRouterWebHDFSContractCreate + extends AbstractContractCreateTest { + + @BeforeClass + public static void createCluster() throws IOException { + RouterWebHDFSContract.createCluster(); + } + + @AfterClass + public static void teardownCluster() throws IOException { + RouterWebHDFSContract.destroyCluster(); + } + + @Override + protected AbstractFSContract createContract(Configuration conf) { + return new RouterWebHDFSContract(conf); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/fs/contract/router/web/TestRouterWebHDFSContractDelete.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/fs/contract/router/web/TestRouterWebHDFSContractDelete.java new file mode 100644 index 00000000000..dede65234b6 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/fs/contract/router/web/TestRouterWebHDFSContractDelete.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.contract.router.web; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.contract.AbstractContractDeleteTest; +import org.apache.hadoop.fs.contract.AbstractFSContract; +import org.junit.AfterClass; +import org.junit.BeforeClass; + +import java.io.IOException; + +/** + * Test delete operations on a Router WebHDFS FS. + */ +public class TestRouterWebHDFSContractDelete + extends AbstractContractDeleteTest { + + @BeforeClass + public static void createCluster() throws IOException { + RouterWebHDFSContract.createCluster(); + } + + @AfterClass + public static void teardownCluster() throws IOException { + RouterWebHDFSContract.destroyCluster(); + } + + @Override + protected AbstractFSContract createContract(Configuration conf) { + return new RouterWebHDFSContract(conf); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/fs/contract/router/web/TestRouterWebHDFSContractMkdir.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/fs/contract/router/web/TestRouterWebHDFSContractMkdir.java new file mode 100644 index 00000000000..9db4114be70 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/fs/contract/router/web/TestRouterWebHDFSContractMkdir.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.contract.router.web; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.contract.AbstractContractMkdirTest; +import org.apache.hadoop.fs.contract.AbstractFSContract; +import org.junit.AfterClass; +import org.junit.BeforeClass; + +import java.io.IOException; + +/** + * Test dir operations on a Router WebHDFS FS. 
+ */ +public class TestRouterWebHDFSContractMkdir extends AbstractContractMkdirTest { + + @BeforeClass + public static void createCluster() throws IOException { + RouterWebHDFSContract.createCluster(); + } + + @AfterClass + public static void teardownCluster() throws IOException { + RouterWebHDFSContract.destroyCluster(); + } + + @Override + protected AbstractFSContract createContract(Configuration conf) { + return new RouterWebHDFSContract(conf); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/fs/contract/router/web/TestRouterWebHDFSContractOpen.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/fs/contract/router/web/TestRouterWebHDFSContractOpen.java new file mode 100644 index 00000000000..f5517ddaaf1 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/fs/contract/router/web/TestRouterWebHDFSContractOpen.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.contract.router.web; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.contract.AbstractContractOpenTest; +import org.apache.hadoop.fs.contract.AbstractFSContract; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.IOException; + +/** + * Test open operations on a Router WebHDFS FS. 
+ */ +public class TestRouterWebHDFSContractOpen extends AbstractContractOpenTest { + + @BeforeClass + public static void createCluster() throws IOException { + RouterWebHDFSContract.createCluster(); + } + + @AfterClass + public static void teardownCluster() throws IOException { + RouterWebHDFSContract.destroyCluster(); + } + + @Override + protected AbstractFSContract createContract(Configuration conf) { + return new RouterWebHDFSContract(conf); + } + + @Override + @Test + public void testOpenReadDir() throws Throwable { + // WebHDFS itself allows open read on directory, we may need to + // fix this first before make this test work + } + + @Override + @Test + public void testOpenReadDirWithChild() throws Throwable { + // WebHDFS itself allows open read on directory, we may need to + // fix this first before make this test work + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/fs/contract/router/web/TestRouterWebHDFSContractRename.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/fs/contract/router/web/TestRouterWebHDFSContractRename.java new file mode 100644 index 00000000000..a426ae0f002 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/fs/contract/router/web/TestRouterWebHDFSContractRename.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.contract.router.web; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.contract.AbstractContractRenameTest; +import org.apache.hadoop.fs.contract.AbstractFSContract; +import org.junit.AfterClass; +import org.junit.BeforeClass; + +import java.io.IOException; + +/** + * Test rename operations on a Router WebHDFS FS. 
+ */ +public class TestRouterWebHDFSContractRename + extends AbstractContractRenameTest { + + @BeforeClass + public static void createCluster() throws IOException { + RouterWebHDFSContract.createCluster(); + } + + @AfterClass + public static void teardownCluster() throws IOException { + RouterWebHDFSContract.destroyCluster(); + } + + @Override + protected AbstractFSContract createContract(Configuration conf) { + return new RouterWebHDFSContract(conf); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/fs/contract/router/web/TestRouterWebHDFSContractRootDirectory.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/fs/contract/router/web/TestRouterWebHDFSContractRootDirectory.java new file mode 100644 index 00000000000..8a027a40394 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/fs/contract/router/web/TestRouterWebHDFSContractRootDirectory.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.contract.router.web; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.contract.AbstractContractRootDirectoryTest; +import org.apache.hadoop.fs.contract.AbstractFSContract; +import org.junit.AfterClass; +import org.junit.BeforeClass; + +import java.io.IOException; + +/** + * Test dir operations on a Router WebHDFS FS. 
+ */ +public class TestRouterWebHDFSContractRootDirectory extends + AbstractContractRootDirectoryTest { + + @BeforeClass + public static void createCluster() throws IOException { + RouterWebHDFSContract.createCluster(); + } + + @AfterClass + public static void teardownCluster() throws IOException { + RouterWebHDFSContract.destroyCluster(); + } + + @Override + protected AbstractFSContract createContract(Configuration conf) { + return new RouterWebHDFSContract(conf); + } + + @Override + public void testListEmptyRootDirectory() throws IOException { + // It doesn't apply because we still have the mount points here + } + + @Override + public void testRmEmptyRootDirNonRecursive() throws IOException { + // It doesn't apply because we still have the mount points here + } + + @Override + public void testSimpleRootListing() throws IOException{ + } + + @Override + public void testRecursiveRootListing() throws IOException { + // It doesn't apply because we still have the mount points here + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/fs/contract/router/web/TestRouterWebHDFSContractSeek.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/fs/contract/router/web/TestRouterWebHDFSContractSeek.java new file mode 100644 index 00000000000..5fbbc9b1e5a --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/fs/contract/router/web/TestRouterWebHDFSContractSeek.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.contract.router.web; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.contract.AbstractContractSeekTest; +import org.apache.hadoop.fs.contract.AbstractFSContract; +import org.junit.AfterClass; +import org.junit.BeforeClass; + +import java.io.IOException; + +/** + * Test seek operations on a Router WebHDFS FS. 
+ */ +public class TestRouterWebHDFSContractSeek extends AbstractContractSeekTest { + + @BeforeClass + public static void createCluster() throws IOException { + RouterWebHDFSContract.createCluster(); + } + + @AfterClass + public static void teardownCluster() throws IOException { + RouterWebHDFSContract.destroyCluster(); + } + + @Override + protected AbstractFSContract createContract(Configuration conf) { + return new RouterWebHDFSContract(conf); + } + + @Override + public void testNegativeSeek() throws Throwable { + System.out.println("Not supported"); + } + + @Override + public void testSeekReadClosedFile() throws Throwable { + System.out.println("Not supported"); + } + + @Override + public void testSeekPastEndOfFileThenReseekAndRead() throws Throwable { + System.out.println("Not supported"); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/fs/contract/router/web/package-info.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/fs/contract/router/web/package-info.java new file mode 100644 index 00000000000..3411cf082fb --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/fs/contract/router/web/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Test the WebHDFS contract. 
+ */ +package org.apache.hadoop.fs.contract.router.web; diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MiniRouterDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MiniRouterDFSCluster.java index df9f038156c..c49f90a497e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MiniRouterDFSCluster.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MiniRouterDFSCluster.java @@ -181,6 +181,11 @@ public class MiniRouterDFSCluster { return this.fileContext; } + public String getHttpAddress() { + InetSocketAddress httpAddress = router.getHttpServerAddress(); + return NetUtils.getHostPortString(httpAddress); + } + public void initRouter() throws URISyntaxException { // Store the bound points for the router interfaces InetSocketAddress rpcAddress = router.getRpcServerAddress(); diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/resources/contract/webhdfs.xml b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/resources/contract/webhdfs.xml new file mode 100644 index 00000000000..f9b7d9435d2 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/resources/contract/webhdfs.xml @@ -0,0 +1,26 @@ +<?xml version="1.0"?> +<!-- Apache License 2.0 header --> +<configuration> + <property> + <name>fs.contract.supports-strict-exceptions</name> + <value>false</value> + </property> +</configuration> diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeHttpServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeHttpServer.java index 5dcafb2501a..84ca7e8c53d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeHttpServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeHttpServer.java @@ -76,7 +76,9 @@ public class NameNodeHttpServer { this.bindAddress = bindAddress; } - private void initWebHdfs(Configuration conf) throws IOException { + public static void initWebHdfs(Configuration conf, String hostname, + HttpServer2 httpServer2, String jerseyResourcePackage) + throws IOException { if (WebHdfsFileSystem.isEnabled(conf)) { // set user pattern based on configuration file UserParam.setUserPattern(conf.get( @@ -93,8 +95,8 @@ public class NameNodeHttpServer { final String name = className; final String pathSpec = WebHdfsFileSystem.PATH_PREFIX + "/*"; - Map<String, String> params = getAuthFilterParams(conf); - HttpServer2.defineFilter(httpServer.getWebAppContext(), name, className, + Map<String, String> params = getAuthFilterParams(conf, hostname); + HttpServer2.defineFilter(httpServer2.getWebAppContext(), name, className, params, new String[] { pathSpec }); HttpServer2.LOG.info("Added filter '" + name + "' (class=" + className + ")"); @@ -105,14 +107,14 @@ public class NameNodeHttpServer { Map<String, String> restCsrfParams = RestCsrfPreventionFilter .getFilterParams(conf, "dfs.webhdfs.rest-csrf."); String restCsrfClassName = RestCsrfPreventionFilter.class.getName(); - HttpServer2.defineFilter(httpServer.getWebAppContext(), + HttpServer2.defineFilter(httpServer2.getWebAppContext(), restCsrfClassName, restCsrfClassName, restCsrfParams, new String[] {pathSpec}); } // add webhdfs packages - httpServer.addJerseyResourcePackage(NamenodeWebHdfsMethods.class - .getPackage().getName() + ";" + Param.class.getPackage().getName(), + httpServer2.addJerseyResourcePackage( + jerseyResourcePackage + ";" + Param.class.getPackage().getName(), pathSpec); } } @@ -168,7 +170,8 @@
public class NameNodeHttpServer { datanodeSslPort.getPort()); } - initWebHdfs(conf); + initWebHdfs(conf, bindAddress.getHostName(), httpServer, + NamenodeWebHdfsMethods.class.getPackage().getName()); httpServer.setAttribute(NAMENODE_ATTRIBUTE_KEY, nn); httpServer.setAttribute(JspHelper.CURRENT_CONF, conf); @@ -189,8 +192,8 @@ public class NameNodeHttpServer { } } - private Map<String, String> getAuthFilterParams(Configuration conf) - throws IOException { + private static Map<String, String> getAuthFilterParams(Configuration conf, + String hostname) throws IOException { Map<String, String> params = new HashMap<String, String>(); // Select configs beginning with 'dfs.web.authentication.' Iterator<Map.Entry<String, String>> iterator = conf.iterator(); @@ -206,8 +209,7 @@ public class NameNodeHttpServer { params .put( DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY, - SecurityUtil.getServerPrincipal(principalInConf, - bindAddress.getHostName())); + SecurityUtil.getServerPrincipal(principalInConf, hostname)); } else if (UserGroupInformation.isSecurityEnabled()) { HttpServer2.LOG.error( "WebHDFS and security are enabled, but configuration property '" + diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java index 2be2edbd313..81402376307 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java @@ -137,7 +137,7 @@ public class NamenodeWebHdfsMethods { Boolean.valueOf(request.getHeader(WebHdfsFileSystem.EZ_HEADER)); } - private void init(final UserGroupInformation ugi, + protected void init(final UserGroupInformation ugi, final DelegationParam delegation, final UserParam username, final DoAsParam doAsUser, final UriFsPathParam path, final HttpOpParam op, @@ -178,6 +178,14 @@ public class NamenodeWebHdfsMethods { return cp; } + protected String getScheme() { + return scheme; + } + + protected ServletContext getContext() { + return context; + } + private <T> T doAs(final UserGroupInformation ugi, final PrivilegedExceptionAction<T> action) throws IOException, InterruptedException { @@ -200,7 +208,7 @@ public class NamenodeWebHdfsMethods { } @Override public String getHostAddress() { - return remoteAddr; + return getRemoteAddr(); } @Override public InetAddress getHostInetAddress() { @@ -211,8 +219,8 @@ public class NamenodeWebHdfsMethods { } } }; - final NameNode namenode = (NameNode)context.getAttribute("name.node"); - namenode.queueExternalCall(call); + + queueExternalCall(call); T result = null; try { result = call.get(); @@ -229,6 +237,16 @@ public class NamenodeWebHdfsMethods { return result; } + protected String getRemoteAddr() { + return remoteAddr; + } + + protected void queueExternalCall(ExternalCall call) + throws IOException, InterruptedException { + final NameNode namenode = (NameNode)context.getAttribute("name.node"); + namenode.queueExternalCall(call); + } + @VisibleForTesting static DatanodeInfo chooseDatanode(final NameNode namenode, final String path, final HttpOpParam.Op op, final long openOffset, @@ -300,7 +318,7 @@ public class NamenodeWebHdfsMethods { * sorted based on availability and network distances, thus it is sufficient * to return the first element of the node here.
*/ - private static DatanodeInfo bestNode(DatanodeInfo[] nodes, + protected static DatanodeInfo bestNode(DatanodeInfo[] nodes, HashSet excludes) throws IOException { for (DatanodeInfo dn: nodes) { if (false == dn.isDecommissioned() && false == excludes.contains(dn)) { @@ -542,7 +560,7 @@ public class NamenodeWebHdfsMethods { }); } - private Response put( + protected Response put( final UserGroupInformation ugi, final DelegationParam delegation, final UserParam username, @@ -573,14 +591,13 @@ public class NamenodeWebHdfsMethods { final NoRedirectParam noredirectParam, final StoragePolicyParam policyName ) throws IOException, URISyntaxException { - final Configuration conf = (Configuration)context.getAttribute(JspHelper.CURRENT_CONF); - final NameNode namenode = (NameNode)context.getAttribute("name.node"); final ClientProtocol cp = getRpcClientProtocol(); switch(op.getValue()) { case CREATE: { + final NameNode namenode = (NameNode)context.getAttribute("name.node"); final URI uri = redirectURI(null, namenode, ugi, delegation, username, doAsUser, fullpath, op.getValue(), -1L, blockSize.getValue(conf), exclDatanodes.getValue(), permission, overwrite, bufferSize, @@ -792,7 +809,7 @@ public class NamenodeWebHdfsMethods { }); } - private Response post( + protected Response post( final UserGroupInformation ugi, final DelegationParam delegation, final UserParam username, @@ -954,7 +971,7 @@ public class NamenodeWebHdfsMethods { return encodedValue; } - private Response get( + protected Response get( final UserGroupInformation ugi, final DelegationParam delegation, final UserParam username, @@ -1282,7 +1299,7 @@ public class NamenodeWebHdfsMethods { }); } - private Response delete( + protected Response delete( final UserGroupInformation ugi, final DelegationParam delegation, final UserParam username,