Revert "HDFS-8377. Support HTTP/2 in datanode. Contributed by Duo Zhang."
This reverts commit f70b9d9241
.
This commit is contained in:
parent
1b1a25cca6
commit
67d2875f8f
|
@ -183,11 +183,6 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||||
<artifactId>netty-all</artifactId>
|
<artifactId>netty-all</artifactId>
|
||||||
<scope>compile</scope>
|
<scope>compile</scope>
|
||||||
</dependency>
|
</dependency>
|
||||||
<dependency>
|
|
||||||
<groupId>com.twitter</groupId>
|
|
||||||
<artifactId>hpack</artifactId>
|
|
||||||
<scope>compile</scope>
|
|
||||||
</dependency>
|
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>xerces</groupId>
|
<groupId>xerces</groupId>
|
||||||
<artifactId>xercesImpl</artifactId>
|
<artifactId>xercesImpl</artifactId>
|
||||||
|
|
|
@ -26,8 +26,8 @@ import javax.servlet.FilterConfig;
|
||||||
import javax.servlet.ServletContext;
|
import javax.servlet.ServletContext;
|
||||||
import javax.servlet.ServletException;
|
import javax.servlet.ServletException;
|
||||||
|
|
||||||
|
import io.netty.bootstrap.ChannelFactory;
|
||||||
import io.netty.bootstrap.ServerBootstrap;
|
import io.netty.bootstrap.ServerBootstrap;
|
||||||
import io.netty.channel.ChannelFactory;
|
|
||||||
import io.netty.channel.ChannelFuture;
|
import io.netty.channel.ChannelFuture;
|
||||||
import io.netty.channel.ChannelInitializer;
|
import io.netty.channel.ChannelInitializer;
|
||||||
import io.netty.channel.ChannelOption;
|
import io.netty.channel.ChannelOption;
|
||||||
|
@ -144,8 +144,16 @@ public class DatanodeHttpServer implements Closeable {
|
||||||
.childHandler(new ChannelInitializer<SocketChannel>() {
|
.childHandler(new ChannelInitializer<SocketChannel>() {
|
||||||
@Override
|
@Override
|
||||||
protected void initChannel(SocketChannel ch) throws Exception {
|
protected void initChannel(SocketChannel ch) throws Exception {
|
||||||
ch.pipeline().addLast(new PortUnificationServerHandler(jettyAddr,
|
ChannelPipeline p = ch.pipeline();
|
||||||
conf, confForCreate, restCsrfPreventionFilter));
|
p.addLast(new HttpRequestDecoder(),
|
||||||
|
new HttpResponseEncoder());
|
||||||
|
if (restCsrfPreventionFilter != null) {
|
||||||
|
p.addLast(new RestCsrfPreventionFilterHandler(
|
||||||
|
restCsrfPreventionFilter));
|
||||||
|
}
|
||||||
|
p.addLast(
|
||||||
|
new ChunkedWriteHandler(),
|
||||||
|
new URLDispatcher(jettyAddr, conf, confForCreate));
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|
|
@ -1,99 +0,0 @@
|
||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.hadoop.hdfs.server.datanode.web;
|
|
||||||
|
|
||||||
import java.net.InetSocketAddress;
|
|
||||||
import java.util.List;
|
|
||||||
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
|
||||||
import org.apache.hadoop.hdfs.server.datanode.web.dtp.DtpHttp2Handler;
|
|
||||||
import org.apache.hadoop.security.http.RestCsrfPreventionFilter;
|
|
||||||
|
|
||||||
import io.netty.buffer.ByteBuf;
|
|
||||||
import io.netty.buffer.ByteBufUtil;
|
|
||||||
import io.netty.channel.ChannelHandlerContext;
|
|
||||||
import io.netty.handler.codec.ByteToMessageDecoder;
|
|
||||||
import io.netty.handler.codec.http.HttpServerCodec;
|
|
||||||
import io.netty.handler.codec.http2.Http2CodecUtil;
|
|
||||||
import io.netty.handler.stream.ChunkedWriteHandler;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* A port unification handler to support HTTP/1.1 and HTTP/2 on the same port.
|
|
||||||
*/
|
|
||||||
@InterfaceAudience.Private
|
|
||||||
public class PortUnificationServerHandler extends ByteToMessageDecoder {
|
|
||||||
|
|
||||||
private static final ByteBuf HTTP2_CLIENT_CONNECTION_PREFACE = Http2CodecUtil
|
|
||||||
.connectionPrefaceBuf();
|
|
||||||
|
|
||||||
// we only want to support HTTP/1.1 and HTTP/2, so the first 3 bytes is
|
|
||||||
// enough. No HTTP/1.1 request could start with "PRI"
|
|
||||||
private static final int MAGIC_HEADER_LENGTH = 3;
|
|
||||||
|
|
||||||
private final InetSocketAddress proxyHost;
|
|
||||||
|
|
||||||
private final Configuration conf;
|
|
||||||
|
|
||||||
private final Configuration confForCreate;
|
|
||||||
|
|
||||||
private final RestCsrfPreventionFilter restCsrfPreventionFilter;
|
|
||||||
|
|
||||||
public PortUnificationServerHandler(InetSocketAddress proxyHost,
|
|
||||||
Configuration conf, Configuration confForCreate,
|
|
||||||
RestCsrfPreventionFilter restCsrfPreventionFilter) {
|
|
||||||
this.proxyHost = proxyHost;
|
|
||||||
this.conf = conf;
|
|
||||||
this.confForCreate = confForCreate;
|
|
||||||
this.restCsrfPreventionFilter = restCsrfPreventionFilter;
|
|
||||||
}
|
|
||||||
|
|
||||||
private void configureHttp1(ChannelHandlerContext ctx) {
|
|
||||||
ctx.pipeline().addLast(new HttpServerCodec());
|
|
||||||
if (this.restCsrfPreventionFilter != null) {
|
|
||||||
ctx.pipeline().addLast(new RestCsrfPreventionFilterHandler(
|
|
||||||
this.restCsrfPreventionFilter));
|
|
||||||
}
|
|
||||||
ctx.pipeline().addLast(new ChunkedWriteHandler(),
|
|
||||||
new URLDispatcher(proxyHost, conf, confForCreate));
|
|
||||||
}
|
|
||||||
|
|
||||||
private void configureHttp2(ChannelHandlerContext ctx) {
|
|
||||||
if (this.restCsrfPreventionFilter != null) {
|
|
||||||
ctx.pipeline().addLast(new RestCsrfPreventionFilterHandler(
|
|
||||||
this.restCsrfPreventionFilter));
|
|
||||||
}
|
|
||||||
ctx.pipeline().addLast(new DtpHttp2Handler());
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected void decode(ChannelHandlerContext ctx, ByteBuf in,
|
|
||||||
List<Object> out) throws Exception {
|
|
||||||
if (in.readableBytes() < MAGIC_HEADER_LENGTH) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
if (ByteBufUtil.equals(in, 0, HTTP2_CLIENT_CONNECTION_PREFACE, 0,
|
|
||||||
MAGIC_HEADER_LENGTH)) {
|
|
||||||
configureHttp2(ctx);
|
|
||||||
} else {
|
|
||||||
configureHttp1(ctx);
|
|
||||||
}
|
|
||||||
ctx.pipeline().remove(this);
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
|
@ -17,9 +17,6 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hdfs.server.datanode.web;
|
package org.apache.hadoop.hdfs.server.datanode.web;
|
||||||
|
|
||||||
import static io.netty.handler.codec.http.HttpHeaderNames.CONNECTION;
|
|
||||||
import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
|
|
||||||
import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
|
|
||||||
import io.netty.bootstrap.Bootstrap;
|
import io.netty.bootstrap.Bootstrap;
|
||||||
import io.netty.buffer.Unpooled;
|
import io.netty.buffer.Unpooled;
|
||||||
import io.netty.channel.Channel;
|
import io.netty.channel.Channel;
|
||||||
|
@ -34,14 +31,17 @@ import io.netty.channel.socket.SocketChannel;
|
||||||
import io.netty.channel.socket.nio.NioSocketChannel;
|
import io.netty.channel.socket.nio.NioSocketChannel;
|
||||||
import io.netty.handler.codec.http.DefaultFullHttpRequest;
|
import io.netty.handler.codec.http.DefaultFullHttpRequest;
|
||||||
import io.netty.handler.codec.http.DefaultHttpResponse;
|
import io.netty.handler.codec.http.DefaultHttpResponse;
|
||||||
import io.netty.handler.codec.http.HttpHeaderValues;
|
|
||||||
import io.netty.handler.codec.http.HttpRequest;
|
import io.netty.handler.codec.http.HttpRequest;
|
||||||
import io.netty.handler.codec.http.HttpRequestEncoder;
|
import io.netty.handler.codec.http.HttpRequestEncoder;
|
||||||
import io.netty.handler.codec.http.HttpResponseEncoder;
|
import io.netty.handler.codec.http.HttpResponseEncoder;
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import static io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION;
|
||||||
|
import static io.netty.handler.codec.http.HttpHeaders.Values;
|
||||||
|
import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
|
||||||
|
import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Dead simple session-layer HTTP proxy. It gets the HTTP responses
|
* Dead simple session-layer HTTP proxy. It gets the HTTP responses
|
||||||
|
@ -98,7 +98,7 @@ class SimpleHttpProxyHandler extends SimpleChannelInboundHandler<HttpRequest> {
|
||||||
@Override
|
@Override
|
||||||
public void channelRead0
|
public void channelRead0
|
||||||
(final ChannelHandlerContext ctx, final HttpRequest req) {
|
(final ChannelHandlerContext ctx, final HttpRequest req) {
|
||||||
uri = req.uri();
|
uri = req.getUri();
|
||||||
final Channel client = ctx.channel();
|
final Channel client = ctx.channel();
|
||||||
Bootstrap proxiedServer = new Bootstrap()
|
Bootstrap proxiedServer = new Bootstrap()
|
||||||
.group(client.eventLoop())
|
.group(client.eventLoop())
|
||||||
|
@ -118,14 +118,14 @@ class SimpleHttpProxyHandler extends SimpleChannelInboundHandler<HttpRequest> {
|
||||||
if (future.isSuccess()) {
|
if (future.isSuccess()) {
|
||||||
ctx.channel().pipeline().remove(HttpResponseEncoder.class);
|
ctx.channel().pipeline().remove(HttpResponseEncoder.class);
|
||||||
HttpRequest newReq = new DefaultFullHttpRequest(HTTP_1_1,
|
HttpRequest newReq = new DefaultFullHttpRequest(HTTP_1_1,
|
||||||
req.method(), req.uri());
|
req.getMethod(), req.getUri());
|
||||||
newReq.headers().add(req.headers());
|
newReq.headers().add(req.headers());
|
||||||
newReq.headers().set(CONNECTION, HttpHeaderValues.CLOSE);
|
newReq.headers().set(CONNECTION, Values.CLOSE);
|
||||||
future.channel().writeAndFlush(newReq);
|
future.channel().writeAndFlush(newReq);
|
||||||
} else {
|
} else {
|
||||||
DefaultHttpResponse resp = new DefaultHttpResponse(HTTP_1_1,
|
DefaultHttpResponse resp = new DefaultHttpResponse(HTTP_1_1,
|
||||||
INTERNAL_SERVER_ERROR);
|
INTERNAL_SERVER_ERROR);
|
||||||
resp.headers().set(CONNECTION, HttpHeaderValues.CLOSE);
|
resp.headers().set(CONNECTION, Values.CLOSE);
|
||||||
LOG.info("Proxy " + uri + " failed. Cause: ", future.cause());
|
LOG.info("Proxy " + uri + " failed. Cause: ", future.cause());
|
||||||
ctx.writeAndFlush(resp).addListener(ChannelFutureListener.CLOSE);
|
ctx.writeAndFlush(resp).addListener(ChannelFutureListener.CLOSE);
|
||||||
client.close();
|
client.close();
|
||||||
|
|
|
@ -17,16 +17,16 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hdfs.server.datanode.web;
|
package org.apache.hadoop.hdfs.server.datanode.web;
|
||||||
|
|
||||||
import static org.apache.hadoop.hdfs.server.datanode.web.webhdfs.WebHdfsHandler.WEBHDFS_PREFIX;
|
|
||||||
import io.netty.channel.ChannelHandlerContext;
|
import io.netty.channel.ChannelHandlerContext;
|
||||||
import io.netty.channel.ChannelPipeline;
|
import io.netty.channel.ChannelPipeline;
|
||||||
import io.netty.channel.SimpleChannelInboundHandler;
|
import io.netty.channel.SimpleChannelInboundHandler;
|
||||||
import io.netty.handler.codec.http.HttpRequest;
|
import io.netty.handler.codec.http.HttpRequest;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.hdfs.server.datanode.web.webhdfs.WebHdfsHandler;
|
||||||
|
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import static org.apache.hadoop.hdfs.server.datanode.web.webhdfs.WebHdfsHandler.WEBHDFS_PREFIX;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.web.webhdfs.WebHdfsHandler;
|
|
||||||
|
|
||||||
class URLDispatcher extends SimpleChannelInboundHandler<HttpRequest> {
|
class URLDispatcher extends SimpleChannelInboundHandler<HttpRequest> {
|
||||||
private final InetSocketAddress proxyHost;
|
private final InetSocketAddress proxyHost;
|
||||||
|
@ -42,8 +42,8 @@ class URLDispatcher extends SimpleChannelInboundHandler<HttpRequest> {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void channelRead0(ChannelHandlerContext ctx, HttpRequest req)
|
protected void channelRead0(ChannelHandlerContext ctx, HttpRequest req)
|
||||||
throws Exception {
|
throws Exception {
|
||||||
String uri = req.uri();
|
String uri = req.getUri();
|
||||||
ChannelPipeline p = ctx.pipeline();
|
ChannelPipeline p = ctx.pipeline();
|
||||||
if (uri.startsWith(WEBHDFS_PREFIX)) {
|
if (uri.startsWith(WEBHDFS_PREFIX)) {
|
||||||
WebHdfsHandler h = new WebHdfsHandler(conf, confForCreate);
|
WebHdfsHandler h = new WebHdfsHandler(conf, confForCreate);
|
||||||
|
|
|
@ -1,52 +0,0 @@
|
||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.hadoop.hdfs.server.datanode.web.dtp;
|
|
||||||
|
|
||||||
import io.netty.channel.ChannelHandlerContext;
|
|
||||||
import io.netty.handler.codec.http.HttpResponseStatus;
|
|
||||||
import io.netty.handler.codec.http2.DefaultHttp2Headers;
|
|
||||||
import io.netty.handler.codec.http2.Http2ConnectionEncoder;
|
|
||||||
import io.netty.handler.codec.http2.Http2Exception;
|
|
||||||
import io.netty.handler.codec.http2.Http2FrameAdapter;
|
|
||||||
import io.netty.handler.codec.http2.Http2Headers;
|
|
||||||
|
|
||||||
import java.nio.charset.StandardCharsets;
|
|
||||||
|
|
||||||
class DtpHttp2FrameListener extends Http2FrameAdapter {
|
|
||||||
|
|
||||||
private Http2ConnectionEncoder encoder;
|
|
||||||
|
|
||||||
public void encoder(Http2ConnectionEncoder encoder) {
|
|
||||||
this.encoder = encoder;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onHeadersRead(ChannelHandlerContext ctx, int streamId,
|
|
||||||
Http2Headers headers, int streamDependency, short weight,
|
|
||||||
boolean exclusive, int padding, boolean endStream) throws Http2Exception {
|
|
||||||
encoder.writeHeaders(ctx, streamId,
|
|
||||||
new DefaultHttp2Headers().status(HttpResponseStatus.OK.codeAsText()), 0,
|
|
||||||
false, ctx.newPromise());
|
|
||||||
encoder.writeData(
|
|
||||||
ctx,
|
|
||||||
streamId,
|
|
||||||
ctx.alloc().buffer()
|
|
||||||
.writeBytes("HTTP/2 DTP".getBytes(StandardCharsets.UTF_8)), 0, true,
|
|
||||||
ctx.newPromise());
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,34 +0,0 @@
|
||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.hadoop.hdfs.server.datanode.web.dtp;
|
|
||||||
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
|
||||||
|
|
||||||
import io.netty.handler.codec.http2.Http2ConnectionHandler;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* The HTTP/2 handler.
|
|
||||||
*/
|
|
||||||
@InterfaceAudience.Private
|
|
||||||
public class DtpHttp2Handler extends Http2ConnectionHandler {
|
|
||||||
|
|
||||||
public DtpHttp2Handler() {
|
|
||||||
super(true, new DtpHttp2FrameListener());
|
|
||||||
((DtpHttp2FrameListener) decoder().listener()).encoder(encoder());
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -17,21 +17,12 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hdfs.server.datanode.web.webhdfs;
|
package org.apache.hadoop.hdfs.server.datanode.web.webhdfs;
|
||||||
|
|
||||||
import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_LENGTH;
|
import com.google.common.base.Charsets;
|
||||||
import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_TYPE;
|
import com.sun.jersey.api.ParamException;
|
||||||
import static io.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST;
|
import com.sun.jersey.api.container.ContainerException;
|
||||||
import static io.netty.handler.codec.http.HttpResponseStatus.FORBIDDEN;
|
|
||||||
import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
|
|
||||||
import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;
|
|
||||||
import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
|
|
||||||
import static org.apache.hadoop.hdfs.server.datanode.web.webhdfs.WebHdfsHandler.APPLICATION_JSON_UTF8;
|
|
||||||
import io.netty.buffer.Unpooled;
|
import io.netty.buffer.Unpooled;
|
||||||
import io.netty.handler.codec.http.DefaultFullHttpResponse;
|
import io.netty.handler.codec.http.DefaultFullHttpResponse;
|
||||||
import io.netty.handler.codec.http.HttpResponseStatus;
|
import io.netty.handler.codec.http.HttpResponseStatus;
|
||||||
|
|
||||||
import java.io.FileNotFoundException;
|
|
||||||
import java.io.IOException;
|
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.hadoop.hdfs.web.JsonUtil;
|
import org.apache.hadoop.hdfs.web.JsonUtil;
|
||||||
import org.apache.hadoop.ipc.RemoteException;
|
import org.apache.hadoop.ipc.RemoteException;
|
||||||
|
@ -39,9 +30,17 @@ import org.apache.hadoop.ipc.StandbyException;
|
||||||
import org.apache.hadoop.security.authorize.AuthorizationException;
|
import org.apache.hadoop.security.authorize.AuthorizationException;
|
||||||
import org.apache.hadoop.security.token.SecretManager;
|
import org.apache.hadoop.security.token.SecretManager;
|
||||||
|
|
||||||
import com.google.common.base.Charsets;
|
import java.io.FileNotFoundException;
|
||||||
import com.sun.jersey.api.ParamException;
|
import java.io.IOException;
|
||||||
import com.sun.jersey.api.container.ContainerException;
|
|
||||||
|
import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_LENGTH;
|
||||||
|
import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
|
||||||
|
import static io.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST;
|
||||||
|
import static io.netty.handler.codec.http.HttpResponseStatus.FORBIDDEN;
|
||||||
|
import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
|
||||||
|
import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;
|
||||||
|
import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
|
||||||
|
import static org.apache.hadoop.hdfs.server.datanode.web.webhdfs.WebHdfsHandler.APPLICATION_JSON_UTF8;
|
||||||
|
|
||||||
class ExceptionHandler {
|
class ExceptionHandler {
|
||||||
static Log LOG = WebHdfsHandler.LOG;
|
static Log LOG = WebHdfsHandler.LOG;
|
||||||
|
|
|
@ -17,21 +17,21 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hdfs.server.datanode.web.webhdfs;
|
package org.apache.hadoop.hdfs.server.datanode.web.webhdfs;
|
||||||
|
|
||||||
import static io.netty.handler.codec.http.HttpHeaderNames.CONNECTION;
|
|
||||||
import static io.netty.handler.codec.http.HttpHeaderValues.CLOSE;
|
|
||||||
import io.netty.channel.ChannelFutureListener;
|
import io.netty.channel.ChannelFutureListener;
|
||||||
import io.netty.channel.ChannelHandlerContext;
|
import io.netty.channel.ChannelHandlerContext;
|
||||||
import io.netty.channel.SimpleChannelInboundHandler;
|
import io.netty.channel.SimpleChannelInboundHandler;
|
||||||
import io.netty.handler.codec.http.DefaultHttpResponse;
|
import io.netty.handler.codec.http.DefaultHttpResponse;
|
||||||
import io.netty.handler.codec.http.HttpContent;
|
import io.netty.handler.codec.http.HttpContent;
|
||||||
import io.netty.handler.codec.http.LastHttpContent;
|
import io.netty.handler.codec.http.LastHttpContent;
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.hadoop.hdfs.DFSClient;
|
||||||
|
import org.apache.hadoop.io.IOUtils;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.OutputStream;
|
import java.io.OutputStream;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import static io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION;
|
||||||
import org.apache.hadoop.hdfs.DFSClient;
|
import static io.netty.handler.codec.http.HttpHeaders.Values.CLOSE;
|
||||||
import org.apache.hadoop.io.IOUtils;
|
|
||||||
|
|
||||||
class HdfsWriter extends SimpleChannelInboundHandler<HttpContent> {
|
class HdfsWriter extends SimpleChannelInboundHandler<HttpContent> {
|
||||||
private final DFSClient client;
|
private final DFSClient client;
|
||||||
|
|
|
@ -17,28 +17,7 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hdfs.server.datanode.web.webhdfs;
|
package org.apache.hadoop.hdfs.server.datanode.web.webhdfs;
|
||||||
|
|
||||||
import static io.netty.handler.codec.http.HttpHeaderNames.ACCESS_CONTROL_ALLOW_METHODS;
|
import com.google.common.base.Preconditions;
|
||||||
import static io.netty.handler.codec.http.HttpHeaderNames.ACCESS_CONTROL_ALLOW_ORIGIN;
|
|
||||||
import static io.netty.handler.codec.http.HttpHeaderNames.CONNECTION;
|
|
||||||
import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_LENGTH;
|
|
||||||
import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_TYPE;
|
|
||||||
import static io.netty.handler.codec.http.HttpHeaderNames.LOCATION;
|
|
||||||
import static io.netty.handler.codec.http.HttpHeaderValues.CLOSE;
|
|
||||||
import static io.netty.handler.codec.http.HttpHeaderNames.ACCEPT;
|
|
||||||
import static io.netty.handler.codec.http.HttpHeaderNames.ACCESS_CONTROL_ALLOW_HEADERS;
|
|
||||||
import static io.netty.handler.codec.http.HttpHeaderNames.ACCESS_CONTROL_MAX_AGE;
|
|
||||||
import static io.netty.handler.codec.http.HttpHeaderValues.KEEP_ALIVE;
|
|
||||||
import static io.netty.handler.codec.http.HttpMethod.GET;
|
|
||||||
import static io.netty.handler.codec.http.HttpMethod.OPTIONS;
|
|
||||||
import static io.netty.handler.codec.http.HttpMethod.POST;
|
|
||||||
import static io.netty.handler.codec.http.HttpMethod.PUT;
|
|
||||||
import static io.netty.handler.codec.http.HttpResponseStatus.CONTINUE;
|
|
||||||
import static io.netty.handler.codec.http.HttpResponseStatus.CREATED;
|
|
||||||
import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
|
|
||||||
import static io.netty.handler.codec.http.HttpResponseStatus.OK;
|
|
||||||
import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
|
|
||||||
import static org.apache.hadoop.hdfs.protocol.HdfsConstants.HDFS_URI_SCHEME;
|
|
||||||
import static org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier.HDFS_DELEGATION_KIND;
|
|
||||||
import io.netty.buffer.Unpooled;
|
import io.netty.buffer.Unpooled;
|
||||||
import io.netty.channel.ChannelFutureListener;
|
import io.netty.channel.ChannelFutureListener;
|
||||||
import io.netty.channel.ChannelHandlerContext;
|
import io.netty.channel.ChannelHandlerContext;
|
||||||
|
@ -80,7 +59,28 @@ import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.security.token.Token;
|
import org.apache.hadoop.security.token.Token;
|
||||||
import org.apache.hadoop.util.LimitInputStream;
|
import org.apache.hadoop.util.LimitInputStream;
|
||||||
|
|
||||||
import com.google.common.base.Preconditions;
|
import static io.netty.handler.codec.http.HttpHeaders.Names.ACCESS_CONTROL_ALLOW_METHODS;
|
||||||
|
import static io.netty.handler.codec.http.HttpHeaders.Names.ACCESS_CONTROL_ALLOW_ORIGIN;
|
||||||
|
import static io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION;
|
||||||
|
import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_LENGTH;
|
||||||
|
import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
|
||||||
|
import static io.netty.handler.codec.http.HttpHeaders.Names.LOCATION;
|
||||||
|
import static io.netty.handler.codec.http.HttpHeaders.Values.CLOSE;
|
||||||
|
import static io.netty.handler.codec.http.HttpHeaderNames.ACCEPT;
|
||||||
|
import static io.netty.handler.codec.http.HttpHeaderNames.ACCESS_CONTROL_ALLOW_HEADERS;
|
||||||
|
import static io.netty.handler.codec.http.HttpHeaderNames.ACCESS_CONTROL_MAX_AGE;
|
||||||
|
import static io.netty.handler.codec.http.HttpHeaderValues.KEEP_ALIVE;
|
||||||
|
import static io.netty.handler.codec.http.HttpMethod.GET;
|
||||||
|
import static io.netty.handler.codec.http.HttpMethod.OPTIONS;
|
||||||
|
import static io.netty.handler.codec.http.HttpMethod.POST;
|
||||||
|
import static io.netty.handler.codec.http.HttpMethod.PUT;
|
||||||
|
import static io.netty.handler.codec.http.HttpResponseStatus.CONTINUE;
|
||||||
|
import static io.netty.handler.codec.http.HttpResponseStatus.CREATED;
|
||||||
|
import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
|
||||||
|
import static io.netty.handler.codec.http.HttpResponseStatus.OK;
|
||||||
|
import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
|
||||||
|
import static org.apache.hadoop.hdfs.protocol.HdfsConstants.HDFS_URI_SCHEME;
|
||||||
|
import static org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier.HDFS_DELEGATION_KIND;
|
||||||
|
|
||||||
public class WebHdfsHandler extends SimpleChannelInboundHandler<HttpRequest> {
|
public class WebHdfsHandler extends SimpleChannelInboundHandler<HttpRequest> {
|
||||||
static final Log LOG = LogFactory.getLog(WebHdfsHandler.class);
|
static final Log LOG = LogFactory.getLog(WebHdfsHandler.class);
|
||||||
|
@ -112,8 +112,8 @@ public class WebHdfsHandler extends SimpleChannelInboundHandler<HttpRequest> {
|
||||||
@Override
|
@Override
|
||||||
public void channelRead0(final ChannelHandlerContext ctx,
|
public void channelRead0(final ChannelHandlerContext ctx,
|
||||||
final HttpRequest req) throws Exception {
|
final HttpRequest req) throws Exception {
|
||||||
Preconditions.checkArgument(req.uri().startsWith(WEBHDFS_PREFIX));
|
Preconditions.checkArgument(req.getUri().startsWith(WEBHDFS_PREFIX));
|
||||||
QueryStringDecoder queryString = new QueryStringDecoder(req.uri());
|
QueryStringDecoder queryString = new QueryStringDecoder(req.getUri());
|
||||||
params = new ParameterParser(queryString, conf);
|
params = new ParameterParser(queryString, conf);
|
||||||
DataNodeUGIProvider ugiProvider = new DataNodeUGIProvider(params);
|
DataNodeUGIProvider ugiProvider = new DataNodeUGIProvider(params);
|
||||||
ugi = ugiProvider.ugi();
|
ugi = ugiProvider.ugi();
|
||||||
|
@ -150,7 +150,7 @@ public class WebHdfsHandler extends SimpleChannelInboundHandler<HttpRequest> {
|
||||||
public void handle(ChannelHandlerContext ctx, HttpRequest req)
|
public void handle(ChannelHandlerContext ctx, HttpRequest req)
|
||||||
throws IOException, URISyntaxException {
|
throws IOException, URISyntaxException {
|
||||||
String op = params.op();
|
String op = params.op();
|
||||||
HttpMethod method = req.method();
|
HttpMethod method = req.getMethod();
|
||||||
if (PutOpParam.Op.CREATE.name().equalsIgnoreCase(op)
|
if (PutOpParam.Op.CREATE.name().equalsIgnoreCase(op)
|
||||||
&& method == PUT) {
|
&& method == PUT) {
|
||||||
onCreate(ctx);
|
onCreate(ctx);
|
||||||
|
|
|
@ -17,19 +17,7 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hdfs.tools.offlineImageViewer;
|
package org.apache.hadoop.hdfs.tools.offlineImageViewer;
|
||||||
|
|
||||||
import static io.netty.handler.codec.http.HttpHeaderNames.CONNECTION;
|
import com.google.common.base.Charsets;
|
||||||
import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_LENGTH;
|
|
||||||
import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_TYPE;
|
|
||||||
import static io.netty.handler.codec.http.HttpHeaderValues.CLOSE;
|
|
||||||
import static io.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST;
|
|
||||||
import static io.netty.handler.codec.http.HttpResponseStatus.FORBIDDEN;
|
|
||||||
import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
|
|
||||||
import static io.netty.handler.codec.http.HttpResponseStatus.METHOD_NOT_ALLOWED;
|
|
||||||
import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;
|
|
||||||
import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
|
|
||||||
import static org.apache.hadoop.hdfs.server.datanode.web.webhdfs.WebHdfsHandler.APPLICATION_JSON_UTF8;
|
|
||||||
import static org.apache.hadoop.hdfs.server.datanode.web.webhdfs.WebHdfsHandler.WEBHDFS_PREFIX;
|
|
||||||
import static org.apache.hadoop.hdfs.server.datanode.web.webhdfs.WebHdfsHandler.WEBHDFS_PREFIX_LENGTH;
|
|
||||||
import io.netty.buffer.ByteBuf;
|
import io.netty.buffer.ByteBuf;
|
||||||
import io.netty.buffer.Unpooled;
|
import io.netty.buffer.Unpooled;
|
||||||
import io.netty.channel.ChannelFutureListener;
|
import io.netty.channel.ChannelFutureListener;
|
||||||
|
@ -42,18 +30,29 @@ import io.netty.handler.codec.http.HttpMethod;
|
||||||
import io.netty.handler.codec.http.HttpRequest;
|
import io.netty.handler.codec.http.HttpRequest;
|
||||||
import io.netty.handler.codec.http.HttpResponseStatus;
|
import io.netty.handler.codec.http.HttpResponseStatus;
|
||||||
import io.netty.handler.codec.http.QueryStringDecoder;
|
import io.netty.handler.codec.http.QueryStringDecoder;
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.hdfs.web.JsonUtil;
|
||||||
|
import org.apache.hadoop.util.StringUtils;
|
||||||
|
|
||||||
import java.io.FileNotFoundException;
|
import java.io.FileNotFoundException;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import static io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_LENGTH;
|
||||||
import org.apache.hadoop.hdfs.web.JsonUtil;
|
import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
|
||||||
import org.apache.hadoop.util.StringUtils;
|
import static io.netty.handler.codec.http.HttpHeaders.Values.CLOSE;
|
||||||
|
import static io.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST;
|
||||||
import com.google.common.base.Charsets;
|
import static io.netty.handler.codec.http.HttpResponseStatus.FORBIDDEN;
|
||||||
|
import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
|
||||||
|
import static io.netty.handler.codec.http.HttpResponseStatus.METHOD_NOT_ALLOWED;
|
||||||
|
import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;
|
||||||
|
import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
|
||||||
|
import static org.apache.hadoop.hdfs.server.datanode.web.webhdfs.WebHdfsHandler.APPLICATION_JSON_UTF8;
|
||||||
|
import static org.apache.hadoop.hdfs.server.datanode.web.webhdfs.WebHdfsHandler.WEBHDFS_PREFIX;
|
||||||
|
import static org.apache.hadoop.hdfs.server.datanode.web.webhdfs.WebHdfsHandler.WEBHDFS_PREFIX_LENGTH;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Implement the read-only WebHDFS API for fsimage.
|
* Implement the read-only WebHDFS API for fsimage.
|
||||||
|
@ -76,7 +75,7 @@ class FSImageHandler extends SimpleChannelInboundHandler<HttpRequest> {
|
||||||
@Override
|
@Override
|
||||||
public void channelRead0(ChannelHandlerContext ctx, HttpRequest request)
|
public void channelRead0(ChannelHandlerContext ctx, HttpRequest request)
|
||||||
throws Exception {
|
throws Exception {
|
||||||
if (request.method() != HttpMethod.GET) {
|
if (request.getMethod() != HttpMethod.GET) {
|
||||||
DefaultHttpResponse resp = new DefaultHttpResponse(HTTP_1_1,
|
DefaultHttpResponse resp = new DefaultHttpResponse(HTTP_1_1,
|
||||||
METHOD_NOT_ALLOWED);
|
METHOD_NOT_ALLOWED);
|
||||||
resp.headers().set(CONNECTION, CLOSE);
|
resp.headers().set(CONNECTION, CLOSE);
|
||||||
|
@ -84,7 +83,7 @@ class FSImageHandler extends SimpleChannelInboundHandler<HttpRequest> {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
QueryStringDecoder decoder = new QueryStringDecoder(request.uri());
|
QueryStringDecoder decoder = new QueryStringDecoder(request.getUri());
|
||||||
final String op = getOp(decoder);
|
final String op = getOp(decoder);
|
||||||
|
|
||||||
final String content;
|
final String content;
|
||||||
|
|
|
@ -1,65 +0,0 @@
|
||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.hadoop.hdfs.server.datanode.web.dtp;
|
|
||||||
|
|
||||||
import io.netty.channel.ChannelHandlerContext;
|
|
||||||
import io.netty.channel.SimpleChannelInboundHandler;
|
|
||||||
import io.netty.handler.codec.http.FullHttpResponse;
|
|
||||||
import io.netty.handler.codec.http2.HttpUtil;
|
|
||||||
import io.netty.util.concurrent.Promise;
|
|
||||||
|
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.Map;
|
|
||||||
|
|
||||||
public class Http2ResponseHandler extends
|
|
||||||
SimpleChannelInboundHandler<FullHttpResponse> {
|
|
||||||
|
|
||||||
private Map<Integer, Promise<FullHttpResponse>> streamId2Promise =
|
|
||||||
new HashMap<>();
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected void channelRead0(ChannelHandlerContext ctx, FullHttpResponse msg)
|
|
||||||
throws Exception {
|
|
||||||
Integer streamId =
|
|
||||||
msg.headers().getInt(HttpUtil.ExtensionHeaderNames.STREAM_ID.text());
|
|
||||||
if (streamId == null) {
|
|
||||||
System.err.println("HttpResponseHandler unexpected message received: "
|
|
||||||
+ msg);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
if (streamId.intValue() == 1) {
|
|
||||||
// this is the upgrade response message, just ignore it.
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
Promise<FullHttpResponse> promise;
|
|
||||||
synchronized (this) {
|
|
||||||
promise = streamId2Promise.get(streamId);
|
|
||||||
}
|
|
||||||
if (promise == null) {
|
|
||||||
System.err.println("Message received for unknown stream id " + streamId);
|
|
||||||
} else {
|
|
||||||
// Do stuff with the message (for now just print it)
|
|
||||||
promise.setSuccess(msg.retain());
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public void put(Integer streamId, Promise<FullHttpResponse> promise) {
|
|
||||||
streamId2Promise.put(streamId, promise);
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,147 +0,0 @@
|
||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.hadoop.hdfs.server.datanode.web.dtp;
|
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
|
||||||
import io.netty.bootstrap.Bootstrap;
|
|
||||||
import io.netty.buffer.ByteBuf;
|
|
||||||
import io.netty.channel.Channel;
|
|
||||||
import io.netty.channel.ChannelInitializer;
|
|
||||||
import io.netty.channel.EventLoopGroup;
|
|
||||||
import io.netty.channel.nio.NioEventLoopGroup;
|
|
||||||
import io.netty.channel.socket.nio.NioSocketChannel;
|
|
||||||
import io.netty.handler.codec.http.DefaultFullHttpRequest;
|
|
||||||
import io.netty.handler.codec.http.FullHttpRequest;
|
|
||||||
import io.netty.handler.codec.http.FullHttpResponse;
|
|
||||||
import io.netty.handler.codec.http.HttpMethod;
|
|
||||||
import io.netty.handler.codec.http.HttpResponseStatus;
|
|
||||||
import io.netty.handler.codec.http.HttpVersion;
|
|
||||||
import io.netty.handler.codec.http2.DefaultHttp2Connection;
|
|
||||||
import io.netty.handler.codec.http2.DefaultHttp2FrameReader;
|
|
||||||
import io.netty.handler.codec.http2.DefaultHttp2FrameWriter;
|
|
||||||
import io.netty.handler.codec.http2.DelegatingDecompressorFrameListener;
|
|
||||||
import io.netty.handler.codec.http2.Http2Connection;
|
|
||||||
import io.netty.handler.codec.http2.Http2ConnectionHandler;
|
|
||||||
import io.netty.handler.codec.http2.Http2FrameLogger;
|
|
||||||
import io.netty.handler.codec.http2.Http2FrameReader;
|
|
||||||
import io.netty.handler.codec.http2.Http2FrameWriter;
|
|
||||||
import io.netty.handler.codec.http2.Http2InboundFrameLogger;
|
|
||||||
import io.netty.handler.codec.http2.Http2OutboundFrameLogger;
|
|
||||||
import io.netty.handler.codec.http2.HttpToHttp2ConnectionHandler;
|
|
||||||
import io.netty.handler.codec.http2.HttpUtil;
|
|
||||||
import io.netty.handler.codec.http2.InboundHttp2ToHttpAdapter;
|
|
||||||
import io.netty.handler.logging.LogLevel;
|
|
||||||
import io.netty.handler.timeout.TimeoutException;
|
|
||||||
import io.netty.util.concurrent.Promise;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.net.URISyntaxException;
|
|
||||||
import java.nio.charset.StandardCharsets;
|
|
||||||
import java.util.concurrent.ExecutionException;
|
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
|
||||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
|
||||||
import org.apache.hadoop.hdfs.web.WebHdfsTestUtil;
|
|
||||||
import org.junit.AfterClass;
|
|
||||||
import org.junit.BeforeClass;
|
|
||||||
import org.junit.Test;
|
|
||||||
|
|
||||||
public class TestDtpHttp2 {
|
|
||||||
|
|
||||||
private static final Http2FrameLogger FRAME_LOGGER = new Http2FrameLogger(
|
|
||||||
LogLevel.INFO, TestDtpHttp2.class);
|
|
||||||
|
|
||||||
private static final Configuration CONF = WebHdfsTestUtil.createConf();
|
|
||||||
|
|
||||||
private static MiniDFSCluster CLUSTER;
|
|
||||||
|
|
||||||
private static final EventLoopGroup WORKER_GROUP = new NioEventLoopGroup();
|
|
||||||
|
|
||||||
private static Channel CHANNEL;
|
|
||||||
|
|
||||||
private static Http2ResponseHandler RESPONSE_HANDLER;
|
|
||||||
|
|
||||||
@BeforeClass
|
|
||||||
public static void setUp() throws IOException, URISyntaxException,
|
|
||||||
TimeoutException {
|
|
||||||
CLUSTER = new MiniDFSCluster.Builder(CONF).numDataNodes(1).build();
|
|
||||||
CLUSTER.waitActive();
|
|
||||||
|
|
||||||
RESPONSE_HANDLER = new Http2ResponseHandler();
|
|
||||||
Bootstrap bootstrap =
|
|
||||||
new Bootstrap()
|
|
||||||
.group(WORKER_GROUP)
|
|
||||||
.channel(NioSocketChannel.class)
|
|
||||||
.remoteAddress("127.0.0.1",
|
|
||||||
CLUSTER.getDataNodes().get(0).getInfoPort())
|
|
||||||
.handler(new ChannelInitializer<Channel>() {
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected void initChannel(Channel ch) throws Exception {
|
|
||||||
Http2Connection connection = new DefaultHttp2Connection(false);
|
|
||||||
Http2ConnectionHandler connectionHandler =
|
|
||||||
new HttpToHttp2ConnectionHandler(connection, frameReader(),
|
|
||||||
frameWriter(), new DelegatingDecompressorFrameListener(
|
|
||||||
connection, new InboundHttp2ToHttpAdapter.Builder(
|
|
||||||
connection).maxContentLength(Integer.MAX_VALUE)
|
|
||||||
.propagateSettings(true).build()));
|
|
||||||
ch.pipeline().addLast(connectionHandler, RESPONSE_HANDLER);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
CHANNEL = bootstrap.connect().syncUninterruptibly().channel();
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
@AfterClass
|
|
||||||
public static void tearDown() throws IOException {
|
|
||||||
if (CHANNEL != null) {
|
|
||||||
CHANNEL.close().syncUninterruptibly();
|
|
||||||
}
|
|
||||||
WORKER_GROUP.shutdownGracefully();
|
|
||||||
if (CLUSTER != null) {
|
|
||||||
CLUSTER.shutdown();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private static Http2FrameReader frameReader() {
|
|
||||||
return new Http2InboundFrameLogger(new DefaultHttp2FrameReader(),
|
|
||||||
FRAME_LOGGER);
|
|
||||||
}
|
|
||||||
|
|
||||||
private static Http2FrameWriter frameWriter() {
|
|
||||||
return new Http2OutboundFrameLogger(new DefaultHttp2FrameWriter(),
|
|
||||||
FRAME_LOGGER);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void test() throws InterruptedException, ExecutionException {
|
|
||||||
int streamId = 3;
|
|
||||||
FullHttpRequest request =
|
|
||||||
new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/");
|
|
||||||
request.headers().add(HttpUtil.ExtensionHeaderNames.STREAM_ID.text(),
|
|
||||||
streamId);
|
|
||||||
Promise<FullHttpResponse> promise = CHANNEL.eventLoop().newPromise();
|
|
||||||
synchronized (RESPONSE_HANDLER) {
|
|
||||||
CHANNEL.writeAndFlush(request);
|
|
||||||
RESPONSE_HANDLER.put(streamId, promise);
|
|
||||||
}
|
|
||||||
assertEquals(HttpResponseStatus.OK, promise.get().status());
|
|
||||||
ByteBuf content = promise.get().content();
|
|
||||||
assertEquals("HTTP/2 DTP", content.toString(StandardCharsets.UTF_8));
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -615,12 +615,6 @@
|
||||||
<version>4.1.0.Beta5</version>
|
<version>4.1.0.Beta5</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
|
||||||
<dependency>
|
|
||||||
<groupId>com.twitter</groupId>
|
|
||||||
<artifactId>hpack</artifactId>
|
|
||||||
<version>0.11.0</version>
|
|
||||||
</dependency>
|
|
||||||
|
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>commons-io</groupId>
|
<groupId>commons-io</groupId>
|
||||||
<artifactId>commons-io</artifactId>
|
<artifactId>commons-io</artifactId>
|
||||||
|
|
Loading…
Reference in New Issue