BAEL-3895: HTTP/2 in Netty (#9036)

* BAEL-3895 : HTTP/2 in Netty

* BAEL-3895: Added Live Test in place of java client

* BAEL-3895: Commented out netty module from main pom as it needs Java 13
This commit is contained in:
Sampada 2020-04-08 02:26:53 +05:30 committed by GitHub
parent 5231e082d8
commit 8e107d1180
11 changed files with 596 additions and 0 deletions

6
netty/README.md Normal file
View File

@ -0,0 +1,6 @@
## Netty
This module contains articles about Netty.
### Relevant Articles:

34
netty/pom.xml Normal file
View File

@ -0,0 +1,34 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>netty</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>netty</name>
<parent>
<groupId>com.baeldung</groupId>
<artifactId>parent-modules</artifactId>
<version>1.0.0-SNAPSHOT</version>
</parent>
<dependencies>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>${netty.version}</version>
</dependency>
<dependency>
<groupId>org.conscrypt</groupId>
<artifactId>conscrypt-openjdk-uber</artifactId>
<version>2.4.0</version>
</dependency>
</dependencies>
<properties>
<netty.version>4.1.48.Final</netty.version>
</properties>
</project>

View File

@ -0,0 +1,135 @@
package com.baeldung.netty.http2;
import static io.netty.handler.logging.LogLevel.INFO;
import java.security.cert.CertificateException;
import javax.net.ssl.SSLException;
import com.baeldung.netty.http2.client.Http2ClientResponseHandler;
import com.baeldung.netty.http2.client.Http2SettingsHandler;
import com.baeldung.netty.http2.server.Http2ServerResponseHandler;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaderValues;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpScheme;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http2.DefaultHttp2Connection;
import io.netty.handler.codec.http2.DelegatingDecompressorFrameListener;
import io.netty.handler.codec.http2.Http2Connection;
import io.netty.handler.codec.http2.Http2FrameCodecBuilder;
import io.netty.handler.codec.http2.Http2FrameLogger;
import io.netty.handler.codec.http2.Http2SecurityUtil;
import io.netty.handler.codec.http2.HttpConversionUtil;
import io.netty.handler.codec.http2.HttpToHttp2ConnectionHandler;
import io.netty.handler.codec.http2.HttpToHttp2ConnectionHandlerBuilder;
import io.netty.handler.codec.http2.InboundHttp2ToHttpAdapterBuilder;
import io.netty.handler.ssl.ApplicationProtocolConfig;
import io.netty.handler.ssl.ApplicationProtocolConfig.Protocol;
import io.netty.handler.ssl.ApplicationProtocolConfig.SelectedListenerFailureBehavior;
import io.netty.handler.ssl.ApplicationProtocolConfig.SelectorFailureBehavior;
import io.netty.handler.ssl.ApplicationProtocolNames;
import io.netty.handler.ssl.ApplicationProtocolNegotiationHandler;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.SslProvider;
import io.netty.handler.ssl.SupportedCipherSuiteFilter;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import io.netty.handler.ssl.util.SelfSignedCertificate;
public class Http2Util {
public static SslContext createSSLContext(boolean isServer) throws SSLException, CertificateException {
SslContext sslCtx;
SelfSignedCertificate ssc = new SelfSignedCertificate();
if (isServer) {
sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey())
.sslProvider(SslProvider.JDK)
.ciphers(Http2SecurityUtil.CIPHERS, SupportedCipherSuiteFilter.INSTANCE)
.applicationProtocolConfig(new ApplicationProtocolConfig(Protocol.ALPN,
SelectorFailureBehavior.NO_ADVERTISE,
SelectedListenerFailureBehavior.ACCEPT, ApplicationProtocolNames.HTTP_2, ApplicationProtocolNames.HTTP_1_1))
.build();
} else {
sslCtx = SslContextBuilder.forClient()
.sslProvider(SslProvider.JDK)
.ciphers(Http2SecurityUtil.CIPHERS, SupportedCipherSuiteFilter.INSTANCE)
.trustManager(InsecureTrustManagerFactory.INSTANCE)
.applicationProtocolConfig(new ApplicationProtocolConfig(Protocol.ALPN,
SelectorFailureBehavior.NO_ADVERTISE,
SelectedListenerFailureBehavior.ACCEPT, ApplicationProtocolNames.HTTP_2))
.build();
}
return sslCtx;
}
public static ApplicationProtocolNegotiationHandler getServerAPNHandler() {
ApplicationProtocolNegotiationHandler serverAPNHandler = new ApplicationProtocolNegotiationHandler(ApplicationProtocolNames.HTTP_2) {
@Override
protected void configurePipeline(ChannelHandlerContext ctx, String protocol) throws Exception {
if (ApplicationProtocolNames.HTTP_2.equals(protocol)) {
ctx.pipeline()
.addLast(Http2FrameCodecBuilder.forServer()
.build(), new Http2ServerResponseHandler());
return;
}
throw new IllegalStateException("Protocol: " + protocol + " not supported");
}
};
return serverAPNHandler;
}
public static ApplicationProtocolNegotiationHandler getClientAPNHandler(int maxContentLength, Http2SettingsHandler settingsHandler, Http2ClientResponseHandler responseHandler) {
final Http2FrameLogger logger = new Http2FrameLogger(INFO, Http2Util.class);
final Http2Connection connection = new DefaultHttp2Connection(false);
HttpToHttp2ConnectionHandler connectionHandler = new HttpToHttp2ConnectionHandlerBuilder()
.frameListener(new DelegatingDecompressorFrameListener(connection, new InboundHttp2ToHttpAdapterBuilder(connection).maxContentLength(maxContentLength)
.propagateSettings(true)
.build()))
.frameLogger(logger)
.connection(connection)
.build();
ApplicationProtocolNegotiationHandler clientAPNHandler = new ApplicationProtocolNegotiationHandler(ApplicationProtocolNames.HTTP_2) {
@Override
protected void configurePipeline(ChannelHandlerContext ctx, String protocol) {
if (ApplicationProtocolNames.HTTP_2.equals(protocol)) {
ChannelPipeline p = ctx.pipeline();
p.addLast(connectionHandler);
p.addLast(settingsHandler, responseHandler);
return;
}
ctx.close();
throw new IllegalStateException("Protocol: " + protocol + " not supported");
}
};
return clientAPNHandler;
}
public static FullHttpRequest createGetRequest(String host, int port) {
FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.valueOf("HTTP/2.0"), HttpMethod.GET, "/", Unpooled.EMPTY_BUFFER);
request.headers()
.add(HttpHeaderNames.HOST, new String(host + ":" + port));
request.headers()
.add(HttpConversionUtil.ExtensionHeaderNames.SCHEME.text(), HttpScheme.HTTPS);
request.headers()
.add(HttpHeaderNames.ACCEPT_ENCODING, HttpHeaderValues.GZIP);
request.headers()
.add(HttpHeaderNames.ACCEPT_ENCODING, HttpHeaderValues.DEFLATE);
return request;
}
}

View File

@ -0,0 +1,46 @@
package com.baeldung.netty.http2.client;
import com.baeldung.netty.http2.Http2Util;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.ssl.SslContext;
public class Http2ClientInitializer extends ChannelInitializer<SocketChannel> {
private final SslContext sslCtx;
private final int maxContentLength;
private Http2SettingsHandler settingsHandler;
private Http2ClientResponseHandler responseHandler;
private String host;
private int port;
public Http2ClientInitializer(SslContext sslCtx, int maxContentLength, String host, int port) {
this.sslCtx = sslCtx;
this.maxContentLength = maxContentLength;
this.host = host;
this.port = port;
}
@Override
public void initChannel(SocketChannel ch) throws Exception {
settingsHandler = new Http2SettingsHandler(ch.newPromise());
responseHandler = new Http2ClientResponseHandler();
if (sslCtx != null) {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(sslCtx.newHandler(ch.alloc(), host, port));
pipeline.addLast(Http2Util.getClientAPNHandler(maxContentLength, settingsHandler, responseHandler));
}
}
public Http2SettingsHandler getSettingsHandler() {
return settingsHandler;
}
public Http2ClientResponseHandler getResponseHandler() {
return responseHandler;
}
}

View File

@ -0,0 +1,128 @@
package com.baeldung.netty.http2.client;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http2.HttpConversionUtil;
import io.netty.util.CharsetUtil;
public class Http2ClientResponseHandler extends SimpleChannelInboundHandler<FullHttpResponse> {
private final Logger logger = LoggerFactory.getLogger(Http2ClientResponseHandler.class);
private final Map<Integer, MapValues> streamidMap;
public Http2ClientResponseHandler() {
streamidMap = new HashMap<Integer, MapValues>();
}
public MapValues put(int streamId, ChannelFuture writeFuture, ChannelPromise promise) {
return streamidMap.put(streamId, new MapValues(writeFuture, promise));
}
public String awaitResponses(long timeout, TimeUnit unit) {
Iterator<Entry<Integer, MapValues>> itr = streamidMap.entrySet()
.iterator();
String response = null;
while (itr.hasNext()) {
Entry<Integer, MapValues> entry = itr.next();
ChannelFuture writeFuture = entry.getValue()
.getWriteFuture();
if (!writeFuture.awaitUninterruptibly(timeout, unit)) {
throw new IllegalStateException("Timed out waiting to write for stream id " + entry.getKey());
}
if (!writeFuture.isSuccess()) {
throw new RuntimeException(writeFuture.cause());
}
ChannelPromise promise = entry.getValue()
.getPromise();
if (!promise.awaitUninterruptibly(timeout, unit)) {
throw new IllegalStateException("Timed out waiting for response on stream id " + entry.getKey());
}
if (!promise.isSuccess()) {
throw new RuntimeException(promise.cause());
}
logger.info("---Stream id: " + entry.getKey() + " received---");
response = entry.getValue().getResponse();
itr.remove();
}
return response;
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, FullHttpResponse msg) throws Exception {
Integer streamId = msg.headers()
.getInt(HttpConversionUtil.ExtensionHeaderNames.STREAM_ID.text());
if (streamId == null) {
logger.error("HttpResponseHandler unexpected message received: " + msg);
return;
}
MapValues value = streamidMap.get(streamId);
if (value == null) {
logger.error("Message received for unknown stream id " + streamId);
ctx.close();
} else {
ByteBuf content = msg.content();
if (content.isReadable()) {
int contentLength = content.readableBytes();
byte[] arr = new byte[contentLength];
content.readBytes(arr);
String response = new String(arr, 0, contentLength, CharsetUtil.UTF_8);
logger.info("Response from Server: "+ (response));
value.setResponse(response);
}
value.getPromise()
.setSuccess();
}
}
public static class MapValues {
ChannelFuture writeFuture;
ChannelPromise promise;
String response;
public String getResponse() {
return response;
}
public void setResponse(String response) {
this.response = response;
}
public MapValues(ChannelFuture writeFuture2, ChannelPromise promise2) {
this.writeFuture = writeFuture2;
this.promise = promise2;
}
public ChannelFuture getWriteFuture() {
return writeFuture;
}
public ChannelPromise getPromise() {
return promise;
}
}
}

View File

@ -0,0 +1,30 @@
package com.baeldung.netty.http2.client;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http2.Http2Settings;
import java.util.concurrent.TimeUnit;
public class Http2SettingsHandler extends SimpleChannelInboundHandler<Http2Settings> {
private final ChannelPromise promise;
public Http2SettingsHandler(ChannelPromise promise) {
this.promise = promise;
}
public void awaitSettings(long timeout, TimeUnit unit) throws Exception {
if (!promise.awaitUninterruptibly(timeout, unit)) {
throw new IllegalStateException("Timed out waiting for settings");
}
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, Http2Settings msg) throws Exception {
promise.setSuccess();
ctx.pipeline()
.remove(this);
}
}

View File

@ -0,0 +1,59 @@
package com.baeldung.netty.http2.server;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.baeldung.netty.http2.Http2Util;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.ssl.SslContext;
public final class Http2Server {
private static final int PORT = 8443;
private static final Logger logger = LoggerFactory.getLogger(Http2Server.class);
public static void main(String[] args) throws Exception {
SslContext sslCtx = Http2Util.createSSLContext(true);
EventLoopGroup group = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.option(ChannelOption.SO_BACKLOG, 1024);
b.group(group)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
if (sslCtx != null) {
ch.pipeline()
.addLast(sslCtx.newHandler(ch.alloc()), Http2Util.getServerAPNHandler());
}
}
});
Channel ch = b.bind(PORT)
.sync()
.channel();
logger.info("HTTP/2 Server is listening on https://127.0.0.1:" + PORT + '/');
ch.closeFuture()
.sync();
} finally {
group.shutdownGracefully();
}
}
}

View File

@ -0,0 +1,52 @@
package com.baeldung.netty.http2.server;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http2.DefaultHttp2DataFrame;
import io.netty.handler.codec.http2.DefaultHttp2Headers;
import io.netty.handler.codec.http2.DefaultHttp2HeadersFrame;
import io.netty.handler.codec.http2.Http2Headers;
import io.netty.handler.codec.http2.Http2HeadersFrame;
import io.netty.util.CharsetUtil;
@Sharable
public class Http2ServerResponseHandler extends ChannelDuplexHandler {
static final ByteBuf RESPONSE_BYTES = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("Hello World", CharsetUtil.UTF_8));
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
super.exceptionCaught(ctx, cause);
cause.printStackTrace();
ctx.close();
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof Http2HeadersFrame) {
Http2HeadersFrame msgHeader = (Http2HeadersFrame) msg;
if (msgHeader.isEndStream()) {
ByteBuf content = ctx.alloc()
.buffer();
content.writeBytes(RESPONSE_BYTES.duplicate());
Http2Headers headers = new DefaultHttp2Headers().status(HttpResponseStatus.OK.codeAsText());
ctx.write(new DefaultHttp2HeadersFrame(headers).stream(msgHeader.stream()));
ctx.write(new DefaultHttp2DataFrame(content, true).stream(msgHeader.stream()));
}
} else {
super.channelRead(ctx, msg);
}
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
}

View File

@ -0,0 +1,13 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n
</pattern>
</encoder>
</appender>
<root level="INFO">
<appender-ref ref="STDOUT" />
</root>
</configuration>

View File

@ -0,0 +1,91 @@
package com.baeldung.netty;
import static org.junit.jupiter.api.Assertions.assertEquals;
import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.baeldung.netty.http2.Http2Util;
import com.baeldung.netty.http2.client.Http2ClientInitializer;
import com.baeldung.netty.http2.client.Http2ClientResponseHandler;
import com.baeldung.netty.http2.client.Http2SettingsHandler;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.ssl.SslContext;
//Ensure the server class - Http2Server.java is already started before running this test
public class Http2ClientLiveTest {
private static final Logger logger = LoggerFactory.getLogger(Http2ClientLiveTest.class);
private static final String HOST = "127.0.0.1";
private static final int PORT = 8443;
private SslContext sslCtx;
private Channel channel;
@Before
public void setup() throws Exception {
sslCtx = Http2Util.createSSLContext(false);
}
@Test
public void whenRequestSent_thenHelloWorldReceived() throws Exception {
EventLoopGroup workerGroup = new NioEventLoopGroup();
Http2ClientInitializer initializer = new Http2ClientInitializer(sslCtx, Integer.MAX_VALUE, HOST, PORT);
try {
Bootstrap b = new Bootstrap();
b.group(workerGroup);
b.channel(NioSocketChannel.class);
b.option(ChannelOption.SO_KEEPALIVE, true);
b.remoteAddress(HOST, PORT);
b.handler(initializer);
channel = b.connect()
.syncUninterruptibly()
.channel();
logger.info("Connected to [" + HOST + ':' + PORT + ']');
Http2SettingsHandler http2SettingsHandler = initializer.getSettingsHandler();
http2SettingsHandler.awaitSettings(60, TimeUnit.SECONDS);
logger.info("Sending request(s)...");
FullHttpRequest request = Http2Util.createGetRequest(HOST, PORT);
Http2ClientResponseHandler responseHandler = initializer.getResponseHandler();
int streamId = 3;
responseHandler.put(streamId, channel.write(request), channel.newPromise());
channel.flush();
String response = responseHandler.awaitResponses(60, TimeUnit.SECONDS);
assertEquals("Hello World", response);
logger.info("Finished HTTP/2 request(s)");
} finally {
workerGroup.shutdownGracefully();
}
}
@After
public void cleanup() {
channel.close()
.syncUninterruptibly();
}
}

View File

@ -536,6 +536,7 @@
<module>mybatis</module>
<module>netflix-modules</module>
<!-- <module>netty</module> --> <!-- we haven't upgraded to Java 13 -->
<module>ninja</module>
<module>open-liberty</module>
@ -1047,6 +1048,7 @@
<module>mybatis</module>
<module>netflix-modules</module>
<!-- <module>netty</module> --> <!-- we haven't upgraded to Java 13 -->
<module>ninja</module>
<module>open-liberty</module>