BAEL-7195 Custom Event Handlers and Listeners in Netty (#15634)

* Custom Event Handlers and Listeners in Netty

PR recreated.

* Update libraries-server-2/src/test/java/com/baeldung/netty/customhandlersandlisteners/ChatIntegrationTest.java

Co-authored-by: Luis Javier Peris Morillo <javierperis@gmail.com>

* Update libraries-server-2/src/test/java/com/baeldung/netty/customhandlersandlisteners/ChatIntegrationTest.java

Co-authored-by: Luis Javier Peris Morillo <javierperis@gmail.com>

* Update libraries-server-2/src/main/java/com/baeldung/netty/customhandlersandlisteners/handler/ServerEventHandler.java

Co-authored-by: Luis Javier Peris Morillo <javierperis@gmail.com>

* Update libraries-server-2/src/main/java/com/baeldung/netty/customhandlersandlisteners/ChatServerMain.java

Co-authored-by: Luis Javier Peris Morillo <javierperis@gmail.com>

* Update libraries-server-2/src/main/java/com/baeldung/netty/customhandlersandlisteners/ChatClientMain.java

Co-authored-by: Luis Javier Peris Morillo <javierperis@gmail.com>

* changing to default visibility

* review 2

* Update libraries-server-2/src/main/java/com/baeldung/netty/customhandlersandlisteners/listener/ChannelInfoListener.java

Co-authored-by: Luis Javier Peris Morillo <javierperis@gmail.com>

---------

Co-authored-by: Luis Javier Peris Morillo <javierperis@gmail.com>
This commit is contained in:
Ulisses Lima 2024-03-01 04:32:46 -03:00 committed by GitHub
parent 74c62ec2f6
commit 2024acec91
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 386 additions and 0 deletions

View File

@ -30,6 +30,12 @@
<artifactId>jetty-webapp</artifactId>
<version>${jetty.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>${netty.version}</version>
</dependency>
</dependencies>
<build>
@ -73,6 +79,7 @@
<properties>
<jetty.version>9.4.27.v20200227</jetty.version>
<alpn.version>8.1.11.v20170118</alpn.version>
<netty.version>4.1.104.Final</netty.version>
</properties>
</project>

View File

@ -0,0 +1,77 @@
package com.baeldung.netty.customhandlersandlisteners;
import java.util.Scanner;
import com.baeldung.netty.customhandlersandlisteners.handler.ClientEventHandler;
import com.baeldung.netty.customhandlersandlisteners.listener.ChannelInfoListener;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
public class ChatClientMain {
private static final String SYSTEM_USER = System.getProperty("user.name");
private static String user;
public static void main(String[] args) {
EventLoopGroup group = new NioEventLoopGroup();
try (Scanner scanner = new Scanner(System.in)) {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel channel) throws Exception {
channel.pipeline()
.addFirst(new StringDecoder(), new ClientEventHandler(), new StringEncoder());
}
});
ChannelFuture future = bootstrap.connect(ChatServerMain.HOST, ChatServerMain.PORT)
.sync();
future.addListener(new ChannelInfoListener("connected to server"));
Channel channel = future.sync()
.channel();
messageLoop(scanner, channel);
channel.close();
} catch (Throwable e) {
e.printStackTrace();
Thread.currentThread().interrupt();
} finally {
group.shutdownGracefully();
}
}
private static void messageLoop(Scanner scanner, Channel channel) throws InterruptedException {
Thread.sleep(50);
if (user == null) {
System.out.printf("your name [%s]: ", SYSTEM_USER);
user = scanner.nextLine();
if (user.isEmpty())
user = SYSTEM_USER;
}
System.out.print("> ");
while (scanner.hasNext()) {
String message = scanner.nextLine();
if (message.equals("exit"))
break;
ChannelFuture sent = channel.writeAndFlush(user + ";" + message);
sent.addListener(new ChannelInfoListener("message sent"));
sent.addListener(future -> System.out.print("> "));
}
}
}

View File

@ -0,0 +1,53 @@
package com.baeldung.netty.customhandlersandlisteners;
import com.baeldung.netty.customhandlersandlisteners.handler.ServerEventHandler;
import com.baeldung.netty.customhandlersandlisteners.listener.ChannelInfoListener;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
public final class ChatServerMain {
public static final String HOST = "localhost";
public static final int PORT = 8081;
public static void main(String[] args) {
EventLoopGroup serverGroup = new NioEventLoopGroup(1);
EventLoopGroup clientGroup = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(serverGroup, clientGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel channel) throws Exception {
channel.pipeline()
.addFirst(new StringDecoder(), new ServerEventHandler(), new StringEncoder());
}
});
ChannelFuture future = bootstrap.bind(HOST, PORT)
.sync();
System.out.println("chat server started. ready to accept clients.");
future.addListener(new ChannelInfoListener("server online"));
future.channel()
.closeFuture()
.sync();
} catch (Throwable e) {
e.printStackTrace();
Thread.currentThread().interrupt();
} finally {
serverGroup.shutdownGracefully();
clientGroup.shutdownGracefully();
}
}
}

View File

@ -0,0 +1,12 @@
package com.baeldung.netty.customhandlersandlisteners.handler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
public class ClientEventHandler extends SimpleChannelInboundHandler<String> {
@Override
protected void channelRead0(ChannelHandlerContext context, String msg) {
System.out.println(msg);
}
}

View File

@ -0,0 +1,67 @@
package com.baeldung.netty.customhandlersandlisteners.handler;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;
import com.baeldung.netty.customhandlersandlisteners.listener.ChannelInfoListener;
import com.baeldung.netty.customhandlersandlisteners.model.Message;
import com.baeldung.netty.customhandlersandlisteners.model.OfflineMessage;
import com.baeldung.netty.customhandlersandlisteners.model.OnlineMessage;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
public class ServerEventHandler extends SimpleChannelInboundHandler<String> {
private static final Map<String, Channel> clients = new HashMap<>();
private static final int MAX_HISTORY = 5;
private static final Queue<String> history = new LinkedList<>();
private void handleBroadcast(Message message, ChannelHandlerContext context) {
final String channelId = id(context.channel());
System.out.printf("[clients: %d] message: %s\n", clients.size(), message);
clients.forEach((id, channel) -> {
if (!id.equals(channelId)) {
ChannelFuture relay = channel.writeAndFlush(message.toString());
relay.addListener(new ChannelInfoListener("message relayed to " + id));
}
});
history.add(message.toString() + "\n");
if (history.size() > MAX_HISTORY)
history.poll();
}
@Override
public void channelRead0(ChannelHandlerContext context, String msg) {
handleBroadcast(Message.parse(msg), context);
}
@Override
public void channelActive(final ChannelHandlerContext context) {
Channel channel = context.channel();
clients.put(id(channel), channel);
history.forEach(channel::writeAndFlush);
handleBroadcast(new OnlineMessage(id(channel)), context);
}
@Override
public void channelInactive(ChannelHandlerContext context) {
Channel channel = context.channel();
clients.remove(id(channel));
handleBroadcast(new OfflineMessage(id(channel)), context);
}
private static String id(Channel channel) {
return channel.id()
.asShortText();
}
}

View File

@ -0,0 +1,31 @@
package com.baeldung.netty.customhandlersandlisteners.listener;
import java.time.Instant;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.util.concurrent.GenericFutureListener;
public class ChannelInfoListener implements GenericFutureListener<ChannelFuture> {
private final String event;
public ChannelInfoListener(String event) {
this.event = event;
}
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Channel channel = future.channel();
String status = "OK";
if (!future.isSuccess()) {
status = "FAILED";
future.cause()
.printStackTrace();
}
System.out.printf("%s - channel#%s %s: %s%n", Instant.now(), channel.id()
.asShortText(), status, event);
}
}

View File

@ -0,0 +1,38 @@
package com.baeldung.netty.customhandlersandlisteners.model;
import java.time.Instant;
public class Message {
private final Instant time;
private final String user;
private final String message;
public Message(String user, String message) {
this.time = Instant.now();
this.user = user;
this.message = message;
}
public Instant getTime() {
return time;
}
public String getUser() {
return user;
}
public String getMessage() {
return message;
}
@Override
public String toString() {
return time + " - " + user + ": " + message;
}
public static Message parse(String string) {
String[] arr = string.split(";", 2);
return new Message(arr[0], arr[1]);
}
}

View File

@ -0,0 +1,8 @@
package com.baeldung.netty.customhandlersandlisteners.model;
public class OfflineMessage extends Message {
public OfflineMessage(String info) {
super("system", "client went offline: " + info);
}
}

View File

@ -0,0 +1,8 @@
package com.baeldung.netty.customhandlersandlisteners.model;
public class OnlineMessage extends Message {
public OnlineMessage(String info) {
super("system", "client online: " + info);
}
}

View File

@ -0,0 +1,53 @@
package com.baeldung.netty.customhandlersandlisteners;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import org.junit.jupiter.api.Test;
import com.baeldung.netty.customhandlersandlisteners.handler.ClientEventHandler;
import com.baeldung.netty.customhandlersandlisteners.handler.ServerEventHandler;
import io.netty.channel.embedded.EmbeddedChannel;
class ChatIntegrationTest {
private static final String MSG_1 = "Alice;Anyone there?!";
private static final String MSG_2 = "Bob;Hi, Alice!";
@Test
void whenMessagesWrittenToServer_thenMessagesConsumed() {
EmbeddedChannel server = new EmbeddedChannel(new ServerEventHandler());
assertTrue(server.writeOutbound(MSG_1));
assertTrue(server.writeOutbound(MSG_2));
assertEquals(2, server.outboundMessages()
.size());
assertEquals(MSG_1, server.readOutbound()
.toString());
assertEquals(MSG_2, server.readOutbound()
.toString());
server.close();
}
@Test
void whenClientReceivesMessages_thenMessagesConsumed() {
EmbeddedChannel client = new EmbeddedChannel(new ClientEventHandler());
assertTrue(client.writeOutbound(MSG_1));
assertTrue(client.writeOutbound(MSG_2));
assertEquals(2, client.outboundMessages()
.size());
assertEquals(MSG_1, client.readOutbound()
.toString());
assertEquals(MSG_2, client.readOutbound()
.toString());
client.close();
}
}

View File

@ -0,0 +1,32 @@
package com.baeldung.netty.customhandlersandlisteners;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import java.time.Instant;
import org.junit.jupiter.api.Test;
import com.baeldung.netty.customhandlersandlisteners.model.Message;
class MessageUnitTest {
@Test
void whenBroadcastMessage_thenParsedSuccessfully() {
String input = "Bob;Hello, world; go!";
Message message = Message.parse(input);
assertEquals("Bob", message.getUser());
assertEquals("Hello, world; go!", message.getMessage());
assertNotNull(message.getTime());
}
@Test
void whenNewMessage_thenExpectedFormat() {
Message message = new Message("Alice", "Testing");
Instant time = message.getTime();
String expected = time + " - Alice: Testing";
assertEquals(expected, message.toString());
}
}