BAEL-6731: Introduction to JeroMQ (#14412)

This commit is contained in:
Graham Cox 2023-07-16 08:08:38 +01:00 committed by GitHub
parent 296bba8657
commit a4d39d14dd
5 changed files with 511 additions and 0 deletions

57
jeromq/pom.xml Normal file
View File

@ -0,0 +1,57 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>jeromq</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>jeromq</name>
<parent>
<groupId>com.baeldung</groupId>
<artifactId>parent-modules</artifactId>
<version>1.0.0-SNAPSHOT</version>
</parent>
<dependencies>
<dependency>
<groupId>org.zeromq</groupId>
<artifactId>jeromq</artifactId>
<version>0.5.3</version>
</dependency>
<dependency>
<groupId>org.junit.platform</groupId>
<artifactId>junit-platform-engine</artifactId>
<version>${junit-platform.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.platform</groupId>
<artifactId>junit-platform-console-standalone</artifactId>
<version>${junit-platform.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-migrationsupport</artifactId>
<version>${junit-jupiter.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<resources>
<resource>
<directory>src/main/resources</directory>
<filtering>true</filtering>
</resource>
<resource>
<directory>src/test/resources</directory>
<filtering>true</filtering>
</resource>
</resources>
</build>
</project>

View File

@ -0,0 +1,256 @@
package com.baeldung.jeromq;
import org.junit.jupiter.api.Test;
import org.zeromq.SocketType;
import org.zeromq.ZContext;
import org.zeromq.ZMQ;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
public class DealerRouterLiveTest {
@Test
public void single() throws Exception {
Thread brokerThread = new Thread(() -> {
try (ZContext context = new ZContext()) {
ZMQ.Socket broker = context.createSocket(SocketType.ROUTER);
broker.bind("tcp://*:5555");
String identity = broker.recvStr();
System.out.println(Thread.currentThread().getName() + " - Received identity " + identity);
broker.recv(0); // Envelope delimiter
System.out.println(Thread.currentThread().getName() + " - Received envelope");
String message = broker.recvStr(0); // Response from worker
System.out.println(Thread.currentThread().getName() + " - Received message " + message);
broker.sendMore(identity);
broker.sendMore("xxx");
broker.send("Hello back");
}
});
brokerThread.setName("broker");
Thread workerThread = new Thread(() -> {
try (ZContext context = new ZContext()) {
ZMQ.Socket worker = context.createSocket(SocketType.DEALER);
worker.setIdentity(Thread.currentThread().getName().getBytes(ZMQ.CHARSET));
worker.connect("tcp://localhost:5555");
System.out.println(Thread.currentThread().getName() + " - Connected");
worker.sendMore("");
worker.send("Hello " + Thread.currentThread().getName());
System.out.println(Thread.currentThread().getName() + " - Sent Hello");
worker.recvStr(); // Envelope delimiter
System.out.println(Thread.currentThread().getName() + " - Received Envelope");
String workload = worker.recvStr();
System.out.println(Thread.currentThread().getName() + " - Received " + workload);
}
});
workerThread.setName("worker");
brokerThread.start();
workerThread.start();
workerThread.join();
brokerThread.join();
}
@Test
public void asynchronous() throws Exception {
Thread brokerThread = new Thread(() -> {
try (ZContext context = new ZContext()) {
ZMQ.Socket broker = context.createSocket(SocketType.ROUTER);
broker.bind("tcp://*:5555");
while (true) {
String identity = broker.recvStr(ZMQ.DONTWAIT);
System.out.println(Thread.currentThread().getName() + " - Received identity " + identity);
if (identity == null) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {}
} else {
broker.recv(0); // Envelope delimiter
System.out.println(Thread.currentThread().getName() + " - Received envelope");
String message = broker.recvStr(0); // Response from worker
System.out.println(Thread.currentThread().getName() + " - Received message " + message);
broker.sendMore(identity);
broker.sendMore("xxx");
broker.send("Hello back");
break;
}
}
}
});
brokerThread.setName("broker");
Thread workerThread = new Thread(() -> {
try (ZContext context = new ZContext()) {
ZMQ.Socket worker = context.createSocket(SocketType.DEALER);
worker.setIdentity(Thread.currentThread().getName().getBytes(ZMQ.CHARSET));
worker.connect("tcp://localhost:5555");
System.out.println(Thread.currentThread().getName() + " - Connected");
worker.sendMore("");
worker.send("Hello " + Thread.currentThread().getName());
System.out.println(Thread.currentThread().getName() + " - Sent Hello");
worker.recvStr(); // Envelope delimiter
System.out.println(Thread.currentThread().getName() + " - Received Envelope");
String workload = worker.recvStr();
System.out.println(Thread.currentThread().getName() + " - Received " + workload);
}
});
workerThread.setName("worker");
brokerThread.start();
workerThread.start();
workerThread.join();
brokerThread.join();
}
@Test
public void many() throws Exception {
Thread brokerThread = new Thread(() -> {
try (ZContext context = new ZContext()) {
ZMQ.Socket broker = context.createSocket(SocketType.ROUTER);
broker.bind("tcp://*:5555");
while (!Thread.currentThread().isInterrupted()) {
String identity = broker.recvStr();
System.out.println(Thread.currentThread().getName() + " - Received identity " + identity);
broker.recv(0); // Envelope delimiter
String message = broker.recvStr(0); // Response from worker
System.out.println(Thread.currentThread().getName() + " - Received message " + message);
broker.sendMore(identity);
broker.sendMore("");
broker.send("Hello back to " + identity);
}
}
});
brokerThread.setName("broker");
Set<Thread> workers = IntStream.range(0, 10)
.mapToObj(index -> {
Thread workerThread = new Thread(() -> {
try (ZContext context = new ZContext()) {
ZMQ.Socket worker = context.createSocket(SocketType.DEALER);
worker.setIdentity(Thread.currentThread().getName().getBytes(ZMQ.CHARSET));
worker.connect("tcp://localhost:5555");
System.out.println(Thread.currentThread().getName() + " - Connected");
worker.sendMore("");
worker.send("Hello " + Thread.currentThread().getName());
System.out.println(Thread.currentThread().getName() + " - Sent Hello");
worker.recvStr(); // Envelope delimiter
String workload = worker.recvStr();
System.out.println(Thread.currentThread().getName() + " - Received " + workload);
}
});
workerThread.setName("worker-" + index);
return workerThread;
})
.collect(Collectors.toSet());
brokerThread.start();
workers.forEach(Thread::start);
for (Thread worker : workers) {
worker.join();
}
brokerThread.interrupt();
}
@Test
public void threaded() throws Exception {
Thread brokerThread = new Thread(() -> {
try (ZContext context = new ZContext()) {
ZMQ.Socket broker = context.createSocket(SocketType.ROUTER);
broker.bind("tcp://*:5555");
ExecutorService threadPool = Executors.newFixedThreadPool(5);
Random rng = new Random();
while (!Thread.currentThread().isInterrupted()) {
String identity = broker.recvStr();
System.out.println(Thread.currentThread().getName() + " - Received identity " + identity);
broker.recv(0); // Envelope delimiter
String message = broker.recvStr(0); // Response from worker
System.out.println(Thread.currentThread().getName() + " - Received message " + message);
threadPool.submit(() -> {
try {
Thread.sleep(rng.nextInt(1000) + 1000 );
} catch (Exception e) {}
synchronized(broker) {
broker.sendMore(identity);
broker.sendMore("");
broker.send("Hello back to " + identity + " from " + Thread.currentThread().getName());
}
});
}
threadPool.shutdown();
}
});
brokerThread.setName("broker");
Set<Thread> workers = IntStream.range(0, 10)
.mapToObj(index -> {
Thread workerThread = new Thread(() -> {
try (ZContext context = new ZContext()) {
ZMQ.Socket worker = context.createSocket(SocketType.DEALER);
worker.setIdentity(Thread.currentThread().getName().getBytes(ZMQ.CHARSET));
worker.connect("tcp://localhost:5555");
System.out.println(Thread.currentThread().getName() + " - Connected");
worker.sendMore("");
worker.send("Hello " + Thread.currentThread().getName());
System.out.println(Thread.currentThread().getName() + " - Sent Hello");
worker.recvStr(); // Envelope delimiter
String workload = worker.recvStr();
System.out.println(Thread.currentThread().getName() + " - Received " + workload);
}
});
workerThread.setName("worker-" + index);
return workerThread;
})
.collect(Collectors.toSet());
brokerThread.start();
workers.forEach(Thread::start);
for (Thread worker : workers) {
worker.join();
}
brokerThread.interrupt();
}
}

View File

@ -0,0 +1,101 @@
package com.baeldung.jeromq;
import org.junit.jupiter.api.Test;
import org.zeromq.SocketType;
import org.zeromq.ZContext;
import org.zeromq.ZMQ;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
public class PubSubLiveTest {
@Test
public void singleSub() throws Exception {
Thread server = new Thread(() -> {
try (ZContext context = new ZContext()) {
ZMQ.Socket pub = context.createSocket(SocketType.PUB);
pub.bind("tcp://*:5555");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {}
System.out.println(Thread.currentThread().getName() + " - Sending");
pub.send("Hello");
}
});
server.setName("server");
Thread client = new Thread(() -> {
try (ZContext context = new ZContext()) {
ZMQ.Socket sub = context.createSocket(SocketType.SUB);
sub.connect("tcp://localhost:5555");
System.out.println(Thread.currentThread().getName() + " - Connected");
sub.subscribe("".getBytes());
System.out.println(Thread.currentThread().getName() + " - Subscribed");
String message = sub.recvStr();
System.out.println(Thread.currentThread().getName() + " - " + message);
}
});
client.setName("client");
server.start();
client.start();
client.join();
server.join();
}
@Test
public void manySub() throws Exception {
Thread server = new Thread(() -> {
try (ZContext context = new ZContext()) {
ZMQ.Socket pub = context.createSocket(SocketType.PUB);
pub.bind("tcp://*:5555");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {}
System.out.println(Thread.currentThread().getName() + " - Sending");
pub.send("Hello");
}
});
server.setName("server");
Set<Thread> clients = IntStream.range(0, 10)
.mapToObj(index -> {
Thread client = new Thread(() -> {
try (ZContext context = new ZContext()) {
ZMQ.Socket sub = context.createSocket(SocketType.SUB);
sub.connect("tcp://localhost:5555");
System.out.println(Thread.currentThread().getName() + " - Connected");
sub.subscribe("".getBytes());
System.out.println(Thread.currentThread().getName() + " - Subscribed");
String message = sub.recvStr();
System.out.println(Thread.currentThread().getName() + " - " + message);
}
});
client.setName("client-" + index);
return client;
})
.collect(Collectors.toSet());
server.start();
clients.forEach(Thread::start);
for (Thread client : clients) {
client.join();
}
server.join();
}
}

View File

@ -0,0 +1,96 @@
package com.baeldung.jeromq;
import org.junit.jupiter.api.Test;
import org.zeromq.SocketType;
import org.zeromq.ZContext;
import org.zeromq.ZMQ;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
public class RequestResponseLiveTest {
@Test
public void requestResponse() throws Exception {
try (ZContext context = new ZContext()) {
Thread server = new Thread(() -> {
ZMQ.Socket socket = context.createSocket(SocketType.REP);
socket.bind("inproc://test");
while (!Thread.currentThread().isInterrupted()) {
byte[] reply = socket.recv(0);
System.out.println("Server Received " + ": [" + new String(reply, ZMQ.CHARSET) + "]");
String response = new String(reply, ZMQ.CHARSET) + ", world";
socket.send(response.getBytes(ZMQ.CHARSET), 0);
}
});
Thread client = new Thread(() -> {
ZMQ.Socket socket = context.createSocket(SocketType.REQ);
socket.connect("inproc://test");
for (int requestNbr = 0; requestNbr != 10; requestNbr++) {
String request = "Hello " + requestNbr;
System.out.println("Sending " + request);
socket.send(request.getBytes(ZMQ.CHARSET), 0);
byte[] reply = socket.recv(0);
System.out.println("Client Received " + new String(reply, ZMQ.CHARSET));
}
});
server.start();
client.start();
client.join();
server.interrupt();
}
}
@Test
public void manyRequestResponse() throws Exception {
try (ZContext context = new ZContext()) {
Thread server = new Thread(() -> {
ZMQ.Socket socket = context.createSocket(SocketType.REP);
socket.bind("tcp://*:5555");
while (!Thread.currentThread().isInterrupted()) {
byte[] reply = socket.recv(0);
System.out.println("Server Received " + ": [" + new String(reply, ZMQ.CHARSET) + "]");
String response = new String(reply, ZMQ.CHARSET) + ", world";
socket.send(response.getBytes(ZMQ.CHARSET), 0);
}
});
Set<Thread> clients = IntStream.range(0, 10).mapToObj(index ->
new Thread(() -> {
ZMQ.Socket socket = context.createSocket(SocketType.REQ);
socket.connect("tcp://localhost:5555");
for (int requestNbr = 0; requestNbr != 10; requestNbr++) {
String request = "Hello " + index + " - " + requestNbr;
System.out.println("Sending " + request);
socket.send(request.getBytes(ZMQ.CHARSET), 0);
byte[] reply = socket.recv(0);
System.out.println("Client " + index + " Received " + new String(reply, ZMQ.CHARSET));
}
})
).collect(Collectors.toSet());
server.start();
clients.forEach(Thread::start);
for (Thread client : clients) {
client.join();
}
server.interrupt();
}
}
}

View File

@ -369,6 +369,7 @@
<module>persistence-modules/spring-data-cassandra-reactive</module> <!--JAVA-21844-->
<module>persistence-modules/spring-data-neo4j</module>
<module>java-nashorn</module>
<module>jeromq</module>
</modules>
</profile>