diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/http/HttpServer2.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/http/HttpServer2.java index 168fd77239d..45b64195707 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/http/HttpServer2.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/http/HttpServer2.java @@ -108,7 +108,7 @@ public final class HttpServer2 implements FilterContainer { static final String FILTER_INITIALIZER_PROPERTY = "hadoop.http.filter.initializers"; - static final String HTTP_MAX_THREADS = "hadoop.http.max.threads"; + public static final String HTTP_MAX_THREADS = "hadoop.http.max.threads"; // The ServletContext attribute where the daemon Configuration // gets stored. diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 014bfdaaee3..dd2a2e0a9ef 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -362,6 +362,8 @@ Release 2.7.0 - UNRELEASED HDFS-7394. Log at INFO level, not WARN level, when InvalidToken is seen in ShortCircuitCache (Keith Pak via Colin P. McCabe) + HDFS-7279. Use netty to implement DatanodeWebHdfsMethods. (wheat9) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java index 4ed1b2bab06..637c679b9ff 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java @@ -58,8 +58,7 @@ public class JspHelper { /** Private constructor for preventing creating JspHelper object. */ private JspHelper() {} - private static String getDefaultWebUserName(Configuration conf - ) throws IOException { + public static String getDefaultWebUserName(Configuration conf) throws IOException { String user = conf.get( HADOOP_HTTP_STATIC_USER, DEFAULT_HADOOP_HTTP_STATIC_USER); if (user == null || user.length() == 0) { @@ -207,7 +206,7 @@ public class JspHelper { /** * Expected user name should be a short name. */ - private static void checkUsername(final String expected, final String name + public static void checkUsername(final String expected, final String name ) throws IOException { if (expected == null && name != null) { throw new IOException("Usernames not matched: expecting null but name=" diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index adfbaf38a93..a53698a1793 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -32,8 +32,6 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DNS_NAMESERVER_K import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HANDLER_COUNT_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HANDLER_COUNT_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_DEFAULT; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_KEY; @@ -64,6 +62,7 @@ import java.net.InetSocketAddress; import java.net.Socket; import java.net.URI; import java.net.UnknownHostException; +import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; @@ -152,15 +151,13 @@ import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter.SecureResour import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics; -import org.apache.hadoop.hdfs.server.datanode.web.resources.DatanodeWebHdfsMethods; +import org.apache.hadoop.hdfs.server.datanode.web.DatanodeHttpServer; import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock; import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo; -import org.apache.hadoop.hdfs.web.WebHdfsFileSystem; -import org.apache.hadoop.hdfs.web.resources.Param; import org.apache.hadoop.http.HttpConfig; import org.apache.hadoop.http.HttpServer2; import org.apache.hadoop.io.IOUtils; @@ -295,6 +292,7 @@ public class DataNode extends ReconfigurableBase private DataStorage storage = null; private HttpServer2 infoServer = null; + private DatanodeHttpServer httpServer = null; private int infoPort; private int infoSecurePort; @@ -632,64 +630,36 @@ public class DataNode extends ReconfigurableBase * for information related to the different configuration options and * Http Policy is decided. */ - private void startInfoServer(Configuration conf) throws IOException { - HttpServer2.Builder builder = new HttpServer2.Builder().setName("datanode") - .setConf(conf).setACL(new AccessControlList(conf.get(DFS_ADMIN, " "))); - - HttpConfig.Policy policy = DFSUtil.getHttpPolicy(conf); - - if (policy.isHttpEnabled()) { - if (secureResources == null) { - InetSocketAddress infoSocAddr = DataNode.getInfoAddr(conf); - int port = infoSocAddr.getPort(); - builder.addEndpoint(URI.create("http://" - + NetUtils.getHostPortString(infoSocAddr))); - if (port == 0) { - builder.setFindPort(true); - } - } else { - // The http socket is created externally using JSVC, we add it in - // directly. - builder.setConnector(secureResources.getListener()); - } - } - - if (policy.isHttpsEnabled()) { - InetSocketAddress secInfoSocAddr = NetUtils.createSocketAddr(conf.get( - DFS_DATANODE_HTTPS_ADDRESS_KEY, DFS_DATANODE_HTTPS_ADDRESS_DEFAULT)); - - Configuration sslConf = DFSUtil.loadSslConfiguration(conf); - DFSUtil.loadSslConfToHttpServerBuilder(builder, sslConf); - - int port = secInfoSocAddr.getPort(); - if (port == 0) { - builder.setFindPort(true); - } - builder.addEndpoint(URI.create("https://" - + NetUtils.getHostPortString(secInfoSocAddr))); - } + private void startInfoServer(Configuration conf) + throws IOException { + Configuration confForInfoServer = new Configuration(conf); + confForInfoServer.setInt(HttpServer2.HTTP_MAX_THREADS, 10); + HttpServer2.Builder builder = new HttpServer2.Builder() + .setName("datanode") + .setConf(conf).setACL(new AccessControlList(conf.get(DFS_ADMIN, " "))) + .addEndpoint(URI.create("http://localhost:0")) + .setFindPort(true); this.infoServer = builder.build(); this.infoServer.setAttribute("datanode", this); this.infoServer.setAttribute(JspHelper.CURRENT_CONF, conf); - this.infoServer.addServlet(null, "/blockScannerReport", + this.infoServer.addServlet(null, "/blockScannerReport", DataBlockScanner.Servlet.class); - - if (WebHdfsFileSystem.isEnabled(conf, LOG)) { - infoServer.addJerseyResourcePackage(DatanodeWebHdfsMethods.class - .getPackage().getName() + ";" + Param.class.getPackage().getName(), - WebHdfsFileSystem.PATH_PREFIX + "/*"); - } this.infoServer.start(); + InetSocketAddress jettyAddr = infoServer.getConnectorAddress(0); - int connIdx = 0; - if (policy.isHttpEnabled()) { - infoPort = infoServer.getConnectorAddress(connIdx++).getPort(); + // SecureDataNodeStarter will bind the privileged port to the channel if + // the DN is started by JSVC, pass it along. + ServerSocketChannel httpServerChannel = secureResources != null ? + secureResources.getHttpServerChannel() : null; + this.httpServer = new DatanodeHttpServer(conf, jettyAddr, httpServerChannel); + httpServer.start(); + if (httpServer.getHttpAddress() != null) { + infoPort = httpServer.getHttpAddress().getPort(); } - - if (policy.isHttpsEnabled()) { - infoSecurePort = infoServer.getConnectorAddress(connIdx).getPort(); + if (httpServer.getHttpsAddress() != null) { + infoSecurePort = httpServer.getHttpsAddress().getPort(); } } @@ -1651,6 +1621,12 @@ public class DataNode extends ReconfigurableBase LOG.warn("Exception shutting down DataNode", e); } } + try { + httpServer.close(); + } catch (Exception e) { + LOG.warn("Exception shutting down DataNode HttpServer", e); + } + if (pauseMonitor != null) { pauseMonitor.stop(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/SecureDataNodeStarter.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/SecureDataNodeStarter.java index f0f83e4fc32..c0df244e2c4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/SecureDataNodeStarter.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/SecureDataNodeStarter.java @@ -16,10 +16,7 @@ */ package org.apache.hadoop.hdfs.server.datanode; -import java.net.InetSocketAddress; -import java.net.ServerSocket; -import java.nio.channels.ServerSocketChannel; - +import com.google.common.annotations.VisibleForTesting; import org.apache.commons.daemon.Daemon; import org.apache.commons.daemon.DaemonContext; import org.apache.hadoop.conf.Configuration; @@ -28,12 +25,12 @@ import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.http.HttpConfig; -import org.apache.hadoop.http.HttpServer2; import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; -import org.mortbay.jetty.Connector; -import com.google.common.annotations.VisibleForTesting; +import java.net.InetSocketAddress; +import java.net.ServerSocket; +import java.nio.channels.ServerSocketChannel; /** * Utility class to start a datanode in a secure cluster, first obtaining @@ -45,17 +42,17 @@ public class SecureDataNodeStarter implements Daemon { */ public static class SecureResources { private final ServerSocket streamingSocket; - private final Connector listener; - public SecureResources(ServerSocket streamingSocket, - Connector listener) { - + private final ServerSocketChannel httpServerSocket; + public SecureResources(ServerSocket streamingSocket, ServerSocketChannel httpServerSocket) { this.streamingSocket = streamingSocket; - this.listener = listener; + this.httpServerSocket = httpServerSocket; } public ServerSocket getStreamingSocket() { return streamingSocket; } - public Connector getListener() { return listener; } + public ServerSocketChannel getHttpServerChannel() { + return httpServerSocket; + } } private String [] args; @@ -121,29 +118,31 @@ public class SecureDataNodeStarter implements Daemon { // Bind a port for the web server. The code intends to bind HTTP server to // privileged port only, as the client can authenticate the server using // certificates if they are communicating through SSL. - Connector listener = null; + final ServerSocketChannel httpChannel; if (policy.isHttpEnabled()) { - listener = HttpServer2.createDefaultChannelConnector(); + httpChannel = ServerSocketChannel.open(); InetSocketAddress infoSocAddr = DataNode.getInfoAddr(conf); - listener.setHost(infoSocAddr.getHostName()); - listener.setPort(infoSocAddr.getPort()); - // Open listener here in order to bind to port as root - listener.open(); - if (listener.getPort() != infoSocAddr.getPort()) { + httpChannel.socket().bind(infoSocAddr); + InetSocketAddress localAddr = (InetSocketAddress) httpChannel.socket() + .getLocalSocketAddress(); + + if (localAddr.getPort() != infoSocAddr.getPort()) { throw new RuntimeException("Unable to bind on specified info port in secure " + "context. Needed " + streamingAddr.getPort() + ", got " + ss.getLocalPort()); } System.err.println("Successfully obtained privileged resources (streaming port = " - + ss + " ) (http listener port = " + listener.getConnection() +")"); + + ss + " ) (http listener port = " + localAddr.getPort() +")"); - if (listener.getPort() > 1023 && isSecure) { + if (localAddr.getPort() > 1023 && isSecure) { throw new RuntimeException( "Cannot start secure datanode with unprivileged HTTP ports"); } System.err.println("Opened info server at " + infoSocAddr); + } else { + httpChannel = null; } - return new SecureResources(ss, listener); + return new SecureResources(ss, httpChannel); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/DatanodeHttpServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/DatanodeHttpServer.java new file mode 100644 index 00000000000..4ee82fb2631 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/DatanodeHttpServer.java @@ -0,0 +1,174 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.datanode.web; + +import io.netty.bootstrap.ChannelFactory; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.codec.http.HttpRequestDecoder; +import io.netty.handler.codec.http.HttpResponseEncoder; +import io.netty.handler.ssl.SslHandler; +import io.netty.handler.stream.ChunkedWriteHandler; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.DFSUtil; +import org.apache.hadoop.hdfs.server.datanode.DataNode; +import org.apache.hadoop.http.HttpConfig; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.ssl.SSLFactory; + +import java.io.Closeable; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.nio.channels.ServerSocketChannel; +import java.security.GeneralSecurityException; + +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_KEY; + +public class DatanodeHttpServer implements Closeable { + private final EventLoopGroup bossGroup; + private final EventLoopGroup workerGroup; + private final ServerSocketChannel externalHttpChannel; + private final ServerBootstrap httpServer; + private final SSLFactory sslFactory; + private final ServerBootstrap httpsServer; + private final Configuration conf; + private final Configuration confForCreate; + private InetSocketAddress httpAddress; + private InetSocketAddress httpsAddress; + + static final Log LOG = LogFactory.getLog(DatanodeHttpServer.class); + + public DatanodeHttpServer(final Configuration conf, final InetSocketAddress + jettyAddr, final ServerSocketChannel externalHttpChannel) + throws IOException { + this.conf = conf; + this.confForCreate = new Configuration(conf); + confForCreate.set(FsPermission.UMASK_LABEL, "000"); + + this.bossGroup = new NioEventLoopGroup(); + this.workerGroup = new NioEventLoopGroup(); + this.externalHttpChannel = externalHttpChannel; + HttpConfig.Policy policy = DFSUtil.getHttpPolicy(conf); + + if (policy.isHttpEnabled()) { + this.httpServer = new ServerBootstrap().group(bossGroup, workerGroup) + .childHandler(new ChannelInitializer() { + @Override + protected void initChannel(SocketChannel ch) throws Exception { + ChannelPipeline p = ch.pipeline(); + p.addLast(new HttpRequestDecoder(), + new HttpResponseEncoder(), + new ChunkedWriteHandler(), + new URLDispatcher(jettyAddr, conf, confForCreate)); + } + }); + if (externalHttpChannel == null) { + httpServer.channel(NioServerSocketChannel.class); + } else { + httpServer.channelFactory(new ChannelFactory() { + @Override + public NioServerSocketChannel newChannel() { + return new NioServerSocketChannel(externalHttpChannel) { + // The channel has been bounded externally via JSVC, + // thus bind() becomes a no-op. + @Override + protected void doBind(SocketAddress localAddress) throws Exception {} + }; + } + }); + } + } else { + this.httpServer = null; + } + + if (policy.isHttpsEnabled()) { + this.sslFactory = new SSLFactory(SSLFactory.Mode.SERVER, conf); + try { + sslFactory.init(); + } catch (GeneralSecurityException e) { + throw new IOException(e); + } + this.httpsServer = new ServerBootstrap().group(bossGroup, workerGroup) + .channel(NioServerSocketChannel.class) + .childHandler(new ChannelInitializer() { + @Override + protected void initChannel(SocketChannel ch) throws Exception { + ChannelPipeline p = ch.pipeline(); + p.addLast( + new SslHandler(sslFactory.createSSLEngine()), + new HttpRequestDecoder(), + new HttpResponseEncoder(), + new ChunkedWriteHandler(), + new URLDispatcher(jettyAddr, conf, confForCreate)); + } + }); + } else { + this.httpsServer = null; + this.sslFactory = null; + } + } + + public InetSocketAddress getHttpAddress() { + return httpAddress; + } + + public InetSocketAddress getHttpsAddress() { + return httpsAddress; + } + + public void start() { + if (httpServer != null) { + ChannelFuture f = httpServer.bind(DataNode.getInfoAddr(conf)); + f.syncUninterruptibly(); + httpAddress = (InetSocketAddress) f.channel().localAddress(); + LOG.info("Listening HTTP traffic on " + httpAddress); + } + + if (httpsServer != null) { + InetSocketAddress secInfoSocAddr = NetUtils.createSocketAddr(conf.get( + DFS_DATANODE_HTTPS_ADDRESS_KEY, DFS_DATANODE_HTTPS_ADDRESS_DEFAULT)); + ChannelFuture f = httpsServer.bind(secInfoSocAddr); + f.syncUninterruptibly(); + httpsAddress = (InetSocketAddress) f.channel().localAddress(); + LOG.info("Listening HTTPS traffic on " + httpsAddress); + } + } + + @Override + public void close() throws IOException { + bossGroup.shutdownGracefully(); + workerGroup.shutdownGracefully(); + if (sslFactory != null) { + sslFactory.destroy(); + } + if (externalHttpChannel != null) { + externalHttpChannel.close(); + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/SimpleHttpProxyHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/SimpleHttpProxyHandler.java new file mode 100644 index 00000000000..9e02b4b7158 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/SimpleHttpProxyHandler.java @@ -0,0 +1,160 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.datanode.web; + +import io.netty.bootstrap.Bootstrap; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.codec.http.DefaultFullHttpRequest; +import io.netty.handler.codec.http.DefaultHttpResponse; +import io.netty.handler.codec.http.HttpRequest; +import io.netty.handler.codec.http.HttpRequestEncoder; +import io.netty.handler.codec.http.HttpResponseEncoder; +import org.apache.commons.logging.Log; + +import java.net.InetSocketAddress; + +import static io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION; +import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR; +import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1; + +/** + * Dead simple session-layer HTTP proxy. It gets the HTTP responses + * inside the context, assuming that the remote peer is reasonable fast and + * the response is small. The upper layer should be filtering out malicious + * inputs. + */ +class SimpleHttpProxyHandler extends SimpleChannelInboundHandler { + private String uri; + private Channel proxiedChannel; + private final InetSocketAddress host; + static final Log LOG = DatanodeHttpServer.LOG; + + SimpleHttpProxyHandler(InetSocketAddress host) { + this.host = host; + } + + private static class Forwarder extends ChannelInboundHandlerAdapter { + private final String uri; + private final Channel client; + + private Forwarder(String uri, Channel client) { + this.uri = uri; + this.client = client; + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) { + closeOnFlush(client); + } + + @Override + public void channelRead(final ChannelHandlerContext ctx, Object msg) { + client.writeAndFlush(msg).addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) { + if (future.isSuccess()) { + ctx.channel().read(); + } else { + LOG.debug("Proxy failed. Cause: ", future.cause()); + future.channel().close(); + } + } + }); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + LOG.debug("Proxy for " + uri + " failed. cause: ", cause); + closeOnFlush(ctx.channel()); + } + } + + @Override + public void channelRead0 + (final ChannelHandlerContext ctx, final HttpRequest req) { + uri = req.getUri(); + final Channel client = ctx.channel(); + Bootstrap proxiedServer = new Bootstrap() + .group(client.eventLoop()) + .channel(NioSocketChannel.class) + .handler(new ChannelInitializer() { + @Override + protected void initChannel(SocketChannel ch) throws Exception { + ChannelPipeline p = ch.pipeline(); + p.addLast(new HttpRequestEncoder(), new Forwarder(uri, client)); + } + }); + ChannelFuture f = proxiedServer.connect(host); + proxiedChannel = f.channel(); + f.addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + if (future.isSuccess()) { + ctx.channel().pipeline().remove(HttpResponseEncoder.class); + HttpRequest newReq = new DefaultFullHttpRequest(HTTP_1_1, + req.getMethod(), req.getUri()); + newReq.headers().add(req.headers()); + newReq.headers().set(CONNECTION, CLOSE); + future.channel().writeAndFlush(newReq); + } else { + DefaultHttpResponse resp = new DefaultHttpResponse(HTTP_1_1, + INTERNAL_SERVER_ERROR); + resp.headers().set(CONNECTION, CLOSE); + LOG.info("Proxy " + uri + " failed. Cause: ", future.cause()); + ctx.writeAndFlush(resp).addListener(ChannelFutureListener.CLOSE); + client.close(); + } + } + }); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) { + if (proxiedChannel != null) { + proxiedChannel.close(); + proxiedChannel = null; + } + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + LOG.info("Proxy for " + uri + " failed. cause: ", cause); + if (proxiedChannel != null) { + proxiedChannel.close(); + proxiedChannel = null; + } + ctx.close(); + } + + private static void closeOnFlush(Channel ch) { + if (ch.isActive()) { + ch.writeAndFlush(Unpooled.EMPTY_BUFFER) + .addListener(ChannelFutureListener.CLOSE); + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/URLDispatcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/URLDispatcher.java new file mode 100644 index 00000000000..ff3f4684998 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/URLDispatcher.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.datanode.web; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.handler.codec.http.HttpRequest; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.server.datanode.web.webhdfs.WebHdfsHandler; + +import java.io.IOException; +import java.net.InetSocketAddress; + +import static org.apache.hadoop.hdfs.server.datanode.web.webhdfs.WebHdfsHandler.WEBHDFS_PREFIX; + +class URLDispatcher extends SimpleChannelInboundHandler { + private final InetSocketAddress proxyHost; + private final Configuration conf; + private final Configuration confForCreate; + + URLDispatcher(InetSocketAddress proxyHost, Configuration conf, + Configuration confForCreate) { + this.proxyHost = proxyHost; + this.conf = conf; + this.confForCreate = confForCreate; + } + + @Override + protected void channelRead0(ChannelHandlerContext ctx, HttpRequest req) + throws Exception { + String uri = req.getUri(); + ChannelPipeline p = ctx.pipeline(); + if (uri.startsWith(WEBHDFS_PREFIX)) { + WebHdfsHandler h = new WebHdfsHandler(conf, confForCreate); + p.replace(this, WebHdfsHandler.class.getSimpleName(), h); + h.channelRead0(ctx, req); + } else { + SimpleHttpProxyHandler h = new SimpleHttpProxyHandler(proxyHost); + p.replace(this, SimpleHttpProxyHandler.class.getSimpleName(), h); + h.channelRead0(ctx, req); + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/OpenEntity.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/OpenEntity.java index 9598c38524f..1596f3dfd54 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/OpenEntity.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/OpenEntity.java @@ -17,19 +17,18 @@ */ package org.apache.hadoop.hdfs.server.datanode.web.resources; -import java.io.IOException; -import java.io.OutputStream; -import java.lang.annotation.Annotation; -import java.lang.reflect.Type; +import org.apache.hadoop.hdfs.DFSClient; +import org.apache.hadoop.hdfs.client.HdfsDataInputStream; +import org.apache.hadoop.io.IOUtils; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.MultivaluedMap; import javax.ws.rs.ext.MessageBodyWriter; import javax.ws.rs.ext.Provider; - -import org.apache.hadoop.hdfs.DFSClient; -import org.apache.hadoop.hdfs.client.HdfsDataInputStream; -import org.apache.hadoop.io.IOUtils; +import java.io.IOException; +import java.io.OutputStream; +import java.lang.annotation.Annotation; +import java.lang.reflect.Type; /** * A response entity for a HdfsDataInputStream. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/webhdfs/DataNodeUGIProvider.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/webhdfs/DataNodeUGIProvider.java new file mode 100644 index 00000000000..ea1c29f5fb0 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/webhdfs/DataNodeUGIProvider.java @@ -0,0 +1,70 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. See accompanying LICENSE file. + */ +package org.apache.hadoop.hdfs.server.datanode.web.webhdfs; + +import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; +import org.apache.hadoop.hdfs.server.common.JspHelper; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; + +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; +import java.io.IOException; + +/** + * Create UGI from the request for the WebHDFS requests for the DNs. Note that + * the DN does not authenticate the UGI -- the NN will authenticate them in + * subsequent operations. + */ +class DataNodeUGIProvider { + private final ParameterParser params; + + DataNodeUGIProvider(ParameterParser params) { + this.params = params; + } + + UserGroupInformation ugi() throws IOException { + if (UserGroupInformation.isSecurityEnabled()) { + return tokenUGI(); + } + + final String usernameFromQuery = params.userName(); + final String doAsUserFromQuery = params.doAsUser(); + final String remoteUser = usernameFromQuery == null + ? JspHelper.getDefaultWebUserName(params.conf()) // not specified in + // request + : usernameFromQuery; + + UserGroupInformation ugi = UserGroupInformation.createRemoteUser(remoteUser); + JspHelper.checkUsername(ugi.getShortUserName(), usernameFromQuery); + if (doAsUserFromQuery != null) { + // create and attempt to authorize a proxy user + ugi = UserGroupInformation.createProxyUser(doAsUserFromQuery, ugi); + } + return ugi; + } + + private UserGroupInformation tokenUGI() throws IOException { + Token token = params.delegationToken(); + ByteArrayInputStream buf = + new ByteArrayInputStream(token.getIdentifier()); + DataInputStream in = new DataInputStream(buf); + DelegationTokenIdentifier id = new DelegationTokenIdentifier(); + id.readFields(in); + UserGroupInformation ugi = id.getUser(); + ugi.addToken(token); + return ugi; + } + +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/webhdfs/ExceptionHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/webhdfs/ExceptionHandler.java new file mode 100644 index 00000000000..fea40d7ce96 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/webhdfs/ExceptionHandler.java @@ -0,0 +1,115 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.datanode.web.webhdfs; + +import com.sun.jersey.api.ParamException; +import com.sun.jersey.api.container.ContainerException; +import io.netty.buffer.Unpooled; +import io.netty.handler.codec.http.DefaultFullHttpResponse; +import io.netty.handler.codec.http.HttpResponseStatus; +import org.apache.commons.logging.Log; +import org.apache.hadoop.hdfs.web.JsonUtil; +import org.apache.hadoop.ipc.RemoteException; +import org.apache.hadoop.ipc.StandbyException; +import org.apache.hadoop.security.authorize.AuthorizationException; +import org.apache.hadoop.security.token.SecretManager; + +import java.io.FileNotFoundException; +import java.io.IOException; + +import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_LENGTH; +import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE; +import static io.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST; +import static io.netty.handler.codec.http.HttpResponseStatus.FORBIDDEN; +import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR; +import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND; +import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1; +import static org.apache.hadoop.hdfs.server.datanode.web.webhdfs.WebHdfsHandler.APPLICATION_JSON; + +class ExceptionHandler { + static Log LOG = WebHdfsHandler.LOG; + + static DefaultFullHttpResponse exceptionCaught(Throwable cause) { + Exception e = cause instanceof Exception ? (Exception) cause : new Exception(cause); + + if (LOG.isTraceEnabled()) { + LOG.trace("GOT EXCEPITION", e); + } + + //Convert exception + if (e instanceof ParamException) { + final ParamException paramexception = (ParamException)e; + e = new IllegalArgumentException("Invalid value for webhdfs parameter \"" + + paramexception.getParameterName() + "\": " + + e.getCause().getMessage(), e); + } else if (e instanceof ContainerException || e instanceof SecurityException) { + e = toCause(e); + } else if (e instanceof RemoteException) { + e = ((RemoteException)e).unwrapRemoteException(); + } + + //Map response status + final HttpResponseStatus s; + if (e instanceof SecurityException) { + s = FORBIDDEN; + } else if (e instanceof AuthorizationException) { + s = FORBIDDEN; + } else if (e instanceof FileNotFoundException) { + s = NOT_FOUND; + } else if (e instanceof IOException) { + s = FORBIDDEN; + } else if (e instanceof UnsupportedOperationException) { + s = BAD_REQUEST; + } else if (e instanceof IllegalArgumentException) { + s = BAD_REQUEST; + } else { + LOG.warn("INTERNAL_SERVER_ERROR", e); + s = INTERNAL_SERVER_ERROR; + } + + final byte[] js = JsonUtil.toJsonString(e).getBytes(); + DefaultFullHttpResponse resp = + new DefaultFullHttpResponse(HTTP_1_1, s, Unpooled.wrappedBuffer(js)); + + resp.headers().set(CONTENT_TYPE, APPLICATION_JSON); + resp.headers().set(CONTENT_LENGTH, js.length); + return resp; + } + + private static Exception toCause(Exception e) { + final Throwable t = e.getCause(); + if (e instanceof SecurityException) { + // For the issue reported in HDFS-6475, if SecurityException's cause + // is InvalidToken, and the InvalidToken's cause is StandbyException, + // return StandbyException; Otherwise, leave the exception as is, + // since they are handled elsewhere. See HDFS-6588. + if (t != null && t instanceof SecretManager.InvalidToken) { + final Throwable t1 = t.getCause(); + if (t1 != null && t1 instanceof StandbyException) { + e = (StandbyException)t1; + } + } + } else { + if (t != null && t instanceof Exception) { + e = (Exception)t; + } + } + return e; + } + +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/webhdfs/HdfsWriter.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/webhdfs/HdfsWriter.java new file mode 100644 index 00000000000..0433ce65aed --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/webhdfs/HdfsWriter.java @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.datanode.web.webhdfs; + +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.handler.codec.http.DefaultHttpResponse; +import io.netty.handler.codec.http.HttpContent; +import io.netty.handler.codec.http.LastHttpContent; +import org.apache.commons.logging.Log; +import org.apache.hadoop.hdfs.DFSClient; +import org.apache.hadoop.io.IOUtils; + +import java.io.IOException; +import java.io.OutputStream; + +import static io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION; +import static io.netty.handler.codec.http.HttpHeaders.Values.CLOSE; + +class HdfsWriter extends SimpleChannelInboundHandler { + private final DFSClient client; + private final OutputStream out; + private final DefaultHttpResponse response; + private static final Log LOG = WebHdfsHandler.LOG; + + HdfsWriter(DFSClient client, OutputStream out, DefaultHttpResponse response) { + this.client = client; + this.out = out; + this.response = response; + } + + @Override + public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { + ctx.flush(); + } + + @Override + protected void channelRead0(ChannelHandlerContext ctx, HttpContent chunk) + throws IOException { + chunk.content().readBytes(out, chunk.content().readableBytes()); + if (chunk instanceof LastHttpContent) { + response.headers().set(CONNECTION, CLOSE); + ctx.write(response).addListener(ChannelFutureListener.CLOSE); + releaseDfsResources(); + } + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) { + releaseDfsResources(); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + releaseDfsResources(); + DefaultHttpResponse resp = ExceptionHandler.exceptionCaught(cause); + resp.headers().set(CONNECTION, CLOSE); + ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE); + } + + private void releaseDfsResources() { + IOUtils.cleanup(LOG, out); + IOUtils.cleanup(LOG, client); + } + +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/webhdfs/ParameterParser.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/webhdfs/ParameterParser.java new file mode 100644 index 00000000000..e1930b057c4 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/webhdfs/ParameterParser.java @@ -0,0 +1,125 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.datanode.web.webhdfs; + +import io.netty.handler.codec.http.QueryStringDecoder; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.HAUtil; +import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; +import org.apache.hadoop.hdfs.web.resources.BlockSizeParam; +import org.apache.hadoop.hdfs.web.resources.BufferSizeParam; +import org.apache.hadoop.hdfs.web.resources.DelegationParam; +import org.apache.hadoop.hdfs.web.resources.DoAsParam; +import org.apache.hadoop.hdfs.web.resources.HttpOpParam; +import org.apache.hadoop.hdfs.web.resources.NamenodeAddressParam; +import org.apache.hadoop.hdfs.web.resources.OffsetParam; +import org.apache.hadoop.hdfs.web.resources.OverwriteParam; +import org.apache.hadoop.hdfs.web.resources.PermissionParam; +import org.apache.hadoop.hdfs.web.resources.ReplicationParam; +import org.apache.hadoop.hdfs.web.resources.UserParam; +import org.apache.hadoop.security.SecurityUtil; +import org.apache.hadoop.security.token.Token; + +import java.io.IOException; +import java.net.URI; +import java.util.List; +import java.util.Map; + +import static org.apache.hadoop.hdfs.protocol.HdfsConstants.HDFS_URI_SCHEME; +import static org.apache.hadoop.hdfs.server.datanode.web.webhdfs.WebHdfsHandler.WEBHDFS_PREFIX_LENGTH; + +class ParameterParser { + private final Configuration conf; + private final String path; + private final Map> params; + + ParameterParser(QueryStringDecoder decoder, Configuration conf) { + this.path = decoder.path().substring(WEBHDFS_PREFIX_LENGTH); + this.params = decoder.parameters(); + this.conf = conf; + } + + String path() { return path; } + + String op() { + return param(HttpOpParam.NAME); + } + + long offset() { + return new OffsetParam(param(OffsetParam.NAME)).getValue(); + } + + String namenodeId() { + return new NamenodeAddressParam(param(NamenodeAddressParam.NAME)) + .getValue(); + } + + String doAsUser() { + return new DoAsParam(param(DoAsParam.NAME)).getValue(); + } + + String userName() { + return new UserParam(param(UserParam.NAME)).getValue(); + } + + int bufferSize() { + return new BufferSizeParam(param(BufferSizeParam.NAME)).getValue(conf); + } + + long blockSize() { + return new BlockSizeParam(param(BlockSizeParam.NAME)).getValue(conf); + } + + short replication() { + return new ReplicationParam(param(ReplicationParam.NAME)).getValue(conf); + } + + FsPermission permission() { + return new PermissionParam(param(PermissionParam.NAME)).getFsPermission(); + } + + boolean overwrite() { + return new OverwriteParam(param(OverwriteParam.NAME)).getValue(); + } + + Token delegationToken() throws IOException { + String delegation = param(DelegationParam.NAME); + final Token token = new + Token(); + token.decodeFromUrlString(delegation); + URI nnUri = URI.create(HDFS_URI_SCHEME + "://" + namenodeId()); + boolean isLogical = HAUtil.isLogicalUri(conf, nnUri); + if (isLogical) { + token.setService(HAUtil.buildTokenServiceForLogicalUri(nnUri, + HDFS_URI_SCHEME)); + } else { + token.setService(SecurityUtil.buildTokenService(nnUri)); + } + return token; + } + + Configuration conf() { + return conf; + } + + private String param(String key) { + List p = params.get(key); + return p == null ? null : p.get(0); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/webhdfs/WebHdfsHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/webhdfs/WebHdfsHandler.java new file mode 100644 index 00000000000..cf702182883 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/webhdfs/WebHdfsHandler.java @@ -0,0 +1,256 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.datanode.web.webhdfs; + +import com.google.common.base.Preconditions; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.handler.codec.http.DefaultFullHttpResponse; +import io.netty.handler.codec.http.DefaultHttpResponse; +import io.netty.handler.codec.http.HttpHeaders; +import io.netty.handler.codec.http.HttpMethod; +import io.netty.handler.codec.http.HttpRequest; +import io.netty.handler.codec.http.QueryStringDecoder; +import io.netty.handler.stream.ChunkedStream; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CreateFlag; +import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.DFSClient; +import org.apache.hadoop.hdfs.client.HdfsDataInputStream; +import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; +import org.apache.hadoop.hdfs.web.JsonUtil; +import org.apache.hadoop.hdfs.web.WebHdfsFileSystem; +import org.apache.hadoop.hdfs.web.resources.GetOpParam; +import org.apache.hadoop.hdfs.web.resources.PostOpParam; +import org.apache.hadoop.hdfs.web.resources.PutOpParam; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; + +import java.io.IOException; +import java.io.OutputStream; +import java.net.URI; +import java.net.URISyntaxException; +import java.security.PrivilegedExceptionAction; +import java.util.EnumSet; + +import static io.netty.handler.codec.http.HttpHeaders.Names.ACCESS_CONTROL_ALLOW_METHODS; +import static io.netty.handler.codec.http.HttpHeaders.Names.ACCESS_CONTROL_ALLOW_ORIGIN; +import static io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION; +import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_LENGTH; +import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE; +import static io.netty.handler.codec.http.HttpHeaders.Names.LOCATION; +import static io.netty.handler.codec.http.HttpHeaders.Values.CLOSE; +import static io.netty.handler.codec.http.HttpMethod.GET; +import static io.netty.handler.codec.http.HttpMethod.POST; +import static io.netty.handler.codec.http.HttpMethod.PUT; +import static io.netty.handler.codec.http.HttpResponseStatus.CONTINUE; +import static io.netty.handler.codec.http.HttpResponseStatus.CREATED; +import static io.netty.handler.codec.http.HttpResponseStatus.OK; +import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1; +import static org.apache.hadoop.hdfs.protocol.HdfsConstants.HDFS_URI_SCHEME; +import static org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier.HDFS_DELEGATION_KIND; + +public class WebHdfsHandler extends SimpleChannelInboundHandler { + static final Log LOG = LogFactory.getLog(WebHdfsHandler.class); + public static final String WEBHDFS_PREFIX = WebHdfsFileSystem.PATH_PREFIX; + public static final int WEBHDFS_PREFIX_LENGTH = WEBHDFS_PREFIX.length(); + public static final String APPLICATION_OCTET_STREAM = + "application/octet-stream"; + public static final String APPLICATION_JSON = "application/json"; + + private final Configuration conf; + private final Configuration confForCreate; + + private String path; + private ParameterParser params; + private UserGroupInformation ugi; + + public WebHdfsHandler(Configuration conf, Configuration confForCreate) + throws IOException { + this.conf = conf; + this.confForCreate = confForCreate; + } + + @Override + public void channelRead0(final ChannelHandlerContext ctx, + final HttpRequest req) throws Exception { + Preconditions.checkArgument(req.getUri().startsWith(WEBHDFS_PREFIX)); + QueryStringDecoder queryString = new QueryStringDecoder(req.getUri()); + params = new ParameterParser(queryString, conf); + DataNodeUGIProvider ugiProvider = new DataNodeUGIProvider(params); + ugi = ugiProvider.ugi(); + path = params.path(); + + injectToken(); + ugi.doAs(new PrivilegedExceptionAction() { + @Override + public Void run() throws Exception { + handle(ctx, req); + return null; + } + }); + } + + public void handle(ChannelHandlerContext ctx, HttpRequest req) + throws IOException, URISyntaxException { + String op = params.op(); + HttpMethod method = req.getMethod(); + if (PutOpParam.Op.CREATE.name().equalsIgnoreCase(op) + && method == PUT) { + onCreate(ctx); + } else if (PostOpParam.Op.APPEND.name().equalsIgnoreCase(op) + && method == POST) { + onAppend(ctx); + } else if (GetOpParam.Op.OPEN.name().equalsIgnoreCase(op) + && method == GET) { + onOpen(ctx); + } else if(GetOpParam.Op.GETFILECHECKSUM.name().equalsIgnoreCase(op) + && method == GET) { + onGetFileChecksum(ctx); + } else { + throw new IllegalArgumentException("Invalid operation " + op); + } + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + LOG.debug("Error ", cause); + DefaultHttpResponse resp = ExceptionHandler.exceptionCaught(cause); + resp.headers().set(CONNECTION, CLOSE); + ctx.writeAndFlush(resp).addListener(ChannelFutureListener.CLOSE); + } + + private void onCreate(ChannelHandlerContext ctx) + throws IOException, URISyntaxException { + writeContinueHeader(ctx); + + final String nnId = params.namenodeId(); + final int bufferSize = params.bufferSize(); + final short replication = params.replication(); + final long blockSize = params.blockSize(); + final FsPermission permission = params.permission(); + + EnumSet flags = params.overwrite() ? + EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE) + : EnumSet.of(CreateFlag.CREATE); + + final DFSClient dfsClient = newDfsClient(nnId, confForCreate); + OutputStream out = dfsClient.createWrappedOutputStream(dfsClient.create( + path, permission, flags, replication, + blockSize, null, bufferSize, null), null); + DefaultHttpResponse resp = new DefaultHttpResponse(HTTP_1_1, CREATED); + + final URI uri = new URI(HDFS_URI_SCHEME, nnId, path, null, null); + resp.headers().set(LOCATION, uri.toString()); + resp.headers().set(CONTENT_LENGTH, 0); + ctx.pipeline().replace(this, HdfsWriter.class.getSimpleName(), + new HdfsWriter(dfsClient, out, resp)); + } + + private void onAppend(ChannelHandlerContext ctx) throws IOException { + writeContinueHeader(ctx); + final String nnId = params.namenodeId(); + final int bufferSize = params.bufferSize(); + + DFSClient dfsClient = newDfsClient(nnId, conf); + OutputStream out = dfsClient.append(path, bufferSize, null, null); + DefaultHttpResponse resp = new DefaultHttpResponse(HTTP_1_1, OK); + resp.headers().set(CONTENT_LENGTH, 0); + ctx.pipeline().replace(this, HdfsWriter.class.getSimpleName(), + new HdfsWriter(dfsClient, out, resp)); + } + + private void onOpen(ChannelHandlerContext ctx) throws IOException { + final String nnId = params.namenodeId(); + final int bufferSize = params.bufferSize(); + final long offset = params.offset(); + + DefaultHttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK); + HttpHeaders headers = response.headers(); + // Allow the UI to access the file + headers.set(ACCESS_CONTROL_ALLOW_METHODS, GET); + headers.set(ACCESS_CONTROL_ALLOW_ORIGIN, "*"); + headers.set(CONTENT_TYPE, APPLICATION_OCTET_STREAM); + headers.set(CONNECTION, CLOSE); + + final DFSClient dfsclient = newDfsClient(nnId, conf); + HdfsDataInputStream in = dfsclient.createWrappedInputStream( + dfsclient.open(path, bufferSize, true)); + in.seek(offset); + + if (in.getVisibleLength() >= offset) { + headers.set(CONTENT_LENGTH, in.getVisibleLength() - offset); + } + + ctx.write(response); + ctx.writeAndFlush(new ChunkedStream(in) { + @Override + public void close() throws Exception { + super.close(); + dfsclient.close(); + } + }).addListener(ChannelFutureListener.CLOSE); + } + + private void onGetFileChecksum(ChannelHandlerContext ctx) throws IOException { + MD5MD5CRC32FileChecksum checksum = null; + final String nnId = params.namenodeId(); + DFSClient dfsclient = newDfsClient(nnId, conf); + try { + checksum = dfsclient.getFileChecksum(path, Long.MAX_VALUE); + dfsclient.close(); + dfsclient = null; + } finally { + IOUtils.cleanup(LOG, dfsclient); + } + final byte[] js = JsonUtil.toJsonString(checksum).getBytes(); + DefaultFullHttpResponse resp = + new DefaultFullHttpResponse(HTTP_1_1, OK, Unpooled.wrappedBuffer(js)); + + resp.headers().set(CONTENT_TYPE, APPLICATION_JSON); + resp.headers().set(CONTENT_LENGTH, js.length); + resp.headers().set(CONNECTION, CLOSE); + ctx.writeAndFlush(resp).addListener(ChannelFutureListener.CLOSE); + } + + private static void writeContinueHeader(ChannelHandlerContext ctx) { + DefaultHttpResponse r = new DefaultFullHttpResponse(HTTP_1_1, CONTINUE, + Unpooled.EMPTY_BUFFER); + ctx.writeAndFlush(r); + } + + private static DFSClient newDfsClient + (String nnId, Configuration conf) throws IOException { + URI uri = URI.create(HDFS_URI_SCHEME + "://" + nnId); + return new DFSClient(uri, conf); + } + + private void injectToken() throws IOException { + if (UserGroupInformation.isSecurityEnabled()) { + Token token = params.delegationToken(); + token.setKind(HDFS_DELEGATION_KIND); + ugi.addToken(token); + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeRegistration.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeRegistration.java index 000ca62b982..aaa18c666a7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeRegistration.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeRegistration.java @@ -84,6 +84,7 @@ public class DatanodeRegistration extends DatanodeID + "(" + getIpAddr() + ", datanodeUuid=" + getDatanodeUuid() + ", infoPort=" + getInfoPort() + + ", infoSecurePort=" + getInfoSecurePort() + ", ipcPort=" + getIpcPort() + ", storageInfo=" + storageInfo + ")"; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/IntegerParam.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/IntegerParam.java index 452552f713d..94a7f8e77c4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/IntegerParam.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/IntegerParam.java @@ -72,7 +72,8 @@ abstract class IntegerParam extends Param { @Override Integer parse(final String str) { try{ - return NULL.equals(str)? null: Integer.parseInt(str, radix); + return NULL.equals(str) || str == null ? null : Integer.parseInt(str, + radix); } catch(NumberFormatException e) { throw new IllegalArgumentException("Failed to parse \"" + str + "\" as a radix-" + radix + " integer.", e); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/LongParam.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/LongParam.java index afb881429a2..5f30094cc2e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/LongParam.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/LongParam.java @@ -71,7 +71,8 @@ abstract class LongParam extends Param { @Override Long parse(final String str) { try { - return NULL.equals(str)? null: Long.parseLong(str, radix); + return NULL.equals(str) || str == null ? null: Long.parseLong(str, + radix); } catch(NumberFormatException e) { throw new IllegalArgumentException("Failed to parse \"" + str + "\" as a radix-" + radix + " long integer.", e); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/ShortParam.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/ShortParam.java index bb42223dd2f..43ebbf4a3d2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/ShortParam.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/ShortParam.java @@ -72,7 +72,8 @@ abstract class ShortParam extends Param { @Override Short parse(final String str) { try { - return NULL.equals(str)? null: Short.parseShort(str, radix); + return NULL.equals(str) || str == null ? null : Short.parseShort(str, + radix); } catch(NumberFormatException e) { throw new IllegalArgumentException("Failed to parse \"" + str + "\" as a radix-" + radix + " short integer.", e); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsFileSystemContract.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsFileSystemContract.java index 46e433d6df8..027fda08c09 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsFileSystemContract.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsFileSystemContract.java @@ -308,8 +308,8 @@ public class TestWebHdfsFileSystemContract extends FileSystemContractBaseTest { // Open the file, but request length longer than actual file length by 1. HttpOpParam.Op op = GetOpParam.Op.OPEN; - URL url = webhdfs.toUrl(op, testFile, new LengthParam(Long.valueOf( - content.length() + 1))); + URL url = webhdfs.toUrl(op, testFile, new LengthParam((long) (content + .length() + 1))); HttpURLConnection conn = null; InputStream is = null; try {