diff --git a/jeromq/pom.xml b/jeromq/pom.xml new file mode 100644 index 0000000000..dfb5086683 --- /dev/null +++ b/jeromq/pom.xml @@ -0,0 +1,57 @@ + + + 4.0.0 + jeromq + 0.0.1-SNAPSHOT + jeromq + + + com.baeldung + parent-modules + 1.0.0-SNAPSHOT + + + + + org.zeromq + jeromq + 0.5.3 + + + org.junit.platform + junit-platform-engine + ${junit-platform.version} + test + + + org.junit.platform + junit-platform-console-standalone + ${junit-platform.version} + test + + + org.junit.jupiter + junit-jupiter-migrationsupport + ${junit-jupiter.version} + test + + + + + + + src/main/resources + true + + + src/test/resources + true + + + + + + + diff --git a/jeromq/src/test/java/com/baeldung/jeromq/DealerRouterLiveTest.java b/jeromq/src/test/java/com/baeldung/jeromq/DealerRouterLiveTest.java new file mode 100644 index 0000000000..1f2eff1325 --- /dev/null +++ b/jeromq/src/test/java/com/baeldung/jeromq/DealerRouterLiveTest.java @@ -0,0 +1,256 @@ +package com.baeldung.jeromq; + +import org.junit.jupiter.api.Test; +import org.zeromq.SocketType; +import org.zeromq.ZContext; +import org.zeromq.ZMQ; + +import java.util.Random; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +public class DealerRouterLiveTest { + @Test + public void single() throws Exception { + Thread brokerThread = new Thread(() -> { + try (ZContext context = new ZContext()) { + + ZMQ.Socket broker = context.createSocket(SocketType.ROUTER); + broker.bind("tcp://*:5555"); + + String identity = broker.recvStr(); + System.out.println(Thread.currentThread().getName() + " - Received identity " + identity); + + broker.recv(0); // Envelope delimiter + System.out.println(Thread.currentThread().getName() + " - Received envelope"); + String message = broker.recvStr(0); // Response from worker + System.out.println(Thread.currentThread().getName() + " - Received message " + message); + + broker.sendMore(identity); + broker.sendMore("xxx"); + broker.send("Hello back"); + } + }); + brokerThread.setName("broker"); + + Thread workerThread = new Thread(() -> { + try (ZContext context = new ZContext()) { + ZMQ.Socket worker = context.createSocket(SocketType.DEALER); + worker.setIdentity(Thread.currentThread().getName().getBytes(ZMQ.CHARSET)); + + worker.connect("tcp://localhost:5555"); + System.out.println(Thread.currentThread().getName() + " - Connected"); + + worker.sendMore(""); + worker.send("Hello " + Thread.currentThread().getName()); + System.out.println(Thread.currentThread().getName() + " - Sent Hello"); + + worker.recvStr(); // Envelope delimiter + System.out.println(Thread.currentThread().getName() + " - Received Envelope"); + String workload = worker.recvStr(); + System.out.println(Thread.currentThread().getName() + " - Received " + workload); + } + }); + workerThread.setName("worker"); + + brokerThread.start(); + workerThread.start(); + + workerThread.join(); + brokerThread.join(); + } + + @Test + public void asynchronous() throws Exception { + Thread brokerThread = new Thread(() -> { + try (ZContext context = new ZContext()) { + + ZMQ.Socket broker = context.createSocket(SocketType.ROUTER); + broker.bind("tcp://*:5555"); + + while (true) { + String identity = broker.recvStr(ZMQ.DONTWAIT); + System.out.println(Thread.currentThread().getName() + " - Received identity " + identity); + + if (identity == null) { + try { + Thread.sleep(100); + } catch (InterruptedException e) {} + } else { + + broker.recv(0); // Envelope delimiter + System.out.println(Thread.currentThread().getName() + " - Received envelope"); + String message = broker.recvStr(0); // Response from worker + System.out.println(Thread.currentThread().getName() + " - Received message " + message); + + broker.sendMore(identity); + broker.sendMore("xxx"); + broker.send("Hello back"); + + break; + } + } + } + }); + brokerThread.setName("broker"); + + Thread workerThread = new Thread(() -> { + try (ZContext context = new ZContext()) { + ZMQ.Socket worker = context.createSocket(SocketType.DEALER); + worker.setIdentity(Thread.currentThread().getName().getBytes(ZMQ.CHARSET)); + + worker.connect("tcp://localhost:5555"); + System.out.println(Thread.currentThread().getName() + " - Connected"); + + worker.sendMore(""); + worker.send("Hello " + Thread.currentThread().getName()); + System.out.println(Thread.currentThread().getName() + " - Sent Hello"); + + worker.recvStr(); // Envelope delimiter + System.out.println(Thread.currentThread().getName() + " - Received Envelope"); + String workload = worker.recvStr(); + System.out.println(Thread.currentThread().getName() + " - Received " + workload); + } + }); + workerThread.setName("worker"); + + brokerThread.start(); + workerThread.start(); + + workerThread.join(); + brokerThread.join(); + } + + + @Test + public void many() throws Exception { + Thread brokerThread = new Thread(() -> { + try (ZContext context = new ZContext()) { + + ZMQ.Socket broker = context.createSocket(SocketType.ROUTER); + broker.bind("tcp://*:5555"); + + while (!Thread.currentThread().isInterrupted()) { + String identity = broker.recvStr(); + System.out.println(Thread.currentThread().getName() + " - Received identity " + identity); + + broker.recv(0); // Envelope delimiter + String message = broker.recvStr(0); // Response from worker + System.out.println(Thread.currentThread().getName() + " - Received message " + message); + + broker.sendMore(identity); + broker.sendMore(""); + broker.send("Hello back to " + identity); + } + } + }); + brokerThread.setName("broker"); + + Set workers = IntStream.range(0, 10) + .mapToObj(index -> { + Thread workerThread = new Thread(() -> { + try (ZContext context = new ZContext()) { + ZMQ.Socket worker = context.createSocket(SocketType.DEALER); + worker.setIdentity(Thread.currentThread().getName().getBytes(ZMQ.CHARSET)); + + worker.connect("tcp://localhost:5555"); + System.out.println(Thread.currentThread().getName() + " - Connected"); + + worker.sendMore(""); + worker.send("Hello " + Thread.currentThread().getName()); + System.out.println(Thread.currentThread().getName() + " - Sent Hello"); + + worker.recvStr(); // Envelope delimiter + String workload = worker.recvStr(); + System.out.println(Thread.currentThread().getName() + " - Received " + workload); + } + }); + workerThread.setName("worker-" + index); + + return workerThread; + }) + .collect(Collectors.toSet()); + + brokerThread.start(); + workers.forEach(Thread::start); + + for (Thread worker : workers) { + worker.join(); + } + brokerThread.interrupt(); + } + + @Test + public void threaded() throws Exception { + Thread brokerThread = new Thread(() -> { + try (ZContext context = new ZContext()) { + + ZMQ.Socket broker = context.createSocket(SocketType.ROUTER); + broker.bind("tcp://*:5555"); + + ExecutorService threadPool = Executors.newFixedThreadPool(5); + Random rng = new Random(); + + while (!Thread.currentThread().isInterrupted()) { + String identity = broker.recvStr(); + System.out.println(Thread.currentThread().getName() + " - Received identity " + identity); + + broker.recv(0); // Envelope delimiter + String message = broker.recvStr(0); // Response from worker + System.out.println(Thread.currentThread().getName() + " - Received message " + message); + + threadPool.submit(() -> { + try { + Thread.sleep(rng.nextInt(1000) + 1000 ); + } catch (Exception e) {} + + synchronized(broker) { + broker.sendMore(identity); + broker.sendMore(""); + broker.send("Hello back to " + identity + " from " + Thread.currentThread().getName()); + } + }); + } + + threadPool.shutdown(); + } + }); + brokerThread.setName("broker"); + + Set workers = IntStream.range(0, 10) + .mapToObj(index -> { + Thread workerThread = new Thread(() -> { + try (ZContext context = new ZContext()) { + ZMQ.Socket worker = context.createSocket(SocketType.DEALER); + worker.setIdentity(Thread.currentThread().getName().getBytes(ZMQ.CHARSET)); + + worker.connect("tcp://localhost:5555"); + System.out.println(Thread.currentThread().getName() + " - Connected"); + + worker.sendMore(""); + worker.send("Hello " + Thread.currentThread().getName()); + System.out.println(Thread.currentThread().getName() + " - Sent Hello"); + + worker.recvStr(); // Envelope delimiter + String workload = worker.recvStr(); + System.out.println(Thread.currentThread().getName() + " - Received " + workload); + } + }); + workerThread.setName("worker-" + index); + + return workerThread; + }) + .collect(Collectors.toSet()); + + brokerThread.start(); + workers.forEach(Thread::start); + + for (Thread worker : workers) { + worker.join(); + } + brokerThread.interrupt(); + } +} diff --git a/jeromq/src/test/java/com/baeldung/jeromq/PubSubLiveTest.java b/jeromq/src/test/java/com/baeldung/jeromq/PubSubLiveTest.java new file mode 100644 index 0000000000..44699646b5 --- /dev/null +++ b/jeromq/src/test/java/com/baeldung/jeromq/PubSubLiveTest.java @@ -0,0 +1,101 @@ +package com.baeldung.jeromq; + +import org.junit.jupiter.api.Test; +import org.zeromq.SocketType; +import org.zeromq.ZContext; +import org.zeromq.ZMQ; + +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +public class PubSubLiveTest { + @Test + public void singleSub() throws Exception { + Thread server = new Thread(() -> { + try (ZContext context = new ZContext()) { + ZMQ.Socket pub = context.createSocket(SocketType.PUB); + pub.bind("tcp://*:5555"); + + try { + Thread.sleep(3000); + } catch (InterruptedException e) {} + + System.out.println(Thread.currentThread().getName() + " - Sending"); + pub.send("Hello"); + } + }); + server.setName("server"); + + Thread client = new Thread(() -> { + try (ZContext context = new ZContext()) { + ZMQ.Socket sub = context.createSocket(SocketType.SUB); + sub.connect("tcp://localhost:5555"); + System.out.println(Thread.currentThread().getName() + " - Connected"); + + sub.subscribe("".getBytes()); + System.out.println(Thread.currentThread().getName() + " - Subscribed"); + + String message = sub.recvStr(); + System.out.println(Thread.currentThread().getName() + " - " + message); + } + }); + client.setName("client"); + + server.start(); + client.start(); + + client.join(); + server.join(); + } + + + @Test + public void manySub() throws Exception { + Thread server = new Thread(() -> { + try (ZContext context = new ZContext()) { + ZMQ.Socket pub = context.createSocket(SocketType.PUB); + pub.bind("tcp://*:5555"); + + try { + Thread.sleep(3000); + } catch (InterruptedException e) {} + + System.out.println(Thread.currentThread().getName() + " - Sending"); + pub.send("Hello"); + } + }); + server.setName("server"); + + Set clients = IntStream.range(0, 10) + .mapToObj(index -> { + Thread client = new Thread(() -> { + try (ZContext context = new ZContext()) { + ZMQ.Socket sub = context.createSocket(SocketType.SUB); + sub.connect("tcp://localhost:5555"); + System.out.println(Thread.currentThread().getName() + " - Connected"); + + sub.subscribe("".getBytes()); + System.out.println(Thread.currentThread().getName() + " - Subscribed"); + + String message = sub.recvStr(); + System.out.println(Thread.currentThread().getName() + " - " + message); + } + }); + client.setName("client-" + index); + + return client; + }) + .collect(Collectors.toSet()); + + + server.start(); + clients.forEach(Thread::start); + + for (Thread client : clients) { + client.join(); + } + + server.join(); + } +} diff --git a/jeromq/src/test/java/com/baeldung/jeromq/RequestResponseLiveTest.java b/jeromq/src/test/java/com/baeldung/jeromq/RequestResponseLiveTest.java new file mode 100644 index 0000000000..8c0728e446 --- /dev/null +++ b/jeromq/src/test/java/com/baeldung/jeromq/RequestResponseLiveTest.java @@ -0,0 +1,96 @@ +package com.baeldung.jeromq; + +import org.junit.jupiter.api.Test; +import org.zeromq.SocketType; +import org.zeromq.ZContext; +import org.zeromq.ZMQ; + +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +public class RequestResponseLiveTest { + @Test + public void requestResponse() throws Exception { + try (ZContext context = new ZContext()) { + Thread server = new Thread(() -> { + ZMQ.Socket socket = context.createSocket(SocketType.REP); + socket.bind("inproc://test"); + + while (!Thread.currentThread().isInterrupted()) { + byte[] reply = socket.recv(0); + System.out.println("Server Received " + ": [" + new String(reply, ZMQ.CHARSET) + "]"); + + String response = new String(reply, ZMQ.CHARSET) + ", world"; + socket.send(response.getBytes(ZMQ.CHARSET), 0); + } + }); + + Thread client = new Thread(() -> { + ZMQ.Socket socket = context.createSocket(SocketType.REQ); + socket.connect("inproc://test"); + + for (int requestNbr = 0; requestNbr != 10; requestNbr++) { + String request = "Hello " + requestNbr; + System.out.println("Sending " + request); + socket.send(request.getBytes(ZMQ.CHARSET), 0); + + byte[] reply = socket.recv(0); + System.out.println("Client Received " + new String(reply, ZMQ.CHARSET)); + } + + }); + + server.start(); + client.start(); + + client.join(); + server.interrupt(); + } + } + + @Test + public void manyRequestResponse() throws Exception { + try (ZContext context = new ZContext()) { + Thread server = new Thread(() -> { + ZMQ.Socket socket = context.createSocket(SocketType.REP); + socket.bind("tcp://*:5555"); + + while (!Thread.currentThread().isInterrupted()) { + byte[] reply = socket.recv(0); + System.out.println("Server Received " + ": [" + new String(reply, ZMQ.CHARSET) + "]"); + + String response = new String(reply, ZMQ.CHARSET) + ", world"; + socket.send(response.getBytes(ZMQ.CHARSET), 0); + } + }); + + Set clients = IntStream.range(0, 10).mapToObj(index -> + new Thread(() -> { + ZMQ.Socket socket = context.createSocket(SocketType.REQ); + socket.connect("tcp://localhost:5555"); + + for (int requestNbr = 0; requestNbr != 10; requestNbr++) { + String request = "Hello " + index + " - " + requestNbr; + System.out.println("Sending " + request); + socket.send(request.getBytes(ZMQ.CHARSET), 0); + + byte[] reply = socket.recv(0); + System.out.println("Client " + index + " Received " + new String(reply, ZMQ.CHARSET)); + } + + }) + ).collect(Collectors.toSet()); + + server.start(); + clients.forEach(Thread::start); + + for (Thread client : clients) { + client.join(); + } + + server.interrupt(); + } + + } +} diff --git a/pom.xml b/pom.xml index cfd70b4ffd..f6af9aeba2 100644 --- a/pom.xml +++ b/pom.xml @@ -369,6 +369,7 @@ persistence-modules/spring-data-cassandra-reactive persistence-modules/spring-data-neo4j java-nashorn + jeromq