From 4732baccab9237f07f95f2551069b416ba2fa406 Mon Sep 17 00:00:00 2001 From: Anton Date: Wed, 7 Jun 2017 08:52:23 +0300 Subject: [PATCH] BAEL-768 Introduction to Netty (#1933) * BAEL-748 quick guide to @Value * BAEL-748 changes from review * BAEL-748 inject comma-separated values into array * BAEL-768 Introduction to Netty * BAEL-768 remove commented code --- libraries/pom.xml | 6 +++ .../com/baeldung/netty/ClientHandler.java | 20 ++++++++ .../java/com/baeldung/netty/NettyClient.java | 37 ++++++++++++++ .../java/com/baeldung/netty/NettyServer.java | 49 +++++++++++++++++++ .../com/baeldung/netty/ProcessingHandler.java | 19 +++++++ .../java/com/baeldung/netty/RequestData.java | 30 ++++++++++++ .../baeldung/netty/RequestDataEncoder.java | 19 +++++++ .../com/baeldung/netty/RequestDecoder.java | 22 +++++++++ .../java/com/baeldung/netty/ResponseData.java | 20 ++++++++ .../baeldung/netty/ResponseDataDecoder.java | 17 +++++++ .../baeldung/netty/ResponseDataEncoder.java | 13 +++++ .../netty/SimpleProcessingHandler.java | 39 +++++++++++++++ 12 files changed, 291 insertions(+) create mode 100644 libraries/src/main/java/com/baeldung/netty/ClientHandler.java create mode 100644 libraries/src/main/java/com/baeldung/netty/NettyClient.java create mode 100644 libraries/src/main/java/com/baeldung/netty/NettyServer.java create mode 100644 libraries/src/main/java/com/baeldung/netty/ProcessingHandler.java create mode 100644 libraries/src/main/java/com/baeldung/netty/RequestData.java create mode 100644 libraries/src/main/java/com/baeldung/netty/RequestDataEncoder.java create mode 100644 libraries/src/main/java/com/baeldung/netty/RequestDecoder.java create mode 100644 libraries/src/main/java/com/baeldung/netty/ResponseData.java create mode 100644 libraries/src/main/java/com/baeldung/netty/ResponseDataDecoder.java create mode 100644 libraries/src/main/java/com/baeldung/netty/ResponseDataEncoder.java create mode 100644 libraries/src/main/java/com/baeldung/netty/SimpleProcessingHandler.java diff --git a/libraries/pom.xml b/libraries/pom.xml index a8a30b855e..f03ba93d27 100644 --- a/libraries/pom.xml +++ b/libraries/pom.xml @@ -308,6 +308,11 @@ jool 0.9.12 + + io.netty + netty-all + ${netty.version} + 0.7.0 @@ -332,6 +337,7 @@ 1.1.3-rc.5 1.4.0 1.1.0 + 4.1.10.Final diff --git a/libraries/src/main/java/com/baeldung/netty/ClientHandler.java b/libraries/src/main/java/com/baeldung/netty/ClientHandler.java new file mode 100644 index 0000000000..5981affd22 --- /dev/null +++ b/libraries/src/main/java/com/baeldung/netty/ClientHandler.java @@ -0,0 +1,20 @@ +package com.baeldung.netty; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; + +public class ClientHandler extends ChannelInboundHandlerAdapter { + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + RequestData msg = new RequestData(); + msg.setIntValue(123); + msg.setStringValue("all work and no play makes jack a dull boy"); + ctx.writeAndFlush(msg); + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + System.out.println(msg); + ctx.close(); + } +} diff --git a/libraries/src/main/java/com/baeldung/netty/NettyClient.java b/libraries/src/main/java/com/baeldung/netty/NettyClient.java new file mode 100644 index 0000000000..97dfc70b9f --- /dev/null +++ b/libraries/src/main/java/com/baeldung/netty/NettyClient.java @@ -0,0 +1,37 @@ +package com.baeldung.netty; + +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; + +public class NettyClient { + public static void main(String[] args) throws Exception { + String host = "localhost"; + int port = 8080; + EventLoopGroup workerGroup = new NioEventLoopGroup(); + + try { + Bootstrap b = new Bootstrap(); + b.group(workerGroup); + b.channel(NioSocketChannel.class); + b.option(ChannelOption.SO_KEEPALIVE, true); + b.handler(new ChannelInitializer() { + @Override + public void initChannel(SocketChannel ch) throws Exception { + ch.pipeline().addLast(new RequestDataEncoder(), new ResponseDataDecoder(), new ClientHandler()); + } + }); + + ChannelFuture f = b.connect(host, port).sync(); + + f.channel().closeFuture().sync(); + } finally { + workerGroup.shutdownGracefully(); + } + } +} diff --git a/libraries/src/main/java/com/baeldung/netty/NettyServer.java b/libraries/src/main/java/com/baeldung/netty/NettyServer.java new file mode 100644 index 0000000000..b9d35859d0 --- /dev/null +++ b/libraries/src/main/java/com/baeldung/netty/NettyServer.java @@ -0,0 +1,49 @@ +package com.baeldung.netty; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; + +public class NettyServer { + + private int port; + + public NettyServer(int port) { + this.port = port; + } + + public static void main(String[] args) throws Exception { + int port; + if (args.length > 0) { + port = Integer.parseInt(args[0]); + } else { + port = 8080; + } + new NettyServer(port).run(); + } + + public void run() throws Exception { + EventLoopGroup bossGroup = new NioEventLoopGroup(); + EventLoopGroup workerGroup = new NioEventLoopGroup(); + try { + ServerBootstrap b = new ServerBootstrap(); + b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer() { + @Override + public void initChannel(SocketChannel ch) throws Exception { + ch.pipeline().addLast(new RequestDecoder(), new ResponseDataEncoder(), new ProcessingHandler()); + } + }).option(ChannelOption.SO_BACKLOG, 128).childOption(ChannelOption.SO_KEEPALIVE, true); + + ChannelFuture f = b.bind(port).sync(); + f.channel().closeFuture().sync(); + } finally { + workerGroup.shutdownGracefully(); + bossGroup.shutdownGracefully(); + } + } +} diff --git a/libraries/src/main/java/com/baeldung/netty/ProcessingHandler.java b/libraries/src/main/java/com/baeldung/netty/ProcessingHandler.java new file mode 100644 index 0000000000..6de4d3fca8 --- /dev/null +++ b/libraries/src/main/java/com/baeldung/netty/ProcessingHandler.java @@ -0,0 +1,19 @@ +package com.baeldung.netty; + +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; + +public class ProcessingHandler extends ChannelInboundHandlerAdapter { + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + RequestData requestData = (RequestData) msg; + ResponseData responseData = new ResponseData(); + responseData.setIntValue(requestData.getIntValue() * 2); + ChannelFuture future = ctx.writeAndFlush(responseData); + future.addListener(ChannelFutureListener.CLOSE); + System.out.println(requestData); + } +} diff --git a/libraries/src/main/java/com/baeldung/netty/RequestData.java b/libraries/src/main/java/com/baeldung/netty/RequestData.java new file mode 100644 index 0000000000..e7404c1663 --- /dev/null +++ b/libraries/src/main/java/com/baeldung/netty/RequestData.java @@ -0,0 +1,30 @@ +package com.baeldung.netty; + +public class RequestData { + private int intValue; + private String stringValue; + + public int getIntValue() { + return intValue; + } + + public void setIntValue(int intValue) { + this.intValue = intValue; + } + + public String getStringValue() { + return stringValue; + } + + public void setStringValue(String stringValue) { + this.stringValue = stringValue; + } + + @Override + public String toString() { + return "RequestData{" + + "intValue=" + intValue + + ", stringValue='" + stringValue + '\'' + + '}'; + } +} diff --git a/libraries/src/main/java/com/baeldung/netty/RequestDataEncoder.java b/libraries/src/main/java/com/baeldung/netty/RequestDataEncoder.java new file mode 100644 index 0000000000..205a48699e --- /dev/null +++ b/libraries/src/main/java/com/baeldung/netty/RequestDataEncoder.java @@ -0,0 +1,19 @@ +package com.baeldung.netty; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.MessageToByteEncoder; + +import java.nio.charset.Charset; + +public class RequestDataEncoder extends MessageToByteEncoder { + + private final Charset charset = Charset.forName("UTF-8"); + + @Override + protected void encode(ChannelHandlerContext ctx, RequestData msg, ByteBuf out) throws Exception { + out.writeInt(msg.getIntValue()); + out.writeInt(msg.getStringValue().length()); + out.writeCharSequence(msg.getStringValue(), charset); + } +} diff --git a/libraries/src/main/java/com/baeldung/netty/RequestDecoder.java b/libraries/src/main/java/com/baeldung/netty/RequestDecoder.java new file mode 100644 index 0000000000..38b78fc877 --- /dev/null +++ b/libraries/src/main/java/com/baeldung/netty/RequestDecoder.java @@ -0,0 +1,22 @@ +package com.baeldung.netty; + +import java.nio.charset.Charset; +import java.util.List; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.ReplayingDecoder; + +public class RequestDecoder extends ReplayingDecoder { + + private final Charset charset = Charset.forName("UTF-8"); + + @Override + protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception { + RequestData data = new RequestData(); + data.setIntValue(in.readInt()); + int strLen = in.readInt(); + data.setStringValue(in.readCharSequence(strLen, charset).toString()); + out.add(data); + } +} diff --git a/libraries/src/main/java/com/baeldung/netty/ResponseData.java b/libraries/src/main/java/com/baeldung/netty/ResponseData.java new file mode 100644 index 0000000000..ce388a9a3d --- /dev/null +++ b/libraries/src/main/java/com/baeldung/netty/ResponseData.java @@ -0,0 +1,20 @@ +package com.baeldung.netty; + +public class ResponseData { + private int intValue; + + public int getIntValue() { + return intValue; + } + + public void setIntValue(int intValue) { + this.intValue = intValue; + } + + @Override + public String toString() { + return "ResponseData{" + + "intValue=" + intValue + + '}'; + } +} diff --git a/libraries/src/main/java/com/baeldung/netty/ResponseDataDecoder.java b/libraries/src/main/java/com/baeldung/netty/ResponseDataDecoder.java new file mode 100644 index 0000000000..ee33679dfe --- /dev/null +++ b/libraries/src/main/java/com/baeldung/netty/ResponseDataDecoder.java @@ -0,0 +1,17 @@ +package com.baeldung.netty; + +import java.util.List; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.ReplayingDecoder; + +public class ResponseDataDecoder extends ReplayingDecoder { + + @Override + protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception { + ResponseData data = new ResponseData(); + data.setIntValue(in.readInt()); + out.add(data); + } +} \ No newline at end of file diff --git a/libraries/src/main/java/com/baeldung/netty/ResponseDataEncoder.java b/libraries/src/main/java/com/baeldung/netty/ResponseDataEncoder.java new file mode 100644 index 0000000000..c73be11a44 --- /dev/null +++ b/libraries/src/main/java/com/baeldung/netty/ResponseDataEncoder.java @@ -0,0 +1,13 @@ +package com.baeldung.netty; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.MessageToByteEncoder; + +public class ResponseDataEncoder extends MessageToByteEncoder { + + @Override + protected void encode(ChannelHandlerContext ctx, ResponseData msg, ByteBuf out) throws Exception { + out.writeInt(msg.getIntValue()); + } +} diff --git a/libraries/src/main/java/com/baeldung/netty/SimpleProcessingHandler.java b/libraries/src/main/java/com/baeldung/netty/SimpleProcessingHandler.java new file mode 100644 index 0000000000..d12089f0bb --- /dev/null +++ b/libraries/src/main/java/com/baeldung/netty/SimpleProcessingHandler.java @@ -0,0 +1,39 @@ +package com.baeldung.netty; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; + +public class SimpleProcessingHandler extends ChannelInboundHandlerAdapter { + private ByteBuf tmp; + + @Override + public void handlerAdded(ChannelHandlerContext ctx) { + System.out.println("Handler added"); + tmp = ctx.alloc().buffer(4); + } + + @Override + public void handlerRemoved(ChannelHandlerContext ctx) { + System.out.println("Handler removed"); + tmp.release(); + tmp = null; + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) { + ByteBuf m = (ByteBuf) msg; + tmp.writeBytes(m); + m.release(); + if (tmp.readableBytes() >= 4) { + RequestData requestData = new RequestData(); + requestData.setIntValue(tmp.readInt()); + ResponseData responseData = new ResponseData(); + responseData.setIntValue(requestData.getIntValue() * 2); + ChannelFuture future = ctx.writeAndFlush(responseData); + future.addListener(ChannelFutureListener.CLOSE); + } + } +}