From 2024acec9127d4394d4e77b709568e9bb444f87d Mon Sep 17 00:00:00 2001 From: Ulisses Lima Date: Fri, 1 Mar 2024 04:32:46 -0300 Subject: [PATCH] BAEL-7195 Custom Event Handlers and Listeners in Netty (#15634) * Custom Event Handlers and Listeners in Netty PR recreated. * Update libraries-server-2/src/test/java/com/baeldung/netty/customhandlersandlisteners/ChatIntegrationTest.java Co-authored-by: Luis Javier Peris Morillo * Update libraries-server-2/src/test/java/com/baeldung/netty/customhandlersandlisteners/ChatIntegrationTest.java Co-authored-by: Luis Javier Peris Morillo * Update libraries-server-2/src/main/java/com/baeldung/netty/customhandlersandlisteners/handler/ServerEventHandler.java Co-authored-by: Luis Javier Peris Morillo * Update libraries-server-2/src/main/java/com/baeldung/netty/customhandlersandlisteners/ChatServerMain.java Co-authored-by: Luis Javier Peris Morillo * Update libraries-server-2/src/main/java/com/baeldung/netty/customhandlersandlisteners/ChatClientMain.java Co-authored-by: Luis Javier Peris Morillo * changing to default visibility * review 2 * Update libraries-server-2/src/main/java/com/baeldung/netty/customhandlersandlisteners/listener/ChannelInfoListener.java Co-authored-by: Luis Javier Peris Morillo --------- Co-authored-by: Luis Javier Peris Morillo --- libraries-server-2/pom.xml | 7 ++ .../ChatClientMain.java | 77 +++++++++++++++++++ .../ChatServerMain.java | 53 +++++++++++++ .../handler/ClientEventHandler.java | 12 +++ .../handler/ServerEventHandler.java | 67 ++++++++++++++++ .../listener/ChannelInfoListener.java | 31 ++++++++ .../model/Message.java | 38 +++++++++ .../model/OfflineMessage.java | 8 ++ .../model/OnlineMessage.java | 8 ++ .../ChatIntegrationTest.java | 53 +++++++++++++ .../MessageUnitTest.java | 32 ++++++++ 11 files changed, 386 insertions(+) create mode 100644 libraries-server-2/src/main/java/com/baeldung/netty/customhandlersandlisteners/ChatClientMain.java create mode 100644 libraries-server-2/src/main/java/com/baeldung/netty/customhandlersandlisteners/ChatServerMain.java create mode 100644 libraries-server-2/src/main/java/com/baeldung/netty/customhandlersandlisteners/handler/ClientEventHandler.java create mode 100644 libraries-server-2/src/main/java/com/baeldung/netty/customhandlersandlisteners/handler/ServerEventHandler.java create mode 100644 libraries-server-2/src/main/java/com/baeldung/netty/customhandlersandlisteners/listener/ChannelInfoListener.java create mode 100644 libraries-server-2/src/main/java/com/baeldung/netty/customhandlersandlisteners/model/Message.java create mode 100644 libraries-server-2/src/main/java/com/baeldung/netty/customhandlersandlisteners/model/OfflineMessage.java create mode 100644 libraries-server-2/src/main/java/com/baeldung/netty/customhandlersandlisteners/model/OnlineMessage.java create mode 100644 libraries-server-2/src/test/java/com/baeldung/netty/customhandlersandlisteners/ChatIntegrationTest.java create mode 100644 libraries-server-2/src/test/java/com/baeldung/netty/customhandlersandlisteners/MessageUnitTest.java diff --git a/libraries-server-2/pom.xml b/libraries-server-2/pom.xml index 7377fa3fa9..d0319d3c10 100644 --- a/libraries-server-2/pom.xml +++ b/libraries-server-2/pom.xml @@ -30,6 +30,12 @@ jetty-webapp ${jetty.version} + + io.netty + netty-all + ${netty.version} + + @@ -73,6 +79,7 @@ 9.4.27.v20200227 8.1.11.v20170118 + 4.1.104.Final \ No newline at end of file diff --git a/libraries-server-2/src/main/java/com/baeldung/netty/customhandlersandlisteners/ChatClientMain.java b/libraries-server-2/src/main/java/com/baeldung/netty/customhandlersandlisteners/ChatClientMain.java new file mode 100644 index 0000000000..7e69cb81e0 --- /dev/null +++ b/libraries-server-2/src/main/java/com/baeldung/netty/customhandlersandlisteners/ChatClientMain.java @@ -0,0 +1,77 @@ +package com.baeldung.netty.customhandlersandlisteners; + +import java.util.Scanner; + +import com.baeldung.netty.customhandlersandlisteners.handler.ClientEventHandler; +import com.baeldung.netty.customhandlersandlisteners.listener.ChannelInfoListener; + +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.codec.string.StringDecoder; +import io.netty.handler.codec.string.StringEncoder; + +public class ChatClientMain { + + private static final String SYSTEM_USER = System.getProperty("user.name"); + private static String user; + + public static void main(String[] args) { + EventLoopGroup group = new NioEventLoopGroup(); + try (Scanner scanner = new Scanner(System.in)) { + Bootstrap bootstrap = new Bootstrap(); + bootstrap.group(group) + .channel(NioSocketChannel.class) + .handler(new ChannelInitializer() { + @Override + public void initChannel(SocketChannel channel) throws Exception { + channel.pipeline() + .addFirst(new StringDecoder(), new ClientEventHandler(), new StringEncoder()); + } + }); + + ChannelFuture future = bootstrap.connect(ChatServerMain.HOST, ChatServerMain.PORT) + .sync(); + + future.addListener(new ChannelInfoListener("connected to server")); + Channel channel = future.sync() + .channel(); + + messageLoop(scanner, channel); + + channel.close(); + } catch (Throwable e) { + e.printStackTrace(); + Thread.currentThread().interrupt(); + } finally { + group.shutdownGracefully(); + } + } + + private static void messageLoop(Scanner scanner, Channel channel) throws InterruptedException { + Thread.sleep(50); + + if (user == null) { + System.out.printf("your name [%s]: ", SYSTEM_USER); + user = scanner.nextLine(); + if (user.isEmpty()) + user = SYSTEM_USER; + } + + System.out.print("> "); + while (scanner.hasNext()) { + String message = scanner.nextLine(); + if (message.equals("exit")) + break; + + ChannelFuture sent = channel.writeAndFlush(user + ";" + message); + sent.addListener(new ChannelInfoListener("message sent")); + sent.addListener(future -> System.out.print("> ")); + } + } +} \ No newline at end of file diff --git a/libraries-server-2/src/main/java/com/baeldung/netty/customhandlersandlisteners/ChatServerMain.java b/libraries-server-2/src/main/java/com/baeldung/netty/customhandlersandlisteners/ChatServerMain.java new file mode 100644 index 0000000000..e474d1680c --- /dev/null +++ b/libraries-server-2/src/main/java/com/baeldung/netty/customhandlersandlisteners/ChatServerMain.java @@ -0,0 +1,53 @@ +package com.baeldung.netty.customhandlersandlisteners; + +import com.baeldung.netty.customhandlersandlisteners.handler.ServerEventHandler; +import com.baeldung.netty.customhandlersandlisteners.listener.ChannelInfoListener; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.codec.string.StringDecoder; +import io.netty.handler.codec.string.StringEncoder; + +public final class ChatServerMain { + + public static final String HOST = "localhost"; + public static final int PORT = 8081; + + public static void main(String[] args) { + EventLoopGroup serverGroup = new NioEventLoopGroup(1); + EventLoopGroup clientGroup = new NioEventLoopGroup(); + try { + ServerBootstrap bootstrap = new ServerBootstrap(); + bootstrap.group(serverGroup, clientGroup) + .channel(NioServerSocketChannel.class) + .childHandler(new ChannelInitializer() { + @Override + public void initChannel(SocketChannel channel) throws Exception { + channel.pipeline() + .addFirst(new StringDecoder(), new ServerEventHandler(), new StringEncoder()); + } + }); + + ChannelFuture future = bootstrap.bind(HOST, PORT) + .sync(); + + System.out.println("chat server started. ready to accept clients."); + future.addListener(new ChannelInfoListener("server online")); + + future.channel() + .closeFuture() + .sync(); + } catch (Throwable e) { + e.printStackTrace(); + Thread.currentThread().interrupt(); + } finally { + serverGroup.shutdownGracefully(); + clientGroup.shutdownGracefully(); + } + } +} \ No newline at end of file diff --git a/libraries-server-2/src/main/java/com/baeldung/netty/customhandlersandlisteners/handler/ClientEventHandler.java b/libraries-server-2/src/main/java/com/baeldung/netty/customhandlersandlisteners/handler/ClientEventHandler.java new file mode 100644 index 0000000000..18ec874a00 --- /dev/null +++ b/libraries-server-2/src/main/java/com/baeldung/netty/customhandlersandlisteners/handler/ClientEventHandler.java @@ -0,0 +1,12 @@ +package com.baeldung.netty.customhandlersandlisteners.handler; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; + +public class ClientEventHandler extends SimpleChannelInboundHandler { + + @Override + protected void channelRead0(ChannelHandlerContext context, String msg) { + System.out.println(msg); + } +} diff --git a/libraries-server-2/src/main/java/com/baeldung/netty/customhandlersandlisteners/handler/ServerEventHandler.java b/libraries-server-2/src/main/java/com/baeldung/netty/customhandlersandlisteners/handler/ServerEventHandler.java new file mode 100644 index 0000000000..b557a8b2f3 --- /dev/null +++ b/libraries-server-2/src/main/java/com/baeldung/netty/customhandlersandlisteners/handler/ServerEventHandler.java @@ -0,0 +1,67 @@ +package com.baeldung.netty.customhandlersandlisteners.handler; + +import java.util.HashMap; +import java.util.LinkedList; +import java.util.Map; +import java.util.Queue; + +import com.baeldung.netty.customhandlersandlisteners.listener.ChannelInfoListener; +import com.baeldung.netty.customhandlersandlisteners.model.Message; +import com.baeldung.netty.customhandlersandlisteners.model.OfflineMessage; +import com.baeldung.netty.customhandlersandlisteners.model.OnlineMessage; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; + +public class ServerEventHandler extends SimpleChannelInboundHandler { + + private static final Map clients = new HashMap<>(); + private static final int MAX_HISTORY = 5; + private static final Queue history = new LinkedList<>(); + + private void handleBroadcast(Message message, ChannelHandlerContext context) { + final String channelId = id(context.channel()); + + System.out.printf("[clients: %d] message: %s\n", clients.size(), message); + clients.forEach((id, channel) -> { + if (!id.equals(channelId)) { + ChannelFuture relay = channel.writeAndFlush(message.toString()); + relay.addListener(new ChannelInfoListener("message relayed to " + id)); + } + }); + + history.add(message.toString() + "\n"); + if (history.size() > MAX_HISTORY) + history.poll(); + } + + @Override + public void channelRead0(ChannelHandlerContext context, String msg) { + handleBroadcast(Message.parse(msg), context); + } + + @Override + public void channelActive(final ChannelHandlerContext context) { + Channel channel = context.channel(); + clients.put(id(channel), channel); + + history.forEach(channel::writeAndFlush); + + handleBroadcast(new OnlineMessage(id(channel)), context); + } + + @Override + public void channelInactive(ChannelHandlerContext context) { + Channel channel = context.channel(); + clients.remove(id(channel)); + + handleBroadcast(new OfflineMessage(id(channel)), context); + } + + private static String id(Channel channel) { + return channel.id() + .asShortText(); + } +} \ No newline at end of file diff --git a/libraries-server-2/src/main/java/com/baeldung/netty/customhandlersandlisteners/listener/ChannelInfoListener.java b/libraries-server-2/src/main/java/com/baeldung/netty/customhandlersandlisteners/listener/ChannelInfoListener.java new file mode 100644 index 0000000000..a6954b3200 --- /dev/null +++ b/libraries-server-2/src/main/java/com/baeldung/netty/customhandlersandlisteners/listener/ChannelInfoListener.java @@ -0,0 +1,31 @@ +package com.baeldung.netty.customhandlersandlisteners.listener; + +import java.time.Instant; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.util.concurrent.GenericFutureListener; + +public class ChannelInfoListener implements GenericFutureListener { + + private final String event; + + public ChannelInfoListener(String event) { + this.event = event; + } + + @Override + public void operationComplete(ChannelFuture future) throws Exception { + Channel channel = future.channel(); + String status = "OK"; + + if (!future.isSuccess()) { + status = "FAILED"; + future.cause() + .printStackTrace(); + } + + System.out.printf("%s - channel#%s %s: %s%n", Instant.now(), channel.id() + .asShortText(), status, event); + } +} diff --git a/libraries-server-2/src/main/java/com/baeldung/netty/customhandlersandlisteners/model/Message.java b/libraries-server-2/src/main/java/com/baeldung/netty/customhandlersandlisteners/model/Message.java new file mode 100644 index 0000000000..746752a306 --- /dev/null +++ b/libraries-server-2/src/main/java/com/baeldung/netty/customhandlersandlisteners/model/Message.java @@ -0,0 +1,38 @@ +package com.baeldung.netty.customhandlersandlisteners.model; + +import java.time.Instant; + +public class Message { + + private final Instant time; + private final String user; + private final String message; + + public Message(String user, String message) { + this.time = Instant.now(); + this.user = user; + this.message = message; + } + + public Instant getTime() { + return time; + } + + public String getUser() { + return user; + } + + public String getMessage() { + return message; + } + + @Override + public String toString() { + return time + " - " + user + ": " + message; + } + + public static Message parse(String string) { + String[] arr = string.split(";", 2); + return new Message(arr[0], arr[1]); + } +} diff --git a/libraries-server-2/src/main/java/com/baeldung/netty/customhandlersandlisteners/model/OfflineMessage.java b/libraries-server-2/src/main/java/com/baeldung/netty/customhandlersandlisteners/model/OfflineMessage.java new file mode 100644 index 0000000000..4a83b9335d --- /dev/null +++ b/libraries-server-2/src/main/java/com/baeldung/netty/customhandlersandlisteners/model/OfflineMessage.java @@ -0,0 +1,8 @@ +package com.baeldung.netty.customhandlersandlisteners.model; + +public class OfflineMessage extends Message { + + public OfflineMessage(String info) { + super("system", "client went offline: " + info); + } +} diff --git a/libraries-server-2/src/main/java/com/baeldung/netty/customhandlersandlisteners/model/OnlineMessage.java b/libraries-server-2/src/main/java/com/baeldung/netty/customhandlersandlisteners/model/OnlineMessage.java new file mode 100644 index 0000000000..8c8eb2ffde --- /dev/null +++ b/libraries-server-2/src/main/java/com/baeldung/netty/customhandlersandlisteners/model/OnlineMessage.java @@ -0,0 +1,8 @@ +package com.baeldung.netty.customhandlersandlisteners.model; + +public class OnlineMessage extends Message { + + public OnlineMessage(String info) { + super("system", "client online: " + info); + } +} diff --git a/libraries-server-2/src/test/java/com/baeldung/netty/customhandlersandlisteners/ChatIntegrationTest.java b/libraries-server-2/src/test/java/com/baeldung/netty/customhandlersandlisteners/ChatIntegrationTest.java new file mode 100644 index 0000000000..ac12de8abe --- /dev/null +++ b/libraries-server-2/src/test/java/com/baeldung/netty/customhandlersandlisteners/ChatIntegrationTest.java @@ -0,0 +1,53 @@ +package com.baeldung.netty.customhandlersandlisteners; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import org.junit.jupiter.api.Test; + +import com.baeldung.netty.customhandlersandlisteners.handler.ClientEventHandler; +import com.baeldung.netty.customhandlersandlisteners.handler.ServerEventHandler; + +import io.netty.channel.embedded.EmbeddedChannel; + +class ChatIntegrationTest { + + private static final String MSG_1 = "Alice;Anyone there?!"; + private static final String MSG_2 = "Bob;Hi, Alice!"; + + @Test + void whenMessagesWrittenToServer_thenMessagesConsumed() { + EmbeddedChannel server = new EmbeddedChannel(new ServerEventHandler()); + + assertTrue(server.writeOutbound(MSG_1)); + assertTrue(server.writeOutbound(MSG_2)); + + assertEquals(2, server.outboundMessages() + .size()); + + assertEquals(MSG_1, server.readOutbound() + .toString()); + assertEquals(MSG_2, server.readOutbound() + .toString()); + + server.close(); + } + + @Test + void whenClientReceivesMessages_thenMessagesConsumed() { + EmbeddedChannel client = new EmbeddedChannel(new ClientEventHandler()); + + assertTrue(client.writeOutbound(MSG_1)); + assertTrue(client.writeOutbound(MSG_2)); + + assertEquals(2, client.outboundMessages() + .size()); + + assertEquals(MSG_1, client.readOutbound() + .toString()); + assertEquals(MSG_2, client.readOutbound() + .toString()); + + client.close(); + } +} diff --git a/libraries-server-2/src/test/java/com/baeldung/netty/customhandlersandlisteners/MessageUnitTest.java b/libraries-server-2/src/test/java/com/baeldung/netty/customhandlersandlisteners/MessageUnitTest.java new file mode 100644 index 0000000000..429aed7813 --- /dev/null +++ b/libraries-server-2/src/test/java/com/baeldung/netty/customhandlersandlisteners/MessageUnitTest.java @@ -0,0 +1,32 @@ +package com.baeldung.netty.customhandlersandlisteners; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +import java.time.Instant; + +import org.junit.jupiter.api.Test; + +import com.baeldung.netty.customhandlersandlisteners.model.Message; + +class MessageUnitTest { + + @Test + void whenBroadcastMessage_thenParsedSuccessfully() { + String input = "Bob;Hello, world; go!"; + Message message = Message.parse(input); + + assertEquals("Bob", message.getUser()); + assertEquals("Hello, world; go!", message.getMessage()); + assertNotNull(message.getTime()); + } + + @Test + void whenNewMessage_thenExpectedFormat() { + Message message = new Message("Alice", "Testing"); + Instant time = message.getTime(); + + String expected = time + " - Alice: Testing"; + assertEquals(expected, message.toString()); + } +}