HDFS-7279. Use netty to implement DatanodeWebHdfsMethods. Contributed by Haohui Mai.
This commit is contained in:
parent
6783d17fcf
commit
bf8e4332cb
|
@ -108,7 +108,7 @@ public final class HttpServer2 implements FilterContainer {
|
||||||
|
|
||||||
static final String FILTER_INITIALIZER_PROPERTY
|
static final String FILTER_INITIALIZER_PROPERTY
|
||||||
= "hadoop.http.filter.initializers";
|
= "hadoop.http.filter.initializers";
|
||||||
static final String HTTP_MAX_THREADS = "hadoop.http.max.threads";
|
public static final String HTTP_MAX_THREADS = "hadoop.http.max.threads";
|
||||||
|
|
||||||
// The ServletContext attribute where the daemon Configuration
|
// The ServletContext attribute where the daemon Configuration
|
||||||
// gets stored.
|
// gets stored.
|
||||||
|
|
|
@ -362,6 +362,8 @@ Release 2.7.0 - UNRELEASED
|
||||||
HDFS-7394. Log at INFO level, not WARN level, when InvalidToken is seen in
|
HDFS-7394. Log at INFO level, not WARN level, when InvalidToken is seen in
|
||||||
ShortCircuitCache (Keith Pak via Colin P. McCabe)
|
ShortCircuitCache (Keith Pak via Colin P. McCabe)
|
||||||
|
|
||||||
|
HDFS-7279. Use netty to implement DatanodeWebHdfsMethods. (wheat9)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
BUG FIXES
|
BUG FIXES
|
||||||
|
|
|
@ -58,8 +58,7 @@ public class JspHelper {
|
||||||
/** Private constructor for preventing creating JspHelper object. */
|
/** Private constructor for preventing creating JspHelper object. */
|
||||||
private JspHelper() {}
|
private JspHelper() {}
|
||||||
|
|
||||||
private static String getDefaultWebUserName(Configuration conf
|
public static String getDefaultWebUserName(Configuration conf) throws IOException {
|
||||||
) throws IOException {
|
|
||||||
String user = conf.get(
|
String user = conf.get(
|
||||||
HADOOP_HTTP_STATIC_USER, DEFAULT_HADOOP_HTTP_STATIC_USER);
|
HADOOP_HTTP_STATIC_USER, DEFAULT_HADOOP_HTTP_STATIC_USER);
|
||||||
if (user == null || user.length() == 0) {
|
if (user == null || user.length() == 0) {
|
||||||
|
@ -207,7 +206,7 @@ public class JspHelper {
|
||||||
/**
|
/**
|
||||||
* Expected user name should be a short name.
|
* Expected user name should be a short name.
|
||||||
*/
|
*/
|
||||||
private static void checkUsername(final String expected, final String name
|
public static void checkUsername(final String expected, final String name
|
||||||
) throws IOException {
|
) throws IOException {
|
||||||
if (expected == null && name != null) {
|
if (expected == null && name != null) {
|
||||||
throw new IOException("Usernames not matched: expecting null but name="
|
throw new IOException("Usernames not matched: expecting null but name="
|
||||||
|
|
|
@ -32,8 +32,6 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DNS_NAMESERVER_K
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HANDLER_COUNT_DEFAULT;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HANDLER_COUNT_DEFAULT;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HANDLER_COUNT_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HANDLER_COUNT_KEY;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_DEFAULT;
|
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_KEY;
|
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_DEFAULT;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_DEFAULT;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_KEY;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_KEY;
|
||||||
|
@ -64,6 +62,7 @@ import java.net.InetSocketAddress;
|
||||||
import java.net.Socket;
|
import java.net.Socket;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
import java.net.UnknownHostException;
|
import java.net.UnknownHostException;
|
||||||
|
import java.nio.channels.ServerSocketChannel;
|
||||||
import java.nio.channels.SocketChannel;
|
import java.nio.channels.SocketChannel;
|
||||||
import java.security.PrivilegedExceptionAction;
|
import java.security.PrivilegedExceptionAction;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
@ -152,15 +151,13 @@ import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter.SecureResour
|
||||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
|
import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.web.resources.DatanodeWebHdfsMethods;
|
import org.apache.hadoop.hdfs.server.datanode.web.DatanodeHttpServer;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
|
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
|
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
|
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
|
import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
|
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
|
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
|
||||||
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
|
|
||||||
import org.apache.hadoop.hdfs.web.resources.Param;
|
|
||||||
import org.apache.hadoop.http.HttpConfig;
|
import org.apache.hadoop.http.HttpConfig;
|
||||||
import org.apache.hadoop.http.HttpServer2;
|
import org.apache.hadoop.http.HttpServer2;
|
||||||
import org.apache.hadoop.io.IOUtils;
|
import org.apache.hadoop.io.IOUtils;
|
||||||
|
@ -295,6 +292,7 @@ public class DataNode extends ReconfigurableBase
|
||||||
private DataStorage storage = null;
|
private DataStorage storage = null;
|
||||||
|
|
||||||
private HttpServer2 infoServer = null;
|
private HttpServer2 infoServer = null;
|
||||||
|
private DatanodeHttpServer httpServer = null;
|
||||||
private int infoPort;
|
private int infoPort;
|
||||||
private int infoSecurePort;
|
private int infoSecurePort;
|
||||||
|
|
||||||
|
@ -632,42 +630,15 @@ public class DataNode extends ReconfigurableBase
|
||||||
* for information related to the different configuration options and
|
* for information related to the different configuration options and
|
||||||
* Http Policy is decided.
|
* Http Policy is decided.
|
||||||
*/
|
*/
|
||||||
private void startInfoServer(Configuration conf) throws IOException {
|
private void startInfoServer(Configuration conf)
|
||||||
HttpServer2.Builder builder = new HttpServer2.Builder().setName("datanode")
|
throws IOException {
|
||||||
.setConf(conf).setACL(new AccessControlList(conf.get(DFS_ADMIN, " ")));
|
Configuration confForInfoServer = new Configuration(conf);
|
||||||
|
confForInfoServer.setInt(HttpServer2.HTTP_MAX_THREADS, 10);
|
||||||
HttpConfig.Policy policy = DFSUtil.getHttpPolicy(conf);
|
HttpServer2.Builder builder = new HttpServer2.Builder()
|
||||||
|
.setName("datanode")
|
||||||
if (policy.isHttpEnabled()) {
|
.setConf(conf).setACL(new AccessControlList(conf.get(DFS_ADMIN, " ")))
|
||||||
if (secureResources == null) {
|
.addEndpoint(URI.create("http://localhost:0"))
|
||||||
InetSocketAddress infoSocAddr = DataNode.getInfoAddr(conf);
|
.setFindPort(true);
|
||||||
int port = infoSocAddr.getPort();
|
|
||||||
builder.addEndpoint(URI.create("http://"
|
|
||||||
+ NetUtils.getHostPortString(infoSocAddr)));
|
|
||||||
if (port == 0) {
|
|
||||||
builder.setFindPort(true);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
// The http socket is created externally using JSVC, we add it in
|
|
||||||
// directly.
|
|
||||||
builder.setConnector(secureResources.getListener());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (policy.isHttpsEnabled()) {
|
|
||||||
InetSocketAddress secInfoSocAddr = NetUtils.createSocketAddr(conf.get(
|
|
||||||
DFS_DATANODE_HTTPS_ADDRESS_KEY, DFS_DATANODE_HTTPS_ADDRESS_DEFAULT));
|
|
||||||
|
|
||||||
Configuration sslConf = DFSUtil.loadSslConfiguration(conf);
|
|
||||||
DFSUtil.loadSslConfToHttpServerBuilder(builder, sslConf);
|
|
||||||
|
|
||||||
int port = secInfoSocAddr.getPort();
|
|
||||||
if (port == 0) {
|
|
||||||
builder.setFindPort(true);
|
|
||||||
}
|
|
||||||
builder.addEndpoint(URI.create("https://"
|
|
||||||
+ NetUtils.getHostPortString(secInfoSocAddr)));
|
|
||||||
}
|
|
||||||
|
|
||||||
this.infoServer = builder.build();
|
this.infoServer = builder.build();
|
||||||
|
|
||||||
|
@ -675,21 +646,20 @@ public class DataNode extends ReconfigurableBase
|
||||||
this.infoServer.setAttribute(JspHelper.CURRENT_CONF, conf);
|
this.infoServer.setAttribute(JspHelper.CURRENT_CONF, conf);
|
||||||
this.infoServer.addServlet(null, "/blockScannerReport",
|
this.infoServer.addServlet(null, "/blockScannerReport",
|
||||||
DataBlockScanner.Servlet.class);
|
DataBlockScanner.Servlet.class);
|
||||||
|
|
||||||
if (WebHdfsFileSystem.isEnabled(conf, LOG)) {
|
|
||||||
infoServer.addJerseyResourcePackage(DatanodeWebHdfsMethods.class
|
|
||||||
.getPackage().getName() + ";" + Param.class.getPackage().getName(),
|
|
||||||
WebHdfsFileSystem.PATH_PREFIX + "/*");
|
|
||||||
}
|
|
||||||
this.infoServer.start();
|
this.infoServer.start();
|
||||||
|
InetSocketAddress jettyAddr = infoServer.getConnectorAddress(0);
|
||||||
|
|
||||||
int connIdx = 0;
|
// SecureDataNodeStarter will bind the privileged port to the channel if
|
||||||
if (policy.isHttpEnabled()) {
|
// the DN is started by JSVC, pass it along.
|
||||||
infoPort = infoServer.getConnectorAddress(connIdx++).getPort();
|
ServerSocketChannel httpServerChannel = secureResources != null ?
|
||||||
|
secureResources.getHttpServerChannel() : null;
|
||||||
|
this.httpServer = new DatanodeHttpServer(conf, jettyAddr, httpServerChannel);
|
||||||
|
httpServer.start();
|
||||||
|
if (httpServer.getHttpAddress() != null) {
|
||||||
|
infoPort = httpServer.getHttpAddress().getPort();
|
||||||
}
|
}
|
||||||
|
if (httpServer.getHttpsAddress() != null) {
|
||||||
if (policy.isHttpsEnabled()) {
|
infoSecurePort = httpServer.getHttpsAddress().getPort();
|
||||||
infoSecurePort = infoServer.getConnectorAddress(connIdx).getPort();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1651,6 +1621,12 @@ public class DataNode extends ReconfigurableBase
|
||||||
LOG.warn("Exception shutting down DataNode", e);
|
LOG.warn("Exception shutting down DataNode", e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
try {
|
||||||
|
httpServer.close();
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOG.warn("Exception shutting down DataNode HttpServer", e);
|
||||||
|
}
|
||||||
|
|
||||||
if (pauseMonitor != null) {
|
if (pauseMonitor != null) {
|
||||||
pauseMonitor.stop();
|
pauseMonitor.stop();
|
||||||
}
|
}
|
||||||
|
|
|
@ -16,10 +16,7 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hdfs.server.datanode;
|
package org.apache.hadoop.hdfs.server.datanode;
|
||||||
|
|
||||||
import java.net.InetSocketAddress;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import java.net.ServerSocket;
|
|
||||||
import java.nio.channels.ServerSocketChannel;
|
|
||||||
|
|
||||||
import org.apache.commons.daemon.Daemon;
|
import org.apache.commons.daemon.Daemon;
|
||||||
import org.apache.commons.daemon.DaemonContext;
|
import org.apache.commons.daemon.DaemonContext;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
@ -28,12 +25,12 @@ import org.apache.hadoop.hdfs.DFSUtil;
|
||||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
|
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
|
||||||
import org.apache.hadoop.http.HttpConfig;
|
import org.apache.hadoop.http.HttpConfig;
|
||||||
import org.apache.hadoop.http.HttpServer2;
|
|
||||||
import org.apache.hadoop.security.SecurityUtil;
|
import org.apache.hadoop.security.SecurityUtil;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.mortbay.jetty.Connector;
|
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import java.net.InetSocketAddress;
|
||||||
|
import java.net.ServerSocket;
|
||||||
|
import java.nio.channels.ServerSocketChannel;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Utility class to start a datanode in a secure cluster, first obtaining
|
* Utility class to start a datanode in a secure cluster, first obtaining
|
||||||
|
@ -45,17 +42,17 @@ public class SecureDataNodeStarter implements Daemon {
|
||||||
*/
|
*/
|
||||||
public static class SecureResources {
|
public static class SecureResources {
|
||||||
private final ServerSocket streamingSocket;
|
private final ServerSocket streamingSocket;
|
||||||
private final Connector listener;
|
private final ServerSocketChannel httpServerSocket;
|
||||||
public SecureResources(ServerSocket streamingSocket,
|
public SecureResources(ServerSocket streamingSocket, ServerSocketChannel httpServerSocket) {
|
||||||
Connector listener) {
|
|
||||||
|
|
||||||
this.streamingSocket = streamingSocket;
|
this.streamingSocket = streamingSocket;
|
||||||
this.listener = listener;
|
this.httpServerSocket = httpServerSocket;
|
||||||
}
|
}
|
||||||
|
|
||||||
public ServerSocket getStreamingSocket() { return streamingSocket; }
|
public ServerSocket getStreamingSocket() { return streamingSocket; }
|
||||||
|
|
||||||
public Connector getListener() { return listener; }
|
public ServerSocketChannel getHttpServerChannel() {
|
||||||
|
return httpServerSocket;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private String [] args;
|
private String [] args;
|
||||||
|
@ -121,29 +118,31 @@ public class SecureDataNodeStarter implements Daemon {
|
||||||
// Bind a port for the web server. The code intends to bind HTTP server to
|
// Bind a port for the web server. The code intends to bind HTTP server to
|
||||||
// privileged port only, as the client can authenticate the server using
|
// privileged port only, as the client can authenticate the server using
|
||||||
// certificates if they are communicating through SSL.
|
// certificates if they are communicating through SSL.
|
||||||
Connector listener = null;
|
final ServerSocketChannel httpChannel;
|
||||||
if (policy.isHttpEnabled()) {
|
if (policy.isHttpEnabled()) {
|
||||||
listener = HttpServer2.createDefaultChannelConnector();
|
httpChannel = ServerSocketChannel.open();
|
||||||
InetSocketAddress infoSocAddr = DataNode.getInfoAddr(conf);
|
InetSocketAddress infoSocAddr = DataNode.getInfoAddr(conf);
|
||||||
listener.setHost(infoSocAddr.getHostName());
|
httpChannel.socket().bind(infoSocAddr);
|
||||||
listener.setPort(infoSocAddr.getPort());
|
InetSocketAddress localAddr = (InetSocketAddress) httpChannel.socket()
|
||||||
// Open listener here in order to bind to port as root
|
.getLocalSocketAddress();
|
||||||
listener.open();
|
|
||||||
if (listener.getPort() != infoSocAddr.getPort()) {
|
if (localAddr.getPort() != infoSocAddr.getPort()) {
|
||||||
throw new RuntimeException("Unable to bind on specified info port in secure " +
|
throw new RuntimeException("Unable to bind on specified info port in secure " +
|
||||||
"context. Needed " + streamingAddr.getPort() + ", got " + ss.getLocalPort());
|
"context. Needed " + streamingAddr.getPort() + ", got " + ss.getLocalPort());
|
||||||
}
|
}
|
||||||
System.err.println("Successfully obtained privileged resources (streaming port = "
|
System.err.println("Successfully obtained privileged resources (streaming port = "
|
||||||
+ ss + " ) (http listener port = " + listener.getConnection() +")");
|
+ ss + " ) (http listener port = " + localAddr.getPort() +")");
|
||||||
|
|
||||||
if (listener.getPort() > 1023 && isSecure) {
|
if (localAddr.getPort() > 1023 && isSecure) {
|
||||||
throw new RuntimeException(
|
throw new RuntimeException(
|
||||||
"Cannot start secure datanode with unprivileged HTTP ports");
|
"Cannot start secure datanode with unprivileged HTTP ports");
|
||||||
}
|
}
|
||||||
System.err.println("Opened info server at " + infoSocAddr);
|
System.err.println("Opened info server at " + infoSocAddr);
|
||||||
|
} else {
|
||||||
|
httpChannel = null;
|
||||||
}
|
}
|
||||||
|
|
||||||
return new SecureResources(ss, listener);
|
return new SecureResources(ss, httpChannel);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,174 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hdfs.server.datanode.web;
|
||||||
|
|
||||||
|
import io.netty.bootstrap.ChannelFactory;
|
||||||
|
import io.netty.bootstrap.ServerBootstrap;
|
||||||
|
import io.netty.channel.ChannelFuture;
|
||||||
|
import io.netty.channel.ChannelInitializer;
|
||||||
|
import io.netty.channel.ChannelPipeline;
|
||||||
|
import io.netty.channel.EventLoopGroup;
|
||||||
|
import io.netty.channel.nio.NioEventLoopGroup;
|
||||||
|
import io.netty.channel.socket.SocketChannel;
|
||||||
|
import io.netty.channel.socket.nio.NioServerSocketChannel;
|
||||||
|
import io.netty.handler.codec.http.HttpRequestDecoder;
|
||||||
|
import io.netty.handler.codec.http.HttpResponseEncoder;
|
||||||
|
import io.netty.handler.ssl.SslHandler;
|
||||||
|
import io.netty.handler.stream.ChunkedWriteHandler;
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
|
import org.apache.hadoop.hdfs.DFSUtil;
|
||||||
|
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||||
|
import org.apache.hadoop.http.HttpConfig;
|
||||||
|
import org.apache.hadoop.net.NetUtils;
|
||||||
|
import org.apache.hadoop.security.ssl.SSLFactory;
|
||||||
|
|
||||||
|
import java.io.Closeable;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.net.InetSocketAddress;
|
||||||
|
import java.net.SocketAddress;
|
||||||
|
import java.nio.channels.ServerSocketChannel;
|
||||||
|
import java.security.GeneralSecurityException;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_DEFAULT;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_KEY;
|
||||||
|
|
||||||
|
public class DatanodeHttpServer implements Closeable {
|
||||||
|
private final EventLoopGroup bossGroup;
|
||||||
|
private final EventLoopGroup workerGroup;
|
||||||
|
private final ServerSocketChannel externalHttpChannel;
|
||||||
|
private final ServerBootstrap httpServer;
|
||||||
|
private final SSLFactory sslFactory;
|
||||||
|
private final ServerBootstrap httpsServer;
|
||||||
|
private final Configuration conf;
|
||||||
|
private final Configuration confForCreate;
|
||||||
|
private InetSocketAddress httpAddress;
|
||||||
|
private InetSocketAddress httpsAddress;
|
||||||
|
|
||||||
|
static final Log LOG = LogFactory.getLog(DatanodeHttpServer.class);
|
||||||
|
|
||||||
|
public DatanodeHttpServer(final Configuration conf, final InetSocketAddress
|
||||||
|
jettyAddr, final ServerSocketChannel externalHttpChannel)
|
||||||
|
throws IOException {
|
||||||
|
this.conf = conf;
|
||||||
|
this.confForCreate = new Configuration(conf);
|
||||||
|
confForCreate.set(FsPermission.UMASK_LABEL, "000");
|
||||||
|
|
||||||
|
this.bossGroup = new NioEventLoopGroup();
|
||||||
|
this.workerGroup = new NioEventLoopGroup();
|
||||||
|
this.externalHttpChannel = externalHttpChannel;
|
||||||
|
HttpConfig.Policy policy = DFSUtil.getHttpPolicy(conf);
|
||||||
|
|
||||||
|
if (policy.isHttpEnabled()) {
|
||||||
|
this.httpServer = new ServerBootstrap().group(bossGroup, workerGroup)
|
||||||
|
.childHandler(new ChannelInitializer<SocketChannel>() {
|
||||||
|
@Override
|
||||||
|
protected void initChannel(SocketChannel ch) throws Exception {
|
||||||
|
ChannelPipeline p = ch.pipeline();
|
||||||
|
p.addLast(new HttpRequestDecoder(),
|
||||||
|
new HttpResponseEncoder(),
|
||||||
|
new ChunkedWriteHandler(),
|
||||||
|
new URLDispatcher(jettyAddr, conf, confForCreate));
|
||||||
|
}
|
||||||
|
});
|
||||||
|
if (externalHttpChannel == null) {
|
||||||
|
httpServer.channel(NioServerSocketChannel.class);
|
||||||
|
} else {
|
||||||
|
httpServer.channelFactory(new ChannelFactory<NioServerSocketChannel>() {
|
||||||
|
@Override
|
||||||
|
public NioServerSocketChannel newChannel() {
|
||||||
|
return new NioServerSocketChannel(externalHttpChannel) {
|
||||||
|
// The channel has been bounded externally via JSVC,
|
||||||
|
// thus bind() becomes a no-op.
|
||||||
|
@Override
|
||||||
|
protected void doBind(SocketAddress localAddress) throws Exception {}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
this.httpServer = null;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (policy.isHttpsEnabled()) {
|
||||||
|
this.sslFactory = new SSLFactory(SSLFactory.Mode.SERVER, conf);
|
||||||
|
try {
|
||||||
|
sslFactory.init();
|
||||||
|
} catch (GeneralSecurityException e) {
|
||||||
|
throw new IOException(e);
|
||||||
|
}
|
||||||
|
this.httpsServer = new ServerBootstrap().group(bossGroup, workerGroup)
|
||||||
|
.channel(NioServerSocketChannel.class)
|
||||||
|
.childHandler(new ChannelInitializer<SocketChannel>() {
|
||||||
|
@Override
|
||||||
|
protected void initChannel(SocketChannel ch) throws Exception {
|
||||||
|
ChannelPipeline p = ch.pipeline();
|
||||||
|
p.addLast(
|
||||||
|
new SslHandler(sslFactory.createSSLEngine()),
|
||||||
|
new HttpRequestDecoder(),
|
||||||
|
new HttpResponseEncoder(),
|
||||||
|
new ChunkedWriteHandler(),
|
||||||
|
new URLDispatcher(jettyAddr, conf, confForCreate));
|
||||||
|
}
|
||||||
|
});
|
||||||
|
} else {
|
||||||
|
this.httpsServer = null;
|
||||||
|
this.sslFactory = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public InetSocketAddress getHttpAddress() {
|
||||||
|
return httpAddress;
|
||||||
|
}
|
||||||
|
|
||||||
|
public InetSocketAddress getHttpsAddress() {
|
||||||
|
return httpsAddress;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void start() {
|
||||||
|
if (httpServer != null) {
|
||||||
|
ChannelFuture f = httpServer.bind(DataNode.getInfoAddr(conf));
|
||||||
|
f.syncUninterruptibly();
|
||||||
|
httpAddress = (InetSocketAddress) f.channel().localAddress();
|
||||||
|
LOG.info("Listening HTTP traffic on " + httpAddress);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (httpsServer != null) {
|
||||||
|
InetSocketAddress secInfoSocAddr = NetUtils.createSocketAddr(conf.get(
|
||||||
|
DFS_DATANODE_HTTPS_ADDRESS_KEY, DFS_DATANODE_HTTPS_ADDRESS_DEFAULT));
|
||||||
|
ChannelFuture f = httpsServer.bind(secInfoSocAddr);
|
||||||
|
f.syncUninterruptibly();
|
||||||
|
httpsAddress = (InetSocketAddress) f.channel().localAddress();
|
||||||
|
LOG.info("Listening HTTPS traffic on " + httpsAddress);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() throws IOException {
|
||||||
|
bossGroup.shutdownGracefully();
|
||||||
|
workerGroup.shutdownGracefully();
|
||||||
|
if (sslFactory != null) {
|
||||||
|
sslFactory.destroy();
|
||||||
|
}
|
||||||
|
if (externalHttpChannel != null) {
|
||||||
|
externalHttpChannel.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,160 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hdfs.server.datanode.web;
|
||||||
|
|
||||||
|
import io.netty.bootstrap.Bootstrap;
|
||||||
|
import io.netty.buffer.Unpooled;
|
||||||
|
import io.netty.channel.Channel;
|
||||||
|
import io.netty.channel.ChannelFuture;
|
||||||
|
import io.netty.channel.ChannelFutureListener;
|
||||||
|
import io.netty.channel.ChannelHandlerContext;
|
||||||
|
import io.netty.channel.ChannelInboundHandlerAdapter;
|
||||||
|
import io.netty.channel.ChannelInitializer;
|
||||||
|
import io.netty.channel.ChannelPipeline;
|
||||||
|
import io.netty.channel.SimpleChannelInboundHandler;
|
||||||
|
import io.netty.channel.socket.SocketChannel;
|
||||||
|
import io.netty.channel.socket.nio.NioSocketChannel;
|
||||||
|
import io.netty.handler.codec.http.DefaultFullHttpRequest;
|
||||||
|
import io.netty.handler.codec.http.DefaultHttpResponse;
|
||||||
|
import io.netty.handler.codec.http.HttpRequest;
|
||||||
|
import io.netty.handler.codec.http.HttpRequestEncoder;
|
||||||
|
import io.netty.handler.codec.http.HttpResponseEncoder;
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
|
||||||
|
import java.net.InetSocketAddress;
|
||||||
|
|
||||||
|
import static io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION;
|
||||||
|
import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
|
||||||
|
import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Dead simple session-layer HTTP proxy. It gets the HTTP responses
|
||||||
|
* inside the context, assuming that the remote peer is reasonable fast and
|
||||||
|
* the response is small. The upper layer should be filtering out malicious
|
||||||
|
* inputs.
|
||||||
|
*/
|
||||||
|
class SimpleHttpProxyHandler extends SimpleChannelInboundHandler<HttpRequest> {
|
||||||
|
private String uri;
|
||||||
|
private Channel proxiedChannel;
|
||||||
|
private final InetSocketAddress host;
|
||||||
|
static final Log LOG = DatanodeHttpServer.LOG;
|
||||||
|
|
||||||
|
SimpleHttpProxyHandler(InetSocketAddress host) {
|
||||||
|
this.host = host;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static class Forwarder extends ChannelInboundHandlerAdapter {
|
||||||
|
private final String uri;
|
||||||
|
private final Channel client;
|
||||||
|
|
||||||
|
private Forwarder(String uri, Channel client) {
|
||||||
|
this.uri = uri;
|
||||||
|
this.client = client;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void channelInactive(ChannelHandlerContext ctx) {
|
||||||
|
closeOnFlush(client);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void channelRead(final ChannelHandlerContext ctx, Object msg) {
|
||||||
|
client.writeAndFlush(msg).addListener(new ChannelFutureListener() {
|
||||||
|
@Override
|
||||||
|
public void operationComplete(ChannelFuture future) {
|
||||||
|
if (future.isSuccess()) {
|
||||||
|
ctx.channel().read();
|
||||||
|
} else {
|
||||||
|
LOG.debug("Proxy failed. Cause: ", future.cause());
|
||||||
|
future.channel().close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
|
||||||
|
LOG.debug("Proxy for " + uri + " failed. cause: ", cause);
|
||||||
|
closeOnFlush(ctx.channel());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void channelRead0
|
||||||
|
(final ChannelHandlerContext ctx, final HttpRequest req) {
|
||||||
|
uri = req.getUri();
|
||||||
|
final Channel client = ctx.channel();
|
||||||
|
Bootstrap proxiedServer = new Bootstrap()
|
||||||
|
.group(client.eventLoop())
|
||||||
|
.channel(NioSocketChannel.class)
|
||||||
|
.handler(new ChannelInitializer<SocketChannel>() {
|
||||||
|
@Override
|
||||||
|
protected void initChannel(SocketChannel ch) throws Exception {
|
||||||
|
ChannelPipeline p = ch.pipeline();
|
||||||
|
p.addLast(new HttpRequestEncoder(), new Forwarder(uri, client));
|
||||||
|
}
|
||||||
|
});
|
||||||
|
ChannelFuture f = proxiedServer.connect(host);
|
||||||
|
proxiedChannel = f.channel();
|
||||||
|
f.addListener(new ChannelFutureListener() {
|
||||||
|
@Override
|
||||||
|
public void operationComplete(ChannelFuture future) throws Exception {
|
||||||
|
if (future.isSuccess()) {
|
||||||
|
ctx.channel().pipeline().remove(HttpResponseEncoder.class);
|
||||||
|
HttpRequest newReq = new DefaultFullHttpRequest(HTTP_1_1,
|
||||||
|
req.getMethod(), req.getUri());
|
||||||
|
newReq.headers().add(req.headers());
|
||||||
|
newReq.headers().set(CONNECTION, CLOSE);
|
||||||
|
future.channel().writeAndFlush(newReq);
|
||||||
|
} else {
|
||||||
|
DefaultHttpResponse resp = new DefaultHttpResponse(HTTP_1_1,
|
||||||
|
INTERNAL_SERVER_ERROR);
|
||||||
|
resp.headers().set(CONNECTION, CLOSE);
|
||||||
|
LOG.info("Proxy " + uri + " failed. Cause: ", future.cause());
|
||||||
|
ctx.writeAndFlush(resp).addListener(ChannelFutureListener.CLOSE);
|
||||||
|
client.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void channelInactive(ChannelHandlerContext ctx) {
|
||||||
|
if (proxiedChannel != null) {
|
||||||
|
proxiedChannel.close();
|
||||||
|
proxiedChannel = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
|
||||||
|
LOG.info("Proxy for " + uri + " failed. cause: ", cause);
|
||||||
|
if (proxiedChannel != null) {
|
||||||
|
proxiedChannel.close();
|
||||||
|
proxiedChannel = null;
|
||||||
|
}
|
||||||
|
ctx.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void closeOnFlush(Channel ch) {
|
||||||
|
if (ch.isActive()) {
|
||||||
|
ch.writeAndFlush(Unpooled.EMPTY_BUFFER)
|
||||||
|
.addListener(ChannelFutureListener.CLOSE);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,59 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hdfs.server.datanode.web;
|
||||||
|
|
||||||
|
import io.netty.channel.ChannelHandlerContext;
|
||||||
|
import io.netty.channel.ChannelPipeline;
|
||||||
|
import io.netty.channel.SimpleChannelInboundHandler;
|
||||||
|
import io.netty.handler.codec.http.HttpRequest;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.hdfs.server.datanode.web.webhdfs.WebHdfsHandler;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.net.InetSocketAddress;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.hdfs.server.datanode.web.webhdfs.WebHdfsHandler.WEBHDFS_PREFIX;
|
||||||
|
|
||||||
|
class URLDispatcher extends SimpleChannelInboundHandler<HttpRequest> {
|
||||||
|
private final InetSocketAddress proxyHost;
|
||||||
|
private final Configuration conf;
|
||||||
|
private final Configuration confForCreate;
|
||||||
|
|
||||||
|
URLDispatcher(InetSocketAddress proxyHost, Configuration conf,
|
||||||
|
Configuration confForCreate) {
|
||||||
|
this.proxyHost = proxyHost;
|
||||||
|
this.conf = conf;
|
||||||
|
this.confForCreate = confForCreate;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void channelRead0(ChannelHandlerContext ctx, HttpRequest req)
|
||||||
|
throws Exception {
|
||||||
|
String uri = req.getUri();
|
||||||
|
ChannelPipeline p = ctx.pipeline();
|
||||||
|
if (uri.startsWith(WEBHDFS_PREFIX)) {
|
||||||
|
WebHdfsHandler h = new WebHdfsHandler(conf, confForCreate);
|
||||||
|
p.replace(this, WebHdfsHandler.class.getSimpleName(), h);
|
||||||
|
h.channelRead0(ctx, req);
|
||||||
|
} else {
|
||||||
|
SimpleHttpProxyHandler h = new SimpleHttpProxyHandler(proxyHost);
|
||||||
|
p.replace(this, SimpleHttpProxyHandler.class.getSimpleName(), h);
|
||||||
|
h.channelRead0(ctx, req);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -17,19 +17,18 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hdfs.server.datanode.web.resources;
|
package org.apache.hadoop.hdfs.server.datanode.web.resources;
|
||||||
|
|
||||||
import java.io.IOException;
|
import org.apache.hadoop.hdfs.DFSClient;
|
||||||
import java.io.OutputStream;
|
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
|
||||||
import java.lang.annotation.Annotation;
|
import org.apache.hadoop.io.IOUtils;
|
||||||
import java.lang.reflect.Type;
|
|
||||||
|
|
||||||
import javax.ws.rs.core.MediaType;
|
import javax.ws.rs.core.MediaType;
|
||||||
import javax.ws.rs.core.MultivaluedMap;
|
import javax.ws.rs.core.MultivaluedMap;
|
||||||
import javax.ws.rs.ext.MessageBodyWriter;
|
import javax.ws.rs.ext.MessageBodyWriter;
|
||||||
import javax.ws.rs.ext.Provider;
|
import javax.ws.rs.ext.Provider;
|
||||||
|
import java.io.IOException;
|
||||||
import org.apache.hadoop.hdfs.DFSClient;
|
import java.io.OutputStream;
|
||||||
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
|
import java.lang.annotation.Annotation;
|
||||||
import org.apache.hadoop.io.IOUtils;
|
import java.lang.reflect.Type;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A response entity for a HdfsDataInputStream.
|
* A response entity for a HdfsDataInputStream.
|
||||||
|
|
|
@ -0,0 +1,70 @@
|
||||||
|
/**
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License. See accompanying LICENSE file.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hdfs.server.datanode.web.webhdfs;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
|
||||||
|
import org.apache.hadoop.hdfs.server.common.JspHelper;
|
||||||
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
import org.apache.hadoop.security.token.Token;
|
||||||
|
|
||||||
|
import java.io.ByteArrayInputStream;
|
||||||
|
import java.io.DataInputStream;
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create UGI from the request for the WebHDFS requests for the DNs. Note that
|
||||||
|
* the DN does not authenticate the UGI -- the NN will authenticate them in
|
||||||
|
* subsequent operations.
|
||||||
|
*/
|
||||||
|
class DataNodeUGIProvider {
|
||||||
|
private final ParameterParser params;
|
||||||
|
|
||||||
|
DataNodeUGIProvider(ParameterParser params) {
|
||||||
|
this.params = params;
|
||||||
|
}
|
||||||
|
|
||||||
|
UserGroupInformation ugi() throws IOException {
|
||||||
|
if (UserGroupInformation.isSecurityEnabled()) {
|
||||||
|
return tokenUGI();
|
||||||
|
}
|
||||||
|
|
||||||
|
final String usernameFromQuery = params.userName();
|
||||||
|
final String doAsUserFromQuery = params.doAsUser();
|
||||||
|
final String remoteUser = usernameFromQuery == null
|
||||||
|
? JspHelper.getDefaultWebUserName(params.conf()) // not specified in
|
||||||
|
// request
|
||||||
|
: usernameFromQuery;
|
||||||
|
|
||||||
|
UserGroupInformation ugi = UserGroupInformation.createRemoteUser(remoteUser);
|
||||||
|
JspHelper.checkUsername(ugi.getShortUserName(), usernameFromQuery);
|
||||||
|
if (doAsUserFromQuery != null) {
|
||||||
|
// create and attempt to authorize a proxy user
|
||||||
|
ugi = UserGroupInformation.createProxyUser(doAsUserFromQuery, ugi);
|
||||||
|
}
|
||||||
|
return ugi;
|
||||||
|
}
|
||||||
|
|
||||||
|
private UserGroupInformation tokenUGI() throws IOException {
|
||||||
|
Token<DelegationTokenIdentifier> token = params.delegationToken();
|
||||||
|
ByteArrayInputStream buf =
|
||||||
|
new ByteArrayInputStream(token.getIdentifier());
|
||||||
|
DataInputStream in = new DataInputStream(buf);
|
||||||
|
DelegationTokenIdentifier id = new DelegationTokenIdentifier();
|
||||||
|
id.readFields(in);
|
||||||
|
UserGroupInformation ugi = id.getUser();
|
||||||
|
ugi.addToken(token);
|
||||||
|
return ugi;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,115 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hdfs.server.datanode.web.webhdfs;
|
||||||
|
|
||||||
|
import com.sun.jersey.api.ParamException;
|
||||||
|
import com.sun.jersey.api.container.ContainerException;
|
||||||
|
import io.netty.buffer.Unpooled;
|
||||||
|
import io.netty.handler.codec.http.DefaultFullHttpResponse;
|
||||||
|
import io.netty.handler.codec.http.HttpResponseStatus;
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.hadoop.hdfs.web.JsonUtil;
|
||||||
|
import org.apache.hadoop.ipc.RemoteException;
|
||||||
|
import org.apache.hadoop.ipc.StandbyException;
|
||||||
|
import org.apache.hadoop.security.authorize.AuthorizationException;
|
||||||
|
import org.apache.hadoop.security.token.SecretManager;
|
||||||
|
|
||||||
|
import java.io.FileNotFoundException;
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_LENGTH;
|
||||||
|
import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
|
||||||
|
import static io.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST;
|
||||||
|
import static io.netty.handler.codec.http.HttpResponseStatus.FORBIDDEN;
|
||||||
|
import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
|
||||||
|
import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;
|
||||||
|
import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
|
||||||
|
import static org.apache.hadoop.hdfs.server.datanode.web.webhdfs.WebHdfsHandler.APPLICATION_JSON;
|
||||||
|
|
||||||
|
class ExceptionHandler {
|
||||||
|
static Log LOG = WebHdfsHandler.LOG;
|
||||||
|
|
||||||
|
static DefaultFullHttpResponse exceptionCaught(Throwable cause) {
|
||||||
|
Exception e = cause instanceof Exception ? (Exception) cause : new Exception(cause);
|
||||||
|
|
||||||
|
if (LOG.isTraceEnabled()) {
|
||||||
|
LOG.trace("GOT EXCEPITION", e);
|
||||||
|
}
|
||||||
|
|
||||||
|
//Convert exception
|
||||||
|
if (e instanceof ParamException) {
|
||||||
|
final ParamException paramexception = (ParamException)e;
|
||||||
|
e = new IllegalArgumentException("Invalid value for webhdfs parameter \""
|
||||||
|
+ paramexception.getParameterName() + "\": "
|
||||||
|
+ e.getCause().getMessage(), e);
|
||||||
|
} else if (e instanceof ContainerException || e instanceof SecurityException) {
|
||||||
|
e = toCause(e);
|
||||||
|
} else if (e instanceof RemoteException) {
|
||||||
|
e = ((RemoteException)e).unwrapRemoteException();
|
||||||
|
}
|
||||||
|
|
||||||
|
//Map response status
|
||||||
|
final HttpResponseStatus s;
|
||||||
|
if (e instanceof SecurityException) {
|
||||||
|
s = FORBIDDEN;
|
||||||
|
} else if (e instanceof AuthorizationException) {
|
||||||
|
s = FORBIDDEN;
|
||||||
|
} else if (e instanceof FileNotFoundException) {
|
||||||
|
s = NOT_FOUND;
|
||||||
|
} else if (e instanceof IOException) {
|
||||||
|
s = FORBIDDEN;
|
||||||
|
} else if (e instanceof UnsupportedOperationException) {
|
||||||
|
s = BAD_REQUEST;
|
||||||
|
} else if (e instanceof IllegalArgumentException) {
|
||||||
|
s = BAD_REQUEST;
|
||||||
|
} else {
|
||||||
|
LOG.warn("INTERNAL_SERVER_ERROR", e);
|
||||||
|
s = INTERNAL_SERVER_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
|
final byte[] js = JsonUtil.toJsonString(e).getBytes();
|
||||||
|
DefaultFullHttpResponse resp =
|
||||||
|
new DefaultFullHttpResponse(HTTP_1_1, s, Unpooled.wrappedBuffer(js));
|
||||||
|
|
||||||
|
resp.headers().set(CONTENT_TYPE, APPLICATION_JSON);
|
||||||
|
resp.headers().set(CONTENT_LENGTH, js.length);
|
||||||
|
return resp;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static Exception toCause(Exception e) {
|
||||||
|
final Throwable t = e.getCause();
|
||||||
|
if (e instanceof SecurityException) {
|
||||||
|
// For the issue reported in HDFS-6475, if SecurityException's cause
|
||||||
|
// is InvalidToken, and the InvalidToken's cause is StandbyException,
|
||||||
|
// return StandbyException; Otherwise, leave the exception as is,
|
||||||
|
// since they are handled elsewhere. See HDFS-6588.
|
||||||
|
if (t != null && t instanceof SecretManager.InvalidToken) {
|
||||||
|
final Throwable t1 = t.getCause();
|
||||||
|
if (t1 != null && t1 instanceof StandbyException) {
|
||||||
|
e = (StandbyException)t1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if (t != null && t instanceof Exception) {
|
||||||
|
e = (Exception)t;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return e;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,82 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hdfs.server.datanode.web.webhdfs;
|
||||||
|
|
||||||
|
import io.netty.channel.ChannelFutureListener;
|
||||||
|
import io.netty.channel.ChannelHandlerContext;
|
||||||
|
import io.netty.channel.SimpleChannelInboundHandler;
|
||||||
|
import io.netty.handler.codec.http.DefaultHttpResponse;
|
||||||
|
import io.netty.handler.codec.http.HttpContent;
|
||||||
|
import io.netty.handler.codec.http.LastHttpContent;
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.hadoop.hdfs.DFSClient;
|
||||||
|
import org.apache.hadoop.io.IOUtils;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.OutputStream;
|
||||||
|
|
||||||
|
import static io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION;
|
||||||
|
import static io.netty.handler.codec.http.HttpHeaders.Values.CLOSE;
|
||||||
|
|
||||||
|
class HdfsWriter extends SimpleChannelInboundHandler<HttpContent> {
|
||||||
|
private final DFSClient client;
|
||||||
|
private final OutputStream out;
|
||||||
|
private final DefaultHttpResponse response;
|
||||||
|
private static final Log LOG = WebHdfsHandler.LOG;
|
||||||
|
|
||||||
|
HdfsWriter(DFSClient client, OutputStream out, DefaultHttpResponse response) {
|
||||||
|
this.client = client;
|
||||||
|
this.out = out;
|
||||||
|
this.response = response;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
|
||||||
|
ctx.flush();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void channelRead0(ChannelHandlerContext ctx, HttpContent chunk)
|
||||||
|
throws IOException {
|
||||||
|
chunk.content().readBytes(out, chunk.content().readableBytes());
|
||||||
|
if (chunk instanceof LastHttpContent) {
|
||||||
|
response.headers().set(CONNECTION, CLOSE);
|
||||||
|
ctx.write(response).addListener(ChannelFutureListener.CLOSE);
|
||||||
|
releaseDfsResources();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void channelInactive(ChannelHandlerContext ctx) {
|
||||||
|
releaseDfsResources();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
|
||||||
|
releaseDfsResources();
|
||||||
|
DefaultHttpResponse resp = ExceptionHandler.exceptionCaught(cause);
|
||||||
|
resp.headers().set(CONNECTION, CLOSE);
|
||||||
|
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void releaseDfsResources() {
|
||||||
|
IOUtils.cleanup(LOG, out);
|
||||||
|
IOUtils.cleanup(LOG, client);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,125 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hdfs.server.datanode.web.webhdfs;
|
||||||
|
|
||||||
|
import io.netty.handler.codec.http.QueryStringDecoder;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
|
import org.apache.hadoop.hdfs.HAUtil;
|
||||||
|
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
|
||||||
|
import org.apache.hadoop.hdfs.web.resources.BlockSizeParam;
|
||||||
|
import org.apache.hadoop.hdfs.web.resources.BufferSizeParam;
|
||||||
|
import org.apache.hadoop.hdfs.web.resources.DelegationParam;
|
||||||
|
import org.apache.hadoop.hdfs.web.resources.DoAsParam;
|
||||||
|
import org.apache.hadoop.hdfs.web.resources.HttpOpParam;
|
||||||
|
import org.apache.hadoop.hdfs.web.resources.NamenodeAddressParam;
|
||||||
|
import org.apache.hadoop.hdfs.web.resources.OffsetParam;
|
||||||
|
import org.apache.hadoop.hdfs.web.resources.OverwriteParam;
|
||||||
|
import org.apache.hadoop.hdfs.web.resources.PermissionParam;
|
||||||
|
import org.apache.hadoop.hdfs.web.resources.ReplicationParam;
|
||||||
|
import org.apache.hadoop.hdfs.web.resources.UserParam;
|
||||||
|
import org.apache.hadoop.security.SecurityUtil;
|
||||||
|
import org.apache.hadoop.security.token.Token;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.net.URI;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.hdfs.protocol.HdfsConstants.HDFS_URI_SCHEME;
|
||||||
|
import static org.apache.hadoop.hdfs.server.datanode.web.webhdfs.WebHdfsHandler.WEBHDFS_PREFIX_LENGTH;
|
||||||
|
|
||||||
|
class ParameterParser {
|
||||||
|
private final Configuration conf;
|
||||||
|
private final String path;
|
||||||
|
private final Map<String, List<String>> params;
|
||||||
|
|
||||||
|
ParameterParser(QueryStringDecoder decoder, Configuration conf) {
|
||||||
|
this.path = decoder.path().substring(WEBHDFS_PREFIX_LENGTH);
|
||||||
|
this.params = decoder.parameters();
|
||||||
|
this.conf = conf;
|
||||||
|
}
|
||||||
|
|
||||||
|
String path() { return path; }
|
||||||
|
|
||||||
|
String op() {
|
||||||
|
return param(HttpOpParam.NAME);
|
||||||
|
}
|
||||||
|
|
||||||
|
long offset() {
|
||||||
|
return new OffsetParam(param(OffsetParam.NAME)).getValue();
|
||||||
|
}
|
||||||
|
|
||||||
|
String namenodeId() {
|
||||||
|
return new NamenodeAddressParam(param(NamenodeAddressParam.NAME))
|
||||||
|
.getValue();
|
||||||
|
}
|
||||||
|
|
||||||
|
String doAsUser() {
|
||||||
|
return new DoAsParam(param(DoAsParam.NAME)).getValue();
|
||||||
|
}
|
||||||
|
|
||||||
|
String userName() {
|
||||||
|
return new UserParam(param(UserParam.NAME)).getValue();
|
||||||
|
}
|
||||||
|
|
||||||
|
int bufferSize() {
|
||||||
|
return new BufferSizeParam(param(BufferSizeParam.NAME)).getValue(conf);
|
||||||
|
}
|
||||||
|
|
||||||
|
long blockSize() {
|
||||||
|
return new BlockSizeParam(param(BlockSizeParam.NAME)).getValue(conf);
|
||||||
|
}
|
||||||
|
|
||||||
|
short replication() {
|
||||||
|
return new ReplicationParam(param(ReplicationParam.NAME)).getValue(conf);
|
||||||
|
}
|
||||||
|
|
||||||
|
FsPermission permission() {
|
||||||
|
return new PermissionParam(param(PermissionParam.NAME)).getFsPermission();
|
||||||
|
}
|
||||||
|
|
||||||
|
boolean overwrite() {
|
||||||
|
return new OverwriteParam(param(OverwriteParam.NAME)).getValue();
|
||||||
|
}
|
||||||
|
|
||||||
|
Token<DelegationTokenIdentifier> delegationToken() throws IOException {
|
||||||
|
String delegation = param(DelegationParam.NAME);
|
||||||
|
final Token<DelegationTokenIdentifier> token = new
|
||||||
|
Token<DelegationTokenIdentifier>();
|
||||||
|
token.decodeFromUrlString(delegation);
|
||||||
|
URI nnUri = URI.create(HDFS_URI_SCHEME + "://" + namenodeId());
|
||||||
|
boolean isLogical = HAUtil.isLogicalUri(conf, nnUri);
|
||||||
|
if (isLogical) {
|
||||||
|
token.setService(HAUtil.buildTokenServiceForLogicalUri(nnUri,
|
||||||
|
HDFS_URI_SCHEME));
|
||||||
|
} else {
|
||||||
|
token.setService(SecurityUtil.buildTokenService(nnUri));
|
||||||
|
}
|
||||||
|
return token;
|
||||||
|
}
|
||||||
|
|
||||||
|
Configuration conf() {
|
||||||
|
return conf;
|
||||||
|
}
|
||||||
|
|
||||||
|
private String param(String key) {
|
||||||
|
List<String> p = params.get(key);
|
||||||
|
return p == null ? null : p.get(0);
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,256 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hdfs.server.datanode.web.webhdfs;
|
||||||
|
|
||||||
|
import com.google.common.base.Preconditions;
|
||||||
|
import io.netty.buffer.Unpooled;
|
||||||
|
import io.netty.channel.ChannelFutureListener;
|
||||||
|
import io.netty.channel.ChannelHandlerContext;
|
||||||
|
import io.netty.channel.SimpleChannelInboundHandler;
|
||||||
|
import io.netty.handler.codec.http.DefaultFullHttpResponse;
|
||||||
|
import io.netty.handler.codec.http.DefaultHttpResponse;
|
||||||
|
import io.netty.handler.codec.http.HttpHeaders;
|
||||||
|
import io.netty.handler.codec.http.HttpMethod;
|
||||||
|
import io.netty.handler.codec.http.HttpRequest;
|
||||||
|
import io.netty.handler.codec.http.QueryStringDecoder;
|
||||||
|
import io.netty.handler.stream.ChunkedStream;
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.CreateFlag;
|
||||||
|
import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
|
||||||
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
|
import org.apache.hadoop.hdfs.DFSClient;
|
||||||
|
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
|
||||||
|
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
|
||||||
|
import org.apache.hadoop.hdfs.web.JsonUtil;
|
||||||
|
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
|
||||||
|
import org.apache.hadoop.hdfs.web.resources.GetOpParam;
|
||||||
|
import org.apache.hadoop.hdfs.web.resources.PostOpParam;
|
||||||
|
import org.apache.hadoop.hdfs.web.resources.PutOpParam;
|
||||||
|
import org.apache.hadoop.io.IOUtils;
|
||||||
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
import org.apache.hadoop.security.token.Token;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.OutputStream;
|
||||||
|
import java.net.URI;
|
||||||
|
import java.net.URISyntaxException;
|
||||||
|
import java.security.PrivilegedExceptionAction;
|
||||||
|
import java.util.EnumSet;
|
||||||
|
|
||||||
|
import static io.netty.handler.codec.http.HttpHeaders.Names.ACCESS_CONTROL_ALLOW_METHODS;
|
||||||
|
import static io.netty.handler.codec.http.HttpHeaders.Names.ACCESS_CONTROL_ALLOW_ORIGIN;
|
||||||
|
import static io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION;
|
||||||
|
import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_LENGTH;
|
||||||
|
import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
|
||||||
|
import static io.netty.handler.codec.http.HttpHeaders.Names.LOCATION;
|
||||||
|
import static io.netty.handler.codec.http.HttpHeaders.Values.CLOSE;
|
||||||
|
import static io.netty.handler.codec.http.HttpMethod.GET;
|
||||||
|
import static io.netty.handler.codec.http.HttpMethod.POST;
|
||||||
|
import static io.netty.handler.codec.http.HttpMethod.PUT;
|
||||||
|
import static io.netty.handler.codec.http.HttpResponseStatus.CONTINUE;
|
||||||
|
import static io.netty.handler.codec.http.HttpResponseStatus.CREATED;
|
||||||
|
import static io.netty.handler.codec.http.HttpResponseStatus.OK;
|
||||||
|
import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
|
||||||
|
import static org.apache.hadoop.hdfs.protocol.HdfsConstants.HDFS_URI_SCHEME;
|
||||||
|
import static org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier.HDFS_DELEGATION_KIND;
|
||||||
|
|
||||||
|
public class WebHdfsHandler extends SimpleChannelInboundHandler<HttpRequest> {
|
||||||
|
static final Log LOG = LogFactory.getLog(WebHdfsHandler.class);
|
||||||
|
public static final String WEBHDFS_PREFIX = WebHdfsFileSystem.PATH_PREFIX;
|
||||||
|
public static final int WEBHDFS_PREFIX_LENGTH = WEBHDFS_PREFIX.length();
|
||||||
|
public static final String APPLICATION_OCTET_STREAM =
|
||||||
|
"application/octet-stream";
|
||||||
|
public static final String APPLICATION_JSON = "application/json";
|
||||||
|
|
||||||
|
private final Configuration conf;
|
||||||
|
private final Configuration confForCreate;
|
||||||
|
|
||||||
|
private String path;
|
||||||
|
private ParameterParser params;
|
||||||
|
private UserGroupInformation ugi;
|
||||||
|
|
||||||
|
public WebHdfsHandler(Configuration conf, Configuration confForCreate)
|
||||||
|
throws IOException {
|
||||||
|
this.conf = conf;
|
||||||
|
this.confForCreate = confForCreate;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void channelRead0(final ChannelHandlerContext ctx,
|
||||||
|
final HttpRequest req) throws Exception {
|
||||||
|
Preconditions.checkArgument(req.getUri().startsWith(WEBHDFS_PREFIX));
|
||||||
|
QueryStringDecoder queryString = new QueryStringDecoder(req.getUri());
|
||||||
|
params = new ParameterParser(queryString, conf);
|
||||||
|
DataNodeUGIProvider ugiProvider = new DataNodeUGIProvider(params);
|
||||||
|
ugi = ugiProvider.ugi();
|
||||||
|
path = params.path();
|
||||||
|
|
||||||
|
injectToken();
|
||||||
|
ugi.doAs(new PrivilegedExceptionAction<Void>() {
|
||||||
|
@Override
|
||||||
|
public Void run() throws Exception {
|
||||||
|
handle(ctx, req);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
public void handle(ChannelHandlerContext ctx, HttpRequest req)
|
||||||
|
throws IOException, URISyntaxException {
|
||||||
|
String op = params.op();
|
||||||
|
HttpMethod method = req.getMethod();
|
||||||
|
if (PutOpParam.Op.CREATE.name().equalsIgnoreCase(op)
|
||||||
|
&& method == PUT) {
|
||||||
|
onCreate(ctx);
|
||||||
|
} else if (PostOpParam.Op.APPEND.name().equalsIgnoreCase(op)
|
||||||
|
&& method == POST) {
|
||||||
|
onAppend(ctx);
|
||||||
|
} else if (GetOpParam.Op.OPEN.name().equalsIgnoreCase(op)
|
||||||
|
&& method == GET) {
|
||||||
|
onOpen(ctx);
|
||||||
|
} else if(GetOpParam.Op.GETFILECHECKSUM.name().equalsIgnoreCase(op)
|
||||||
|
&& method == GET) {
|
||||||
|
onGetFileChecksum(ctx);
|
||||||
|
} else {
|
||||||
|
throw new IllegalArgumentException("Invalid operation " + op);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
|
||||||
|
LOG.debug("Error ", cause);
|
||||||
|
DefaultHttpResponse resp = ExceptionHandler.exceptionCaught(cause);
|
||||||
|
resp.headers().set(CONNECTION, CLOSE);
|
||||||
|
ctx.writeAndFlush(resp).addListener(ChannelFutureListener.CLOSE);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void onCreate(ChannelHandlerContext ctx)
|
||||||
|
throws IOException, URISyntaxException {
|
||||||
|
writeContinueHeader(ctx);
|
||||||
|
|
||||||
|
final String nnId = params.namenodeId();
|
||||||
|
final int bufferSize = params.bufferSize();
|
||||||
|
final short replication = params.replication();
|
||||||
|
final long blockSize = params.blockSize();
|
||||||
|
final FsPermission permission = params.permission();
|
||||||
|
|
||||||
|
EnumSet<CreateFlag> flags = params.overwrite() ?
|
||||||
|
EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)
|
||||||
|
: EnumSet.of(CreateFlag.CREATE);
|
||||||
|
|
||||||
|
final DFSClient dfsClient = newDfsClient(nnId, confForCreate);
|
||||||
|
OutputStream out = dfsClient.createWrappedOutputStream(dfsClient.create(
|
||||||
|
path, permission, flags, replication,
|
||||||
|
blockSize, null, bufferSize, null), null);
|
||||||
|
DefaultHttpResponse resp = new DefaultHttpResponse(HTTP_1_1, CREATED);
|
||||||
|
|
||||||
|
final URI uri = new URI(HDFS_URI_SCHEME, nnId, path, null, null);
|
||||||
|
resp.headers().set(LOCATION, uri.toString());
|
||||||
|
resp.headers().set(CONTENT_LENGTH, 0);
|
||||||
|
ctx.pipeline().replace(this, HdfsWriter.class.getSimpleName(),
|
||||||
|
new HdfsWriter(dfsClient, out, resp));
|
||||||
|
}
|
||||||
|
|
||||||
|
private void onAppend(ChannelHandlerContext ctx) throws IOException {
|
||||||
|
writeContinueHeader(ctx);
|
||||||
|
final String nnId = params.namenodeId();
|
||||||
|
final int bufferSize = params.bufferSize();
|
||||||
|
|
||||||
|
DFSClient dfsClient = newDfsClient(nnId, conf);
|
||||||
|
OutputStream out = dfsClient.append(path, bufferSize, null, null);
|
||||||
|
DefaultHttpResponse resp = new DefaultHttpResponse(HTTP_1_1, OK);
|
||||||
|
resp.headers().set(CONTENT_LENGTH, 0);
|
||||||
|
ctx.pipeline().replace(this, HdfsWriter.class.getSimpleName(),
|
||||||
|
new HdfsWriter(dfsClient, out, resp));
|
||||||
|
}
|
||||||
|
|
||||||
|
private void onOpen(ChannelHandlerContext ctx) throws IOException {
|
||||||
|
final String nnId = params.namenodeId();
|
||||||
|
final int bufferSize = params.bufferSize();
|
||||||
|
final long offset = params.offset();
|
||||||
|
|
||||||
|
DefaultHttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
|
||||||
|
HttpHeaders headers = response.headers();
|
||||||
|
// Allow the UI to access the file
|
||||||
|
headers.set(ACCESS_CONTROL_ALLOW_METHODS, GET);
|
||||||
|
headers.set(ACCESS_CONTROL_ALLOW_ORIGIN, "*");
|
||||||
|
headers.set(CONTENT_TYPE, APPLICATION_OCTET_STREAM);
|
||||||
|
headers.set(CONNECTION, CLOSE);
|
||||||
|
|
||||||
|
final DFSClient dfsclient = newDfsClient(nnId, conf);
|
||||||
|
HdfsDataInputStream in = dfsclient.createWrappedInputStream(
|
||||||
|
dfsclient.open(path, bufferSize, true));
|
||||||
|
in.seek(offset);
|
||||||
|
|
||||||
|
if (in.getVisibleLength() >= offset) {
|
||||||
|
headers.set(CONTENT_LENGTH, in.getVisibleLength() - offset);
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx.write(response);
|
||||||
|
ctx.writeAndFlush(new ChunkedStream(in) {
|
||||||
|
@Override
|
||||||
|
public void close() throws Exception {
|
||||||
|
super.close();
|
||||||
|
dfsclient.close();
|
||||||
|
}
|
||||||
|
}).addListener(ChannelFutureListener.CLOSE);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void onGetFileChecksum(ChannelHandlerContext ctx) throws IOException {
|
||||||
|
MD5MD5CRC32FileChecksum checksum = null;
|
||||||
|
final String nnId = params.namenodeId();
|
||||||
|
DFSClient dfsclient = newDfsClient(nnId, conf);
|
||||||
|
try {
|
||||||
|
checksum = dfsclient.getFileChecksum(path, Long.MAX_VALUE);
|
||||||
|
dfsclient.close();
|
||||||
|
dfsclient = null;
|
||||||
|
} finally {
|
||||||
|
IOUtils.cleanup(LOG, dfsclient);
|
||||||
|
}
|
||||||
|
final byte[] js = JsonUtil.toJsonString(checksum).getBytes();
|
||||||
|
DefaultFullHttpResponse resp =
|
||||||
|
new DefaultFullHttpResponse(HTTP_1_1, OK, Unpooled.wrappedBuffer(js));
|
||||||
|
|
||||||
|
resp.headers().set(CONTENT_TYPE, APPLICATION_JSON);
|
||||||
|
resp.headers().set(CONTENT_LENGTH, js.length);
|
||||||
|
resp.headers().set(CONNECTION, CLOSE);
|
||||||
|
ctx.writeAndFlush(resp).addListener(ChannelFutureListener.CLOSE);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void writeContinueHeader(ChannelHandlerContext ctx) {
|
||||||
|
DefaultHttpResponse r = new DefaultFullHttpResponse(HTTP_1_1, CONTINUE,
|
||||||
|
Unpooled.EMPTY_BUFFER);
|
||||||
|
ctx.writeAndFlush(r);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static DFSClient newDfsClient
|
||||||
|
(String nnId, Configuration conf) throws IOException {
|
||||||
|
URI uri = URI.create(HDFS_URI_SCHEME + "://" + nnId);
|
||||||
|
return new DFSClient(uri, conf);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void injectToken() throws IOException {
|
||||||
|
if (UserGroupInformation.isSecurityEnabled()) {
|
||||||
|
Token<DelegationTokenIdentifier> token = params.delegationToken();
|
||||||
|
token.setKind(HDFS_DELEGATION_KIND);
|
||||||
|
ugi.addToken(token);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -84,6 +84,7 @@ public class DatanodeRegistration extends DatanodeID
|
||||||
+ "(" + getIpAddr()
|
+ "(" + getIpAddr()
|
||||||
+ ", datanodeUuid=" + getDatanodeUuid()
|
+ ", datanodeUuid=" + getDatanodeUuid()
|
||||||
+ ", infoPort=" + getInfoPort()
|
+ ", infoPort=" + getInfoPort()
|
||||||
|
+ ", infoSecurePort=" + getInfoSecurePort()
|
||||||
+ ", ipcPort=" + getIpcPort()
|
+ ", ipcPort=" + getIpcPort()
|
||||||
+ ", storageInfo=" + storageInfo
|
+ ", storageInfo=" + storageInfo
|
||||||
+ ")";
|
+ ")";
|
||||||
|
|
|
@ -72,7 +72,8 @@ abstract class IntegerParam extends Param<Integer, IntegerParam.Domain> {
|
||||||
@Override
|
@Override
|
||||||
Integer parse(final String str) {
|
Integer parse(final String str) {
|
||||||
try{
|
try{
|
||||||
return NULL.equals(str)? null: Integer.parseInt(str, radix);
|
return NULL.equals(str) || str == null ? null : Integer.parseInt(str,
|
||||||
|
radix);
|
||||||
} catch(NumberFormatException e) {
|
} catch(NumberFormatException e) {
|
||||||
throw new IllegalArgumentException("Failed to parse \"" + str
|
throw new IllegalArgumentException("Failed to parse \"" + str
|
||||||
+ "\" as a radix-" + radix + " integer.", e);
|
+ "\" as a radix-" + radix + " integer.", e);
|
||||||
|
|
|
@ -71,7 +71,8 @@ abstract class LongParam extends Param<Long, LongParam.Domain> {
|
||||||
@Override
|
@Override
|
||||||
Long parse(final String str) {
|
Long parse(final String str) {
|
||||||
try {
|
try {
|
||||||
return NULL.equals(str)? null: Long.parseLong(str, radix);
|
return NULL.equals(str) || str == null ? null: Long.parseLong(str,
|
||||||
|
radix);
|
||||||
} catch(NumberFormatException e) {
|
} catch(NumberFormatException e) {
|
||||||
throw new IllegalArgumentException("Failed to parse \"" + str
|
throw new IllegalArgumentException("Failed to parse \"" + str
|
||||||
+ "\" as a radix-" + radix + " long integer.", e);
|
+ "\" as a radix-" + radix + " long integer.", e);
|
||||||
|
|
|
@ -72,7 +72,8 @@ abstract class ShortParam extends Param<Short, ShortParam.Domain> {
|
||||||
@Override
|
@Override
|
||||||
Short parse(final String str) {
|
Short parse(final String str) {
|
||||||
try {
|
try {
|
||||||
return NULL.equals(str)? null: Short.parseShort(str, radix);
|
return NULL.equals(str) || str == null ? null : Short.parseShort(str,
|
||||||
|
radix);
|
||||||
} catch(NumberFormatException e) {
|
} catch(NumberFormatException e) {
|
||||||
throw new IllegalArgumentException("Failed to parse \"" + str
|
throw new IllegalArgumentException("Failed to parse \"" + str
|
||||||
+ "\" as a radix-" + radix + " short integer.", e);
|
+ "\" as a radix-" + radix + " short integer.", e);
|
||||||
|
|
|
@ -308,8 +308,8 @@ public class TestWebHdfsFileSystemContract extends FileSystemContractBaseTest {
|
||||||
|
|
||||||
// Open the file, but request length longer than actual file length by 1.
|
// Open the file, but request length longer than actual file length by 1.
|
||||||
HttpOpParam.Op op = GetOpParam.Op.OPEN;
|
HttpOpParam.Op op = GetOpParam.Op.OPEN;
|
||||||
URL url = webhdfs.toUrl(op, testFile, new LengthParam(Long.valueOf(
|
URL url = webhdfs.toUrl(op, testFile, new LengthParam((long) (content
|
||||||
content.length() + 1)));
|
.length() + 1)));
|
||||||
HttpURLConnection conn = null;
|
HttpURLConnection conn = null;
|
||||||
InputStream is = null;
|
InputStream is = null;
|
||||||
try {
|
try {
|
||||||
|
|
Loading…
Reference in New Issue