diff --git a/netty/README.md b/netty/README.md new file mode 100644 index 0000000000..b006c1c686 --- /dev/null +++ b/netty/README.md @@ -0,0 +1,6 @@ +## Netty + +This module contains articles about Netty. + +### Relevant Articles: + diff --git a/netty/pom.xml b/netty/pom.xml new file mode 100644 index 0000000000..1a33eef92e --- /dev/null +++ b/netty/pom.xml @@ -0,0 +1,34 @@ + + + 4.0.0 + netty + 0.0.1-SNAPSHOT + netty + + + com.baeldung + parent-modules + 1.0.0-SNAPSHOT + + + + + io.netty + netty-all + ${netty.version} + + + + org.conscrypt + conscrypt-openjdk-uber + 2.4.0 + + + + + + 4.1.48.Final + + + \ No newline at end of file diff --git a/netty/src/main/java/com/baeldung/netty/http2/Http2Util.java b/netty/src/main/java/com/baeldung/netty/http2/Http2Util.java new file mode 100644 index 0000000000..62b6d4c4ed --- /dev/null +++ b/netty/src/main/java/com/baeldung/netty/http2/Http2Util.java @@ -0,0 +1,135 @@ +package com.baeldung.netty.http2; + +import static io.netty.handler.logging.LogLevel.INFO; + +import java.security.cert.CertificateException; + +import javax.net.ssl.SSLException; + +import com.baeldung.netty.http2.client.Http2ClientResponseHandler; +import com.baeldung.netty.http2.client.Http2SettingsHandler; +import com.baeldung.netty.http2.server.Http2ServerResponseHandler; + +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPipeline; +import io.netty.handler.codec.http.DefaultFullHttpRequest; +import io.netty.handler.codec.http.FullHttpRequest; +import io.netty.handler.codec.http.HttpHeaderNames; +import io.netty.handler.codec.http.HttpHeaderValues; +import io.netty.handler.codec.http.HttpMethod; +import io.netty.handler.codec.http.HttpScheme; +import io.netty.handler.codec.http.HttpVersion; +import io.netty.handler.codec.http2.DefaultHttp2Connection; +import io.netty.handler.codec.http2.DelegatingDecompressorFrameListener; +import io.netty.handler.codec.http2.Http2Connection; +import io.netty.handler.codec.http2.Http2FrameCodecBuilder; +import io.netty.handler.codec.http2.Http2FrameLogger; +import io.netty.handler.codec.http2.Http2SecurityUtil; +import io.netty.handler.codec.http2.HttpConversionUtil; +import io.netty.handler.codec.http2.HttpToHttp2ConnectionHandler; +import io.netty.handler.codec.http2.HttpToHttp2ConnectionHandlerBuilder; +import io.netty.handler.codec.http2.InboundHttp2ToHttpAdapterBuilder; +import io.netty.handler.ssl.ApplicationProtocolConfig; +import io.netty.handler.ssl.ApplicationProtocolConfig.Protocol; +import io.netty.handler.ssl.ApplicationProtocolConfig.SelectedListenerFailureBehavior; +import io.netty.handler.ssl.ApplicationProtocolConfig.SelectorFailureBehavior; +import io.netty.handler.ssl.ApplicationProtocolNames; +import io.netty.handler.ssl.ApplicationProtocolNegotiationHandler; +import io.netty.handler.ssl.SslContext; +import io.netty.handler.ssl.SslContextBuilder; +import io.netty.handler.ssl.SslProvider; +import io.netty.handler.ssl.SupportedCipherSuiteFilter; +import io.netty.handler.ssl.util.InsecureTrustManagerFactory; +import io.netty.handler.ssl.util.SelfSignedCertificate; + +public class Http2Util { + public static SslContext createSSLContext(boolean isServer) throws SSLException, CertificateException { + + SslContext sslCtx; + + SelfSignedCertificate ssc = new SelfSignedCertificate(); + + if (isServer) { + sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()) + .sslProvider(SslProvider.JDK) + .ciphers(Http2SecurityUtil.CIPHERS, SupportedCipherSuiteFilter.INSTANCE) + .applicationProtocolConfig(new ApplicationProtocolConfig(Protocol.ALPN, + SelectorFailureBehavior.NO_ADVERTISE, + SelectedListenerFailureBehavior.ACCEPT, ApplicationProtocolNames.HTTP_2, ApplicationProtocolNames.HTTP_1_1)) + .build(); + } else { + sslCtx = SslContextBuilder.forClient() + .sslProvider(SslProvider.JDK) + .ciphers(Http2SecurityUtil.CIPHERS, SupportedCipherSuiteFilter.INSTANCE) + .trustManager(InsecureTrustManagerFactory.INSTANCE) + .applicationProtocolConfig(new ApplicationProtocolConfig(Protocol.ALPN, + SelectorFailureBehavior.NO_ADVERTISE, + SelectedListenerFailureBehavior.ACCEPT, ApplicationProtocolNames.HTTP_2)) + .build(); + } + return sslCtx; + + } + + public static ApplicationProtocolNegotiationHandler getServerAPNHandler() { + ApplicationProtocolNegotiationHandler serverAPNHandler = new ApplicationProtocolNegotiationHandler(ApplicationProtocolNames.HTTP_2) { + + @Override + protected void configurePipeline(ChannelHandlerContext ctx, String protocol) throws Exception { + if (ApplicationProtocolNames.HTTP_2.equals(protocol)) { + ctx.pipeline() + .addLast(Http2FrameCodecBuilder.forServer() + .build(), new Http2ServerResponseHandler()); + return; + } + throw new IllegalStateException("Protocol: " + protocol + " not supported"); + } + }; + return serverAPNHandler; + + } + + public static ApplicationProtocolNegotiationHandler getClientAPNHandler(int maxContentLength, Http2SettingsHandler settingsHandler, Http2ClientResponseHandler responseHandler) { + final Http2FrameLogger logger = new Http2FrameLogger(INFO, Http2Util.class); + final Http2Connection connection = new DefaultHttp2Connection(false); + + HttpToHttp2ConnectionHandler connectionHandler = new HttpToHttp2ConnectionHandlerBuilder() + .frameListener(new DelegatingDecompressorFrameListener(connection, new InboundHttp2ToHttpAdapterBuilder(connection).maxContentLength(maxContentLength) + .propagateSettings(true) + .build())) + .frameLogger(logger) + .connection(connection) + .build(); + + ApplicationProtocolNegotiationHandler clientAPNHandler = new ApplicationProtocolNegotiationHandler(ApplicationProtocolNames.HTTP_2) { + @Override + protected void configurePipeline(ChannelHandlerContext ctx, String protocol) { + if (ApplicationProtocolNames.HTTP_2.equals(protocol)) { + ChannelPipeline p = ctx.pipeline(); + p.addLast(connectionHandler); + p.addLast(settingsHandler, responseHandler); + return; + } + ctx.close(); + throw new IllegalStateException("Protocol: " + protocol + " not supported"); + } + }; + + return clientAPNHandler; + + } + + public static FullHttpRequest createGetRequest(String host, int port) { + FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.valueOf("HTTP/2.0"), HttpMethod.GET, "/", Unpooled.EMPTY_BUFFER); + request.headers() + .add(HttpHeaderNames.HOST, new String(host + ":" + port)); + request.headers() + .add(HttpConversionUtil.ExtensionHeaderNames.SCHEME.text(), HttpScheme.HTTPS); + request.headers() + .add(HttpHeaderNames.ACCEPT_ENCODING, HttpHeaderValues.GZIP); + request.headers() + .add(HttpHeaderNames.ACCEPT_ENCODING, HttpHeaderValues.DEFLATE); + return request; + } +} diff --git a/netty/src/main/java/com/baeldung/netty/http2/client/Http2ClientInitializer.java b/netty/src/main/java/com/baeldung/netty/http2/client/Http2ClientInitializer.java new file mode 100644 index 0000000000..d50240fcb2 --- /dev/null +++ b/netty/src/main/java/com/baeldung/netty/http2/client/Http2ClientInitializer.java @@ -0,0 +1,46 @@ +package com.baeldung.netty.http2.client; + +import com.baeldung.netty.http2.Http2Util; + +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.socket.SocketChannel; +import io.netty.handler.ssl.SslContext; + +public class Http2ClientInitializer extends ChannelInitializer { + + private final SslContext sslCtx; + private final int maxContentLength; + private Http2SettingsHandler settingsHandler; + private Http2ClientResponseHandler responseHandler; + private String host; + private int port; + + public Http2ClientInitializer(SslContext sslCtx, int maxContentLength, String host, int port) { + this.sslCtx = sslCtx; + this.maxContentLength = maxContentLength; + this.host = host; + this.port = port; + } + + @Override + public void initChannel(SocketChannel ch) throws Exception { + + settingsHandler = new Http2SettingsHandler(ch.newPromise()); + responseHandler = new Http2ClientResponseHandler(); + + if (sslCtx != null) { + ChannelPipeline pipeline = ch.pipeline(); + pipeline.addLast(sslCtx.newHandler(ch.alloc(), host, port)); + pipeline.addLast(Http2Util.getClientAPNHandler(maxContentLength, settingsHandler, responseHandler)); + } + } + + public Http2SettingsHandler getSettingsHandler() { + return settingsHandler; + } + + public Http2ClientResponseHandler getResponseHandler() { + return responseHandler; + } +} diff --git a/netty/src/main/java/com/baeldung/netty/http2/client/Http2ClientResponseHandler.java b/netty/src/main/java/com/baeldung/netty/http2/client/Http2ClientResponseHandler.java new file mode 100644 index 0000000000..4e17155bbc --- /dev/null +++ b/netty/src/main/java/com/baeldung/netty/http2/client/Http2ClientResponseHandler.java @@ -0,0 +1,128 @@ +package com.baeldung.netty.http2.client; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.TimeUnit; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPromise; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.handler.codec.http.FullHttpResponse; +import io.netty.handler.codec.http2.HttpConversionUtil; +import io.netty.util.CharsetUtil; + +public class Http2ClientResponseHandler extends SimpleChannelInboundHandler { + + private final Logger logger = LoggerFactory.getLogger(Http2ClientResponseHandler.class); + private final Map streamidMap; + + public Http2ClientResponseHandler() { + streamidMap = new HashMap(); + } + + public MapValues put(int streamId, ChannelFuture writeFuture, ChannelPromise promise) { + return streamidMap.put(streamId, new MapValues(writeFuture, promise)); + } + + public String awaitResponses(long timeout, TimeUnit unit) { + + Iterator> itr = streamidMap.entrySet() + .iterator(); + + String response = null; + + while (itr.hasNext()) { + Entry entry = itr.next(); + ChannelFuture writeFuture = entry.getValue() + .getWriteFuture(); + + if (!writeFuture.awaitUninterruptibly(timeout, unit)) { + throw new IllegalStateException("Timed out waiting to write for stream id " + entry.getKey()); + } + if (!writeFuture.isSuccess()) { + throw new RuntimeException(writeFuture.cause()); + } + ChannelPromise promise = entry.getValue() + .getPromise(); + + if (!promise.awaitUninterruptibly(timeout, unit)) { + throw new IllegalStateException("Timed out waiting for response on stream id " + entry.getKey()); + } + if (!promise.isSuccess()) { + throw new RuntimeException(promise.cause()); + } + logger.info("---Stream id: " + entry.getKey() + " received---"); + response = entry.getValue().getResponse(); + + itr.remove(); + } + + return response; + + } + + @Override + protected void channelRead0(ChannelHandlerContext ctx, FullHttpResponse msg) throws Exception { + Integer streamId = msg.headers() + .getInt(HttpConversionUtil.ExtensionHeaderNames.STREAM_ID.text()); + if (streamId == null) { + logger.error("HttpResponseHandler unexpected message received: " + msg); + return; + } + + MapValues value = streamidMap.get(streamId); + + if (value == null) { + logger.error("Message received for unknown stream id " + streamId); + ctx.close(); + } else { + ByteBuf content = msg.content(); + if (content.isReadable()) { + int contentLength = content.readableBytes(); + byte[] arr = new byte[contentLength]; + content.readBytes(arr); + String response = new String(arr, 0, contentLength, CharsetUtil.UTF_8); + logger.info("Response from Server: "+ (response)); + value.setResponse(response); + } + + value.getPromise() + .setSuccess(); + } + } + + public static class MapValues { + ChannelFuture writeFuture; + ChannelPromise promise; + String response; + + public String getResponse() { + return response; + } + + public void setResponse(String response) { + this.response = response; + } + + public MapValues(ChannelFuture writeFuture2, ChannelPromise promise2) { + this.writeFuture = writeFuture2; + this.promise = promise2; + } + + public ChannelFuture getWriteFuture() { + return writeFuture; + } + + public ChannelPromise getPromise() { + return promise; + } + + } +} diff --git a/netty/src/main/java/com/baeldung/netty/http2/client/Http2SettingsHandler.java b/netty/src/main/java/com/baeldung/netty/http2/client/Http2SettingsHandler.java new file mode 100644 index 0000000000..93841187c7 --- /dev/null +++ b/netty/src/main/java/com/baeldung/netty/http2/client/Http2SettingsHandler.java @@ -0,0 +1,30 @@ +package com.baeldung.netty.http2.client; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPromise; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.handler.codec.http2.Http2Settings; + +import java.util.concurrent.TimeUnit; + +public class Http2SettingsHandler extends SimpleChannelInboundHandler { + private final ChannelPromise promise; + + public Http2SettingsHandler(ChannelPromise promise) { + this.promise = promise; + } + + public void awaitSettings(long timeout, TimeUnit unit) throws Exception { + if (!promise.awaitUninterruptibly(timeout, unit)) { + throw new IllegalStateException("Timed out waiting for settings"); + } + } + + @Override + protected void channelRead0(ChannelHandlerContext ctx, Http2Settings msg) throws Exception { + promise.setSuccess(); + + ctx.pipeline() + .remove(this); + } +} diff --git a/netty/src/main/java/com/baeldung/netty/http2/server/Http2Server.java b/netty/src/main/java/com/baeldung/netty/http2/server/Http2Server.java new file mode 100644 index 0000000000..a8e9e59953 --- /dev/null +++ b/netty/src/main/java/com/baeldung/netty/http2/server/Http2Server.java @@ -0,0 +1,59 @@ +package com.baeldung.netty.http2.server; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.baeldung.netty.http2.Http2Util; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.Channel; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.logging.LogLevel; +import io.netty.handler.logging.LoggingHandler; +import io.netty.handler.ssl.SslContext; + +public final class Http2Server { + + private static final int PORT = 8443; + private static final Logger logger = LoggerFactory.getLogger(Http2Server.class); + + public static void main(String[] args) throws Exception { + SslContext sslCtx = Http2Util.createSSLContext(true); + + EventLoopGroup group = new NioEventLoopGroup(); + try { + ServerBootstrap b = new ServerBootstrap(); + b.option(ChannelOption.SO_BACKLOG, 1024); + b.group(group) + .channel(NioServerSocketChannel.class) + .handler(new LoggingHandler(LogLevel.INFO)) + .childHandler(new ChannelInitializer() { + @Override + protected void initChannel(SocketChannel ch) throws Exception { + if (sslCtx != null) { + ch.pipeline() + .addLast(sslCtx.newHandler(ch.alloc()), Http2Util.getServerAPNHandler()); + } + } + + }); + + Channel ch = b.bind(PORT) + .sync() + .channel(); + + logger.info("HTTP/2 Server is listening on https://127.0.0.1:" + PORT + '/'); + + ch.closeFuture() + .sync(); + } finally { + group.shutdownGracefully(); + } + } + +} diff --git a/netty/src/main/java/com/baeldung/netty/http2/server/Http2ServerResponseHandler.java b/netty/src/main/java/com/baeldung/netty/http2/server/Http2ServerResponseHandler.java new file mode 100644 index 0000000000..24c66f15bb --- /dev/null +++ b/netty/src/main/java/com/baeldung/netty/http2/server/Http2ServerResponseHandler.java @@ -0,0 +1,52 @@ +package com.baeldung.netty.http2.server; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelDuplexHandler; +import io.netty.channel.ChannelHandler.Sharable; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.netty.handler.codec.http2.DefaultHttp2DataFrame; +import io.netty.handler.codec.http2.DefaultHttp2Headers; +import io.netty.handler.codec.http2.DefaultHttp2HeadersFrame; +import io.netty.handler.codec.http2.Http2Headers; +import io.netty.handler.codec.http2.Http2HeadersFrame; +import io.netty.util.CharsetUtil; + +@Sharable +public class Http2ServerResponseHandler extends ChannelDuplexHandler { + + static final ByteBuf RESPONSE_BYTES = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("Hello World", CharsetUtil.UTF_8)); + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + super.exceptionCaught(ctx, cause); + cause.printStackTrace(); + ctx.close(); + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + if (msg instanceof Http2HeadersFrame) { + Http2HeadersFrame msgHeader = (Http2HeadersFrame) msg; + if (msgHeader.isEndStream()) { + ByteBuf content = ctx.alloc() + .buffer(); + content.writeBytes(RESPONSE_BYTES.duplicate()); + + Http2Headers headers = new DefaultHttp2Headers().status(HttpResponseStatus.OK.codeAsText()); + ctx.write(new DefaultHttp2HeadersFrame(headers).stream(msgHeader.stream())); + ctx.write(new DefaultHttp2DataFrame(content, true).stream(msgHeader.stream())); + } + + } else { + super.channelRead(ctx, msg); + } + } + + @Override + public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { + ctx.flush(); + } + +} diff --git a/netty/src/main/resources/logback.xml b/netty/src/main/resources/logback.xml new file mode 100644 index 0000000000..7d900d8ea8 --- /dev/null +++ b/netty/src/main/resources/logback.xml @@ -0,0 +1,13 @@ + + + + + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n + + + + + + + + \ No newline at end of file diff --git a/netty/src/test/java/com/baeldung/netty/Http2ClientLiveTest.java b/netty/src/test/java/com/baeldung/netty/Http2ClientLiveTest.java new file mode 100644 index 0000000000..6b9a53a1b3 --- /dev/null +++ b/netty/src/test/java/com/baeldung/netty/Http2ClientLiveTest.java @@ -0,0 +1,91 @@ +package com.baeldung.netty; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.util.concurrent.TimeUnit; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.baeldung.netty.http2.Http2Util; +import com.baeldung.netty.http2.client.Http2ClientInitializer; +import com.baeldung.netty.http2.client.Http2ClientResponseHandler; +import com.baeldung.netty.http2.client.Http2SettingsHandler; + +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.Channel; +import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.codec.http.FullHttpRequest; +import io.netty.handler.ssl.SslContext; + +//Ensure the server class - Http2Server.java is already started before running this test +public class Http2ClientLiveTest { + + private static final Logger logger = LoggerFactory.getLogger(Http2ClientLiveTest.class); + + private static final String HOST = "127.0.0.1"; + private static final int PORT = 8443; + private SslContext sslCtx; + private Channel channel; + + @Before + public void setup() throws Exception { + sslCtx = Http2Util.createSSLContext(false); + } + + @Test + public void whenRequestSent_thenHelloWorldReceived() throws Exception { + + EventLoopGroup workerGroup = new NioEventLoopGroup(); + Http2ClientInitializer initializer = new Http2ClientInitializer(sslCtx, Integer.MAX_VALUE, HOST, PORT); + + try { + Bootstrap b = new Bootstrap(); + b.group(workerGroup); + b.channel(NioSocketChannel.class); + b.option(ChannelOption.SO_KEEPALIVE, true); + b.remoteAddress(HOST, PORT); + b.handler(initializer); + + channel = b.connect() + .syncUninterruptibly() + .channel(); + + logger.info("Connected to [" + HOST + ':' + PORT + ']'); + + Http2SettingsHandler http2SettingsHandler = initializer.getSettingsHandler(); + http2SettingsHandler.awaitSettings(60, TimeUnit.SECONDS); + + logger.info("Sending request(s)..."); + + FullHttpRequest request = Http2Util.createGetRequest(HOST, PORT); + + Http2ClientResponseHandler responseHandler = initializer.getResponseHandler(); + int streamId = 3; + + responseHandler.put(streamId, channel.write(request), channel.newPromise()); + channel.flush(); + String response = responseHandler.awaitResponses(60, TimeUnit.SECONDS); + + assertEquals("Hello World", response); + + logger.info("Finished HTTP/2 request(s)"); + + } finally { + workerGroup.shutdownGracefully(); + } + + } + + @After + public void cleanup() { + channel.close() + .syncUninterruptibly(); + } +} diff --git a/pom.xml b/pom.xml index e21c13efc2..5602e807b9 100644 --- a/pom.xml +++ b/pom.xml @@ -536,6 +536,7 @@ mybatis netflix-modules + ninja open-liberty @@ -1047,6 +1048,7 @@ mybatis netflix-modules + ninja open-liberty