HDFS-8377. Support HTTP/2 in datanode. Contributed by Duo Zhang.

This commit is contained in:
Haohui Mai 2015-05-24 22:30:32 -07:00
parent 446d51591e
commit ada233b7cd
15 changed files with 482 additions and 87 deletions

View File

@ -580,6 +580,8 @@ Release 2.8.0 - UNRELEASED
HDFS-8421. Move startFile() and related functions into FSDirWriteFileOp.
(wheat9)
HDFS-8377. Support HTTP/2 in datanode. (Duo Zhang via wheat9)
OPTIMIZATIONS
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

View File

@ -181,6 +181,11 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
<artifactId>netty-all</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.twitter</groupId>
<artifactId>hpack</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>xerces</groupId>
<artifactId>xercesImpl</artifactId>

View File

@ -17,7 +17,7 @@
*/
package org.apache.hadoop.hdfs.server.datanode.web;
import io.netty.bootstrap.ChannelFactory;
import io.netty.channel.ChannelFactory;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
@ -83,11 +83,8 @@ public class DatanodeHttpServer implements Closeable {
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new HttpRequestDecoder(),
new HttpResponseEncoder(),
new ChunkedWriteHandler(),
new URLDispatcher(jettyAddr, conf, confForCreate));
ch.pipeline().addLast(new PortUnificationServerHandler(jettyAddr,
conf, confForCreate));
}
});
if (externalHttpChannel == null) {

View File

@ -0,0 +1,85 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode.web;
import java.net.InetSocketAddress;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.datanode.web.dtp.DtpHttp2Handler;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http2.Http2CodecUtil;
import io.netty.handler.stream.ChunkedWriteHandler;
/**
* A port unification handler to support HTTP/1.1 and HTTP/2 on the same port.
*/
@InterfaceAudience.Private
public class PortUnificationServerHandler extends ByteToMessageDecoder {
private static final ByteBuf HTTP2_CLIENT_CONNECTION_PREFACE = Http2CodecUtil
.connectionPrefaceBuf();
// we only want to support HTTP/1.1 and HTTP/2, so the first 3 bytes is
// enough. No HTTP/1.1 request could start with "PRI"
private static final int MAGIC_HEADER_LENGTH = 3;
private final InetSocketAddress proxyHost;
private final Configuration conf;
private final Configuration confForCreate;
public PortUnificationServerHandler(InetSocketAddress proxyHost,
Configuration conf, Configuration confForCreate) {
this.proxyHost = proxyHost;
this.conf = conf;
this.confForCreate = confForCreate;
}
private void configureHttp1(ChannelHandlerContext ctx) {
ctx.pipeline().addLast(new HttpServerCodec(), new ChunkedWriteHandler(),
new URLDispatcher(proxyHost, conf, confForCreate));
}
private void configureHttp2(ChannelHandlerContext ctx) {
ctx.pipeline().addLast(new DtpHttp2Handler());
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in,
List<Object> out) throws Exception {
if (in.readableBytes() < MAGIC_HEADER_LENGTH) {
return;
}
if (ByteBufUtil.equals(in, 0, HTTP2_CLIENT_CONNECTION_PREFACE, 0,
MAGIC_HEADER_LENGTH)) {
configureHttp2(ctx);
} else {
configureHttp1(ctx);
}
ctx.pipeline().remove(this);
}
}

View File

@ -17,6 +17,9 @@
*/
package org.apache.hadoop.hdfs.server.datanode.web;
import static io.netty.handler.codec.http.HttpHeaderNames.CONNECTION;
import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
@ -31,17 +34,14 @@ import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.DefaultHttpResponse;
import io.netty.handler.codec.http.HttpHeaderValues;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpRequestEncoder;
import io.netty.handler.codec.http.HttpResponseEncoder;
import org.apache.commons.logging.Log;
import java.net.InetSocketAddress;
import static io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION;
import static io.netty.handler.codec.http.HttpHeaders.Values;
import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
import org.apache.commons.logging.Log;
/**
* Dead simple session-layer HTTP proxy. It gets the HTTP responses
@ -98,7 +98,7 @@ class SimpleHttpProxyHandler extends SimpleChannelInboundHandler<HttpRequest> {
@Override
public void channelRead0
(final ChannelHandlerContext ctx, final HttpRequest req) {
uri = req.getUri();
uri = req.uri();
final Channel client = ctx.channel();
Bootstrap proxiedServer = new Bootstrap()
.group(client.eventLoop())
@ -118,14 +118,14 @@ class SimpleHttpProxyHandler extends SimpleChannelInboundHandler<HttpRequest> {
if (future.isSuccess()) {
ctx.channel().pipeline().remove(HttpResponseEncoder.class);
HttpRequest newReq = new DefaultFullHttpRequest(HTTP_1_1,
req.getMethod(), req.getUri());
req.method(), req.uri());
newReq.headers().add(req.headers());
newReq.headers().set(CONNECTION, Values.CLOSE);
newReq.headers().set(CONNECTION, HttpHeaderValues.CLOSE);
future.channel().writeAndFlush(newReq);
} else {
DefaultHttpResponse resp = new DefaultHttpResponse(HTTP_1_1,
INTERNAL_SERVER_ERROR);
resp.headers().set(CONNECTION, Values.CLOSE);
resp.headers().set(CONNECTION, HttpHeaderValues.CLOSE);
LOG.info("Proxy " + uri + " failed. Cause: ", future.cause());
ctx.writeAndFlush(resp).addListener(ChannelFutureListener.CLOSE);
client.close();

View File

@ -17,17 +17,16 @@
*/
package org.apache.hadoop.hdfs.server.datanode.web;
import static org.apache.hadoop.hdfs.server.datanode.web.webhdfs.WebHdfsHandler.WEBHDFS_PREFIX;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.HttpRequest;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.datanode.web.webhdfs.WebHdfsHandler;
import java.io.IOException;
import java.net.InetSocketAddress;
import static org.apache.hadoop.hdfs.server.datanode.web.webhdfs.WebHdfsHandler.WEBHDFS_PREFIX;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.datanode.web.webhdfs.WebHdfsHandler;
class URLDispatcher extends SimpleChannelInboundHandler<HttpRequest> {
private final InetSocketAddress proxyHost;
@ -44,7 +43,7 @@ class URLDispatcher extends SimpleChannelInboundHandler<HttpRequest> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpRequest req)
throws Exception {
String uri = req.getUri();
String uri = req.uri();
ChannelPipeline p = ctx.pipeline();
if (uri.startsWith(WEBHDFS_PREFIX)) {
WebHdfsHandler h = new WebHdfsHandler(conf, confForCreate);

View File

@ -0,0 +1,52 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode.web.dtp;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http2.DefaultHttp2Headers;
import io.netty.handler.codec.http2.Http2ConnectionEncoder;
import io.netty.handler.codec.http2.Http2Exception;
import io.netty.handler.codec.http2.Http2FrameAdapter;
import io.netty.handler.codec.http2.Http2Headers;
import java.nio.charset.StandardCharsets;
class DtpHttp2FrameListener extends Http2FrameAdapter {
private Http2ConnectionEncoder encoder;
public void encoder(Http2ConnectionEncoder encoder) {
this.encoder = encoder;
}
@Override
public void onHeadersRead(ChannelHandlerContext ctx, int streamId,
Http2Headers headers, int streamDependency, short weight,
boolean exclusive, int padding, boolean endStream) throws Http2Exception {
encoder.writeHeaders(ctx, streamId,
new DefaultHttp2Headers().status(HttpResponseStatus.OK.codeAsText()), 0,
false, ctx.newPromise());
encoder.writeData(
ctx,
streamId,
ctx.alloc().buffer()
.writeBytes("HTTP/2 DTP".getBytes(StandardCharsets.UTF_8)), 0, true,
ctx.newPromise());
}
}

View File

@ -0,0 +1,34 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode.web.dtp;
import org.apache.hadoop.classification.InterfaceAudience;
import io.netty.handler.codec.http2.Http2ConnectionHandler;
/**
* The HTTP/2 handler.
*/
@InterfaceAudience.Private
public class DtpHttp2Handler extends Http2ConnectionHandler {
public DtpHttp2Handler() {
super(true, new DtpHttp2FrameListener());
((DtpHttp2FrameListener) decoder().listener()).encoder(encoder());
}
}

View File

@ -17,12 +17,21 @@
*/
package org.apache.hadoop.hdfs.server.datanode.web.webhdfs;
import com.google.common.base.Charsets;
import com.sun.jersey.api.ParamException;
import com.sun.jersey.api.container.ContainerException;
import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_LENGTH;
import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_TYPE;
import static io.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST;
import static io.netty.handler.codec.http.HttpResponseStatus.FORBIDDEN;
import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;
import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
import static org.apache.hadoop.hdfs.server.datanode.web.webhdfs.WebHdfsHandler.APPLICATION_JSON_UTF8;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
import java.io.FileNotFoundException;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.hadoop.hdfs.web.JsonUtil;
import org.apache.hadoop.ipc.RemoteException;
@ -30,17 +39,9 @@ import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.security.authorize.AuthorizationException;
import org.apache.hadoop.security.token.SecretManager;
import java.io.FileNotFoundException;
import java.io.IOException;
import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_LENGTH;
import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
import static io.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST;
import static io.netty.handler.codec.http.HttpResponseStatus.FORBIDDEN;
import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;
import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
import static org.apache.hadoop.hdfs.server.datanode.web.webhdfs.WebHdfsHandler.APPLICATION_JSON_UTF8;
import com.google.common.base.Charsets;
import com.sun.jersey.api.ParamException;
import com.sun.jersey.api.container.ContainerException;
class ExceptionHandler {
static Log LOG = WebHdfsHandler.LOG;

View File

@ -17,21 +17,21 @@
*/
package org.apache.hadoop.hdfs.server.datanode.web.webhdfs;
import static io.netty.handler.codec.http.HttpHeaderNames.CONNECTION;
import static io.netty.handler.codec.http.HttpHeaderValues.CLOSE;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.DefaultHttpResponse;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.LastHttpContent;
import org.apache.commons.logging.Log;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.io.IOUtils;
import java.io.IOException;
import java.io.OutputStream;
import static io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION;
import static io.netty.handler.codec.http.HttpHeaders.Values.CLOSE;
import org.apache.commons.logging.Log;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.io.IOUtils;
class HdfsWriter extends SimpleChannelInboundHandler<HttpContent> {
private final DFSClient client;

View File

@ -17,7 +17,22 @@
*/
package org.apache.hadoop.hdfs.server.datanode.web.webhdfs;
import com.google.common.base.Preconditions;
import static io.netty.handler.codec.http.HttpHeaderNames.ACCESS_CONTROL_ALLOW_METHODS;
import static io.netty.handler.codec.http.HttpHeaderNames.ACCESS_CONTROL_ALLOW_ORIGIN;
import static io.netty.handler.codec.http.HttpHeaderNames.CONNECTION;
import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_LENGTH;
import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_TYPE;
import static io.netty.handler.codec.http.HttpHeaderNames.LOCATION;
import static io.netty.handler.codec.http.HttpHeaderValues.CLOSE;
import static io.netty.handler.codec.http.HttpMethod.GET;
import static io.netty.handler.codec.http.HttpMethod.POST;
import static io.netty.handler.codec.http.HttpMethod.PUT;
import static io.netty.handler.codec.http.HttpResponseStatus.CONTINUE;
import static io.netty.handler.codec.http.HttpResponseStatus.CREATED;
import static io.netty.handler.codec.http.HttpResponseStatus.OK;
import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
import static org.apache.hadoop.hdfs.protocol.HdfsConstants.HDFS_URI_SCHEME;
import static org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier.HDFS_DELEGATION_KIND;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
@ -29,6 +44,15 @@ import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.QueryStringDecoder;
import io.netty.handler.stream.ChunkedStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.security.PrivilegedExceptionAction;
import java.util.EnumSet;
import org.apache.commons.io.Charsets;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -49,30 +73,7 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.LimitInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.security.PrivilegedExceptionAction;
import java.util.EnumSet;
import static io.netty.handler.codec.http.HttpHeaders.Names.ACCESS_CONTROL_ALLOW_METHODS;
import static io.netty.handler.codec.http.HttpHeaders.Names.ACCESS_CONTROL_ALLOW_ORIGIN;
import static io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION;
import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_LENGTH;
import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
import static io.netty.handler.codec.http.HttpHeaders.Names.LOCATION;
import static io.netty.handler.codec.http.HttpHeaders.Values.CLOSE;
import static io.netty.handler.codec.http.HttpMethod.GET;
import static io.netty.handler.codec.http.HttpMethod.POST;
import static io.netty.handler.codec.http.HttpMethod.PUT;
import static io.netty.handler.codec.http.HttpResponseStatus.CONTINUE;
import static io.netty.handler.codec.http.HttpResponseStatus.CREATED;
import static io.netty.handler.codec.http.HttpResponseStatus.OK;
import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
import static org.apache.hadoop.hdfs.protocol.HdfsConstants.HDFS_URI_SCHEME;
import static org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier.HDFS_DELEGATION_KIND;
import com.google.common.base.Preconditions;
public class WebHdfsHandler extends SimpleChannelInboundHandler<HttpRequest> {
static final Log LOG = LogFactory.getLog(WebHdfsHandler.class);
@ -99,8 +100,8 @@ public class WebHdfsHandler extends SimpleChannelInboundHandler<HttpRequest> {
@Override
public void channelRead0(final ChannelHandlerContext ctx,
final HttpRequest req) throws Exception {
Preconditions.checkArgument(req.getUri().startsWith(WEBHDFS_PREFIX));
QueryStringDecoder queryString = new QueryStringDecoder(req.getUri());
Preconditions.checkArgument(req.uri().startsWith(WEBHDFS_PREFIX));
QueryStringDecoder queryString = new QueryStringDecoder(req.uri());
params = new ParameterParser(queryString, conf);
DataNodeUGIProvider ugiProvider = new DataNodeUGIProvider(params);
ugi = ugiProvider.ugi();
@ -119,7 +120,7 @@ public class WebHdfsHandler extends SimpleChannelInboundHandler<HttpRequest> {
public void handle(ChannelHandlerContext ctx, HttpRequest req)
throws IOException, URISyntaxException {
String op = params.op();
HttpMethod method = req.getMethod();
HttpMethod method = req.method();
if (PutOpParam.Op.CREATE.name().equalsIgnoreCase(op)
&& method == PUT) {
onCreate(ctx);

View File

@ -17,7 +17,18 @@
*/
package org.apache.hadoop.hdfs.tools.offlineImageViewer;
import com.google.common.base.Charsets;
import static io.netty.handler.codec.http.HttpHeaderNames.CONNECTION;
import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_LENGTH;
import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_TYPE;
import static io.netty.handler.codec.http.HttpHeaderValues.CLOSE;
import static io.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST;
import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
import static io.netty.handler.codec.http.HttpResponseStatus.METHOD_NOT_ALLOWED;
import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;
import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
import static org.apache.hadoop.hdfs.server.datanode.web.webhdfs.WebHdfsHandler.APPLICATION_JSON_UTF8;
import static org.apache.hadoop.hdfs.server.datanode.web.webhdfs.WebHdfsHandler.WEBHDFS_PREFIX;
import static org.apache.hadoop.hdfs.server.datanode.web.webhdfs.WebHdfsHandler.WEBHDFS_PREFIX_LENGTH;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
@ -30,28 +41,18 @@ import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.QueryStringDecoder;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hdfs.web.JsonUtil;
import org.apache.hadoop.util.StringUtils;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import static io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION;
import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_LENGTH;
import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
import static io.netty.handler.codec.http.HttpHeaders.Values.CLOSE;
import static io.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST;
import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
import static io.netty.handler.codec.http.HttpResponseStatus.METHOD_NOT_ALLOWED;
import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;
import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
import static org.apache.hadoop.hdfs.server.datanode.web.webhdfs.WebHdfsHandler.APPLICATION_JSON_UTF8;
import static org.apache.hadoop.hdfs.server.datanode.web.webhdfs.WebHdfsHandler.WEBHDFS_PREFIX;
import static org.apache.hadoop.hdfs.server.datanode.web.webhdfs.WebHdfsHandler.WEBHDFS_PREFIX_LENGTH;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hdfs.web.JsonUtil;
import org.apache.hadoop.util.StringUtils;
import com.google.common.base.Charsets;
/**
* Implement the read-only WebHDFS API for fsimage.
@ -74,7 +75,7 @@ class FSImageHandler extends SimpleChannelInboundHandler<HttpRequest> {
@Override
public void channelRead0(ChannelHandlerContext ctx, HttpRequest request)
throws Exception {
if (request.getMethod() != HttpMethod.GET) {
if (request.method() != HttpMethod.GET) {
DefaultHttpResponse resp = new DefaultHttpResponse(HTTP_1_1,
METHOD_NOT_ALLOWED);
resp.headers().set(CONNECTION, CLOSE);
@ -82,7 +83,7 @@ class FSImageHandler extends SimpleChannelInboundHandler<HttpRequest> {
return;
}
QueryStringDecoder decoder = new QueryStringDecoder(request.getUri());
QueryStringDecoder decoder = new QueryStringDecoder(request.uri());
final String op = getOp(decoder);
final String content;

View File

@ -0,0 +1,65 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode.web.dtp;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http2.HttpUtil;
import io.netty.util.concurrent.Promise;
import java.util.HashMap;
import java.util.Map;
public class Http2ResponseHandler extends
SimpleChannelInboundHandler<FullHttpResponse> {
private Map<Integer, Promise<FullHttpResponse>> streamId2Promise =
new HashMap<>();
@Override
protected void channelRead0(ChannelHandlerContext ctx, FullHttpResponse msg)
throws Exception {
Integer streamId =
msg.headers().getInt(HttpUtil.ExtensionHeaderNames.STREAM_ID.text());
if (streamId == null) {
System.err.println("HttpResponseHandler unexpected message received: "
+ msg);
return;
}
if (streamId.intValue() == 1) {
// this is the upgrade response message, just ignore it.
return;
}
Promise<FullHttpResponse> promise;
synchronized (this) {
promise = streamId2Promise.get(streamId);
}
if (promise == null) {
System.err.println("Message received for unknown stream id " + streamId);
} else {
// Do stuff with the message (for now just print it)
promise.setSuccess(msg.retain());
}
}
public void put(Integer streamId, Promise<FullHttpResponse> promise) {
streamId2Promise.put(streamId, promise);
}
}

View File

@ -0,0 +1,147 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode.web.dtp;
import static org.junit.Assert.assertEquals;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http2.DefaultHttp2Connection;
import io.netty.handler.codec.http2.DefaultHttp2FrameReader;
import io.netty.handler.codec.http2.DefaultHttp2FrameWriter;
import io.netty.handler.codec.http2.DelegatingDecompressorFrameListener;
import io.netty.handler.codec.http2.Http2Connection;
import io.netty.handler.codec.http2.Http2ConnectionHandler;
import io.netty.handler.codec.http2.Http2FrameLogger;
import io.netty.handler.codec.http2.Http2FrameReader;
import io.netty.handler.codec.http2.Http2FrameWriter;
import io.netty.handler.codec.http2.Http2InboundFrameLogger;
import io.netty.handler.codec.http2.Http2OutboundFrameLogger;
import io.netty.handler.codec.http2.HttpToHttp2ConnectionHandler;
import io.netty.handler.codec.http2.HttpUtil;
import io.netty.handler.codec.http2.InboundHttp2ToHttpAdapter;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.timeout.TimeoutException;
import io.netty.util.concurrent.Promise;
import java.io.IOException;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutionException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.web.WebHdfsTestUtil;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
public class TestDtpHttp2 {
private static final Http2FrameLogger FRAME_LOGGER = new Http2FrameLogger(
LogLevel.INFO, TestDtpHttp2.class);
private static final Configuration CONF = WebHdfsTestUtil.createConf();
private static MiniDFSCluster CLUSTER;
private static final EventLoopGroup WORKER_GROUP = new NioEventLoopGroup();
private static Channel CHANNEL;
private static Http2ResponseHandler RESPONSE_HANDLER;
@BeforeClass
public static void setUp() throws IOException, URISyntaxException,
TimeoutException {
CLUSTER = new MiniDFSCluster.Builder(CONF).numDataNodes(1).build();
CLUSTER.waitActive();
RESPONSE_HANDLER = new Http2ResponseHandler();
Bootstrap bootstrap =
new Bootstrap()
.group(WORKER_GROUP)
.channel(NioSocketChannel.class)
.remoteAddress("127.0.0.1",
CLUSTER.getDataNodes().get(0).getInfoPort())
.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
Http2Connection connection = new DefaultHttp2Connection(false);
Http2ConnectionHandler connectionHandler =
new HttpToHttp2ConnectionHandler(connection, frameReader(),
frameWriter(), new DelegatingDecompressorFrameListener(
connection, new InboundHttp2ToHttpAdapter.Builder(
connection).maxContentLength(Integer.MAX_VALUE)
.propagateSettings(true).build()));
ch.pipeline().addLast(connectionHandler, RESPONSE_HANDLER);
}
});
CHANNEL = bootstrap.connect().syncUninterruptibly().channel();
}
@AfterClass
public static void tearDown() throws IOException {
if (CHANNEL != null) {
CHANNEL.close().syncUninterruptibly();
}
WORKER_GROUP.shutdownGracefully();
if (CLUSTER != null) {
CLUSTER.shutdown();
}
}
private static Http2FrameReader frameReader() {
return new Http2InboundFrameLogger(new DefaultHttp2FrameReader(),
FRAME_LOGGER);
}
private static Http2FrameWriter frameWriter() {
return new Http2OutboundFrameLogger(new DefaultHttp2FrameWriter(),
FRAME_LOGGER);
}
@Test
public void test() throws InterruptedException, ExecutionException {
int streamId = 3;
FullHttpRequest request =
new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/");
request.headers().add(HttpUtil.ExtensionHeaderNames.STREAM_ID.text(),
streamId);
Promise<FullHttpResponse> promise = CHANNEL.eventLoop().newPromise();
synchronized (RESPONSE_HANDLER) {
CHANNEL.writeAndFlush(request);
RESPONSE_HANDLER.put(streamId, promise);
}
assertEquals(HttpResponseStatus.OK, promise.get().status());
ByteBuf content = promise.get().content();
assertEquals("HTTP/2 DTP", content.toString(StandardCharsets.UTF_8));
}
}

View File

@ -579,7 +579,13 @@
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.0.23.Final</version>
<version>4.1.0.Beta5</version>
</dependency>
<dependency>
<groupId>com.twitter</groupId>
<artifactId>hpack</artifactId>
<version>0.11.0</version>
</dependency>
<dependency>