BAEL-5698 (#12858)
* [BAEL-4849] Article code * [BAEL-4968] Article code * [BAEL-4968] Article code * [BAEL-4968] Article code * [BAEL-4968] Remove extra comments * [BAEL-5258] Article Code * [BAEL-2765] PKCE Support for Secret Clients * [BAEL-5698] Article code * [BAEL-5698] Article code
This commit is contained in:
parent
dede91f49d
commit
fe09cfb802
|
@ -7,3 +7,4 @@ This module contains articles about RabbitMQ.
|
||||||
- [Exchanges, Queues, and Bindings in RabbitMQ](https://www.baeldung.com/java-rabbitmq-exchanges-queues-bindings)
|
- [Exchanges, Queues, and Bindings in RabbitMQ](https://www.baeldung.com/java-rabbitmq-exchanges-queues-bindings)
|
||||||
- [Pub-Sub vs. Message Queues](https://www.baeldung.com/pub-sub-vs-message-queues)
|
- [Pub-Sub vs. Message Queues](https://www.baeldung.com/pub-sub-vs-message-queues)
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,14 @@
|
||||||
|
version: '3.0'
|
||||||
|
services:
|
||||||
|
rabbitmq:
|
||||||
|
image: rabbitmq:3-management
|
||||||
|
environment:
|
||||||
|
- RABBITMQ_DEFAULT_USER=guest
|
||||||
|
- RABBITMQ_DEFAULT_PASS=guest
|
||||||
|
- RABBITMQ_VM_MEMORY_HIGH_WATERMARK_RELATIVE=0.8
|
||||||
|
ports:
|
||||||
|
- "5672:5672"
|
||||||
|
- "15672:15672"
|
||||||
|
volumes:
|
||||||
|
- ./src/rabbitmq/20-mem.conf:/etc/rabbitmq/conf.d/20-mem.conf
|
||||||
|
|
|
@ -13,6 +13,7 @@
|
||||||
<version>0.0.1-SNAPSHOT</version>
|
<version>0.0.1-SNAPSHOT</version>
|
||||||
<relativePath>../parent-boot-2</relativePath>
|
<relativePath>../parent-boot-2</relativePath>
|
||||||
</parent>
|
</parent>
|
||||||
|
|
||||||
|
|
||||||
<dependencyManagement>
|
<dependencyManagement>
|
||||||
<dependencies>
|
<dependencies>
|
||||||
|
@ -30,7 +31,6 @@
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>com.rabbitmq</groupId>
|
<groupId>com.rabbitmq</groupId>
|
||||||
<artifactId>amqp-client</artifactId>
|
<artifactId>amqp-client</artifactId>
|
||||||
<version>${amqp-client.version}</version>
|
|
||||||
</dependency>
|
</dependency>
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.springframework.boot</groupId>
|
<groupId>org.springframework.boot</groupId>
|
||||||
|
@ -40,10 +40,10 @@
|
||||||
<groupId>org.springframework.boot</groupId>
|
<groupId>org.springframework.boot</groupId>
|
||||||
<artifactId>spring-boot-starter-amqp</artifactId>
|
<artifactId>spring-boot-starter-amqp</artifactId>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
|
||||||
</dependencies>
|
</dependencies>
|
||||||
|
|
||||||
<properties>
|
<properties>
|
||||||
<amqp-client.version>5.12.0</amqp-client.version>
|
|
||||||
<spring-cloud-dependencies.version>2020.0.3</spring-cloud-dependencies.version>
|
<spring-cloud-dependencies.version>2020.0.3</spring-cloud-dependencies.version>
|
||||||
</properties>
|
</properties>
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,97 @@
|
||||||
|
package com.baeldung.benchmark;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.LongSummaryStatistics;
|
||||||
|
import java.util.Random;
|
||||||
|
import java.util.concurrent.ArrayBlockingQueue;
|
||||||
|
import java.util.concurrent.Callable;
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.Future;
|
||||||
|
import java.util.concurrent.ThreadPoolExecutor;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
import java.util.stream.IntStream;
|
||||||
|
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import com.baeldung.benchmark.Worker.WorkerResult;
|
||||||
|
import com.rabbitmq.client.Channel;
|
||||||
|
import com.rabbitmq.client.Connection;
|
||||||
|
import com.rabbitmq.client.ConnectionFactory;
|
||||||
|
|
||||||
|
public class ConnectionPerChannelPublisher implements Callable<Long> {
|
||||||
|
|
||||||
|
private static final Logger log = LoggerFactory.getLogger(ConnectionPerChannelPublisher.class);
|
||||||
|
private final ConnectionFactory factory;
|
||||||
|
private final int workerCount;
|
||||||
|
private final int iterations;
|
||||||
|
private final int payloadSize;
|
||||||
|
|
||||||
|
ConnectionPerChannelPublisher(ConnectionFactory factory, int workerCount, int iterations, int payloadSize) {
|
||||||
|
this.factory = factory;
|
||||||
|
this.workerCount = workerCount;
|
||||||
|
this.iterations = iterations;
|
||||||
|
this.payloadSize = payloadSize;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void main(String[] args) {
|
||||||
|
|
||||||
|
if (args.length != 4) {
|
||||||
|
System.err.println("Usage: java " + ConnectionPerChannelPublisher.class.getName() + " <host> <#channels> <#messages> <payloadSize>");
|
||||||
|
System.exit(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
ConnectionFactory factory = new ConnectionFactory();
|
||||||
|
factory.setHost(args[0]);
|
||||||
|
|
||||||
|
int workerCount = Integer.parseInt(args[1]);
|
||||||
|
int iterations = Integer.parseInt(args[2]);
|
||||||
|
int payloadSize = Integer.parseInt(args[3]);
|
||||||
|
|
||||||
|
// run the benchmark 10x and get the average throughput
|
||||||
|
LongSummaryStatistics summary = IntStream.range(0, 9)
|
||||||
|
.mapToObj(idx -> new ConnectionPerChannelPublisher(factory, workerCount, iterations, payloadSize))
|
||||||
|
.map(p -> p.call())
|
||||||
|
.collect(Collectors.summarizingLong((l) -> l));
|
||||||
|
|
||||||
|
log.info("[I66] workers={}, throughput={}", workerCount, (int)Math.floor(summary.getAverage()));
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Long call() {
|
||||||
|
try {
|
||||||
|
List<Worker> workers = new ArrayList<>();
|
||||||
|
CountDownLatch counter = new CountDownLatch(workerCount);
|
||||||
|
|
||||||
|
for (int i = 0; i < workerCount; i++) {
|
||||||
|
Connection conn = factory.newConnection();
|
||||||
|
workers.add(new Worker("queue_" + i, conn, iterations, counter, payloadSize));
|
||||||
|
}
|
||||||
|
|
||||||
|
ExecutorService executor = new ThreadPoolExecutor(workerCount, workerCount, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(workerCount, true));
|
||||||
|
long start = System.currentTimeMillis();
|
||||||
|
log.info("[I61] Starting {} workers...", workers.size());
|
||||||
|
executor.invokeAll(workers);
|
||||||
|
if (counter.await(5, TimeUnit.MINUTES)) {
|
||||||
|
long elapsed = System.currentTimeMillis() - start;
|
||||||
|
log.info("[I59] Tasks completed: #workers={}, #iterations={}, elapsed={}ms, stats={}", workerCount, iterations, elapsed);
|
||||||
|
return throughput(workerCount, iterations, elapsed);
|
||||||
|
} else {
|
||||||
|
throw new RuntimeException("[E61] Timeout waiting workers to complete");
|
||||||
|
}
|
||||||
|
|
||||||
|
} catch (Exception ex) {
|
||||||
|
throw new RuntimeException(ex);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static long throughput(int workerCount, int iterations, long elapsed) {
|
||||||
|
return (iterations * workerCount * 1000) / elapsed;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,165 @@
|
||||||
|
package com.baeldung.benchmark;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.LongSummaryStatistics;
|
||||||
|
import java.util.Random;
|
||||||
|
import java.util.concurrent.ArrayBlockingQueue;
|
||||||
|
import java.util.concurrent.Callable;
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import java.util.concurrent.ExecutionException;
|
||||||
|
import java.util.concurrent.Executor;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.Future;
|
||||||
|
import java.util.concurrent.ThreadPoolExecutor;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.TimeoutException;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import com.rabbitmq.client.Channel;
|
||||||
|
import com.rabbitmq.client.Connection;
|
||||||
|
import com.rabbitmq.client.ConnectionFactory;
|
||||||
|
|
||||||
|
public class SharedConnectionPublisher {
|
||||||
|
|
||||||
|
private static final Logger log = LoggerFactory.getLogger(SharedConnectionPublisher.class);
|
||||||
|
|
||||||
|
|
||||||
|
public static void main(String[] args) {
|
||||||
|
|
||||||
|
try {
|
||||||
|
|
||||||
|
if ( args.length != 6) {
|
||||||
|
System.err.println("Usage: java " + SharedConnectionPublisher.class.getName() + " <host> <#channels> <#messages> <payloadSize> <#channels/connection> <extra work time(ms)>");
|
||||||
|
System.exit(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
ConnectionFactory factory = new ConnectionFactory();
|
||||||
|
factory.setHost(args[0]);
|
||||||
|
|
||||||
|
List<Worker> workers = new ArrayList<>();
|
||||||
|
|
||||||
|
int workerCount = Integer.parseInt(args[1]);
|
||||||
|
int iterations = Integer.parseInt(args[2]);
|
||||||
|
int payloadSize = Integer.parseInt(args[3]);
|
||||||
|
int channelsPerConnection = Integer.parseInt(args[4]);
|
||||||
|
long extraWork = Long.parseLong(args[5]);
|
||||||
|
|
||||||
|
log.info("[I35] Creating {} worker{}...", workerCount, (workerCount > 1)?"s":"");
|
||||||
|
|
||||||
|
CountDownLatch counter = new CountDownLatch(workerCount);
|
||||||
|
|
||||||
|
int connCount = (workerCount + channelsPerConnection-1)/channelsPerConnection;
|
||||||
|
List<Connection> connections = new ArrayList<>(connCount);
|
||||||
|
for( int i =0 ; i< connCount; i++) {
|
||||||
|
log.info("[I59] Creating connection#{}", i);
|
||||||
|
connections.add(factory.newConnection());
|
||||||
|
}
|
||||||
|
|
||||||
|
for( int i = 0 ; i < workerCount ; i++ ) {
|
||||||
|
workers.add(new Worker("queue_" + i, connections.get(i % connCount), iterations, counter,payloadSize,extraWork));
|
||||||
|
}
|
||||||
|
|
||||||
|
ExecutorService executor = new ThreadPoolExecutor(workerCount, workerCount, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(workerCount, true));
|
||||||
|
long start = System.currentTimeMillis();
|
||||||
|
log.info("[I61] Starting workers...");
|
||||||
|
List<Future<WorkerResult>> results = executor.invokeAll(workers);
|
||||||
|
|
||||||
|
log.info("[I55] Waiting workers to complete...");
|
||||||
|
if( counter.await(5, TimeUnit.MINUTES) ) {
|
||||||
|
long elapsed = System.currentTimeMillis() - start - (workerCount*iterations*extraWork);
|
||||||
|
log.info("[I59] Tasks completed: #workers={}, #iterations={}, elapsed={}ms",
|
||||||
|
workerCount,
|
||||||
|
iterations,
|
||||||
|
elapsed);
|
||||||
|
|
||||||
|
LongSummaryStatistics summary = results.stream()
|
||||||
|
.map(f -> safeGet(f))
|
||||||
|
.map(r -> r.elapsed)
|
||||||
|
.collect(Collectors.summarizingLong((l) -> l));
|
||||||
|
|
||||||
|
log.info("[I74] stats={}", summary);
|
||||||
|
log.info("[I79] result: workers={}, throughput={}",workerCount,throughput(workerCount,iterations,elapsed));
|
||||||
|
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
log.error("[E61] Timeout waiting workers to complete");
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
catch(Exception ex) {
|
||||||
|
throw new RuntimeException(ex);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static long throughput(int workerCount, int iterations, long elapsed) {
|
||||||
|
return (iterations*workerCount*1000)/elapsed;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
private static <T> T safeGet(Future<T> f) {
|
||||||
|
try {
|
||||||
|
return f.get();
|
||||||
|
}
|
||||||
|
catch(Exception ex) {
|
||||||
|
throw new RuntimeException(ex);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static class WorkerResult {
|
||||||
|
public final long elapsed;
|
||||||
|
WorkerResult(long elapsed) {
|
||||||
|
this.elapsed = elapsed;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
private static class Worker implements Callable<WorkerResult> {
|
||||||
|
|
||||||
|
private final Connection conn;
|
||||||
|
private final Channel channel;
|
||||||
|
private int iterations;
|
||||||
|
private final CountDownLatch counter;
|
||||||
|
private final String queue;
|
||||||
|
private final byte[] payload;
|
||||||
|
private long extraWork;
|
||||||
|
|
||||||
|
Worker(String queue, Connection conn, int iterations, CountDownLatch counter,int payloadSize,long extraWork) throws IOException {
|
||||||
|
this.conn = conn;
|
||||||
|
this.iterations = iterations;
|
||||||
|
this.counter = counter;
|
||||||
|
this.queue = queue;
|
||||||
|
this.extraWork = extraWork;
|
||||||
|
|
||||||
|
channel = conn.createChannel();
|
||||||
|
channel.queueDeclare(queue, false, false, true, null);
|
||||||
|
|
||||||
|
this.payload = new byte[payloadSize];
|
||||||
|
new Random().nextBytes(payload);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public WorkerResult call() throws Exception {
|
||||||
|
|
||||||
|
try {
|
||||||
|
long start = System.currentTimeMillis();
|
||||||
|
for ( int i = 0 ; i < iterations ; i++ ) {
|
||||||
|
channel.basicPublish("", queue, null,payload);
|
||||||
|
Thread.sleep(extraWork);
|
||||||
|
}
|
||||||
|
|
||||||
|
long elapsed = System.currentTimeMillis() - start - (extraWork*iterations);
|
||||||
|
channel.queueDelete(queue);
|
||||||
|
return new WorkerResult(elapsed);
|
||||||
|
}
|
||||||
|
finally {
|
||||||
|
counter.countDown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,116 @@
|
||||||
|
package com.baeldung.benchmark;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.LongSummaryStatistics;
|
||||||
|
import java.util.concurrent.ArrayBlockingQueue;
|
||||||
|
import java.util.concurrent.Callable;
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.Future;
|
||||||
|
import java.util.concurrent.ThreadPoolExecutor;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
import java.util.stream.IntStream;
|
||||||
|
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import com.baeldung.benchmark.Worker.WorkerResult;
|
||||||
|
import com.rabbitmq.client.Connection;
|
||||||
|
import com.rabbitmq.client.ConnectionFactory;
|
||||||
|
|
||||||
|
public class SingleConnectionPublisher implements Callable<Long> {
|
||||||
|
|
||||||
|
private static final Logger log = LoggerFactory.getLogger(SingleConnectionPublisher.class);
|
||||||
|
|
||||||
|
private final ConnectionFactory factory;
|
||||||
|
private final int workerCount;
|
||||||
|
private final int iterations;
|
||||||
|
private final int payloadSize;
|
||||||
|
|
||||||
|
SingleConnectionPublisher(ConnectionFactory factory, int workerCount, int iterations, int payloadSize) {
|
||||||
|
this.factory = factory;
|
||||||
|
this.workerCount = workerCount;
|
||||||
|
this.iterations = iterations;
|
||||||
|
this.payloadSize = payloadSize;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void main(String[] args) {
|
||||||
|
|
||||||
|
if ( args.length != 4) {
|
||||||
|
System.err.println("Usage: java " + SingleConnectionPublisher.class.getName() + " <host> <#channels> <#messages> <payloadSize>");
|
||||||
|
System.exit(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
ConnectionFactory factory = new ConnectionFactory();
|
||||||
|
factory.setHost(args[0]);
|
||||||
|
|
||||||
|
int workerCount = Integer.parseInt(args[1]);
|
||||||
|
int iterations = Integer.parseInt(args[2]);
|
||||||
|
int payloadSize = Integer.parseInt(args[3]);
|
||||||
|
|
||||||
|
LongSummaryStatistics summary = IntStream.range(0, 9)
|
||||||
|
.mapToObj(idx -> new SingleConnectionPublisher(factory, workerCount, iterations, payloadSize))
|
||||||
|
.map(p -> p.call())
|
||||||
|
.collect(Collectors.summarizingLong((l) -> l));
|
||||||
|
|
||||||
|
log.info("[I66] workers={}, throughput={}", workerCount, (int)Math.floor(summary.getAverage()));
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Long call() {
|
||||||
|
|
||||||
|
try {
|
||||||
|
|
||||||
|
Connection connection = factory.newConnection();
|
||||||
|
CountDownLatch counter = new CountDownLatch(workerCount);
|
||||||
|
List<Worker> workers = new ArrayList<>();
|
||||||
|
|
||||||
|
for( int i = 0 ; i < workerCount ; i++ ) {
|
||||||
|
workers.add(new Worker("queue_" + i, connection, iterations, counter,payloadSize));
|
||||||
|
}
|
||||||
|
|
||||||
|
ExecutorService executor = new ThreadPoolExecutor(workerCount, workerCount, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(workerCount, true));
|
||||||
|
long start = System.currentTimeMillis();
|
||||||
|
log.info("[I61] Starting {} workers...", workers.size());
|
||||||
|
List<Future<WorkerResult>> results = executor.invokeAll(workers);
|
||||||
|
|
||||||
|
if( counter.await(5, TimeUnit.MINUTES) ) {
|
||||||
|
long elapsed = System.currentTimeMillis() - start;
|
||||||
|
|
||||||
|
LongSummaryStatistics summary = results.stream()
|
||||||
|
.map(f -> safeGet(f))
|
||||||
|
.map(r -> r.elapsed)
|
||||||
|
.collect(Collectors.summarizingLong((l) -> l));
|
||||||
|
|
||||||
|
log.info("[I59] Tasks completed: #workers={}, #iterations={}, elapsed={}ms, stats={}",
|
||||||
|
workerCount,
|
||||||
|
iterations,
|
||||||
|
elapsed, summary);
|
||||||
|
|
||||||
|
return throughput(workerCount,iterations,elapsed);
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
throw new RuntimeException("[E61] Timeout waiting workers to complete");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch(Exception ex) {
|
||||||
|
throw new RuntimeException(ex);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static <T> T safeGet(Future<T> f) {
|
||||||
|
try {
|
||||||
|
return f.get();
|
||||||
|
}
|
||||||
|
catch(Exception ex) {
|
||||||
|
throw new RuntimeException(ex);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static long throughput(int workerCount, int iterations, long elapsed) {
|
||||||
|
return (iterations*workerCount*1000)/elapsed;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,151 @@
|
||||||
|
package com.baeldung.benchmark;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.LongSummaryStatistics;
|
||||||
|
import java.util.Random;
|
||||||
|
import java.util.concurrent.ArrayBlockingQueue;
|
||||||
|
import java.util.concurrent.Callable;
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import java.util.concurrent.ExecutionException;
|
||||||
|
import java.util.concurrent.Executor;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.Future;
|
||||||
|
import java.util.concurrent.ThreadPoolExecutor;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.TimeoutException;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import com.rabbitmq.client.Channel;
|
||||||
|
import com.rabbitmq.client.Connection;
|
||||||
|
import com.rabbitmq.client.ConnectionFactory;
|
||||||
|
|
||||||
|
public class SingleConnectionPublisherNio {
|
||||||
|
|
||||||
|
private static final Logger log = LoggerFactory.getLogger(SingleConnectionPublisherNio.class);
|
||||||
|
|
||||||
|
|
||||||
|
public static void main(String[] args) {
|
||||||
|
|
||||||
|
try {
|
||||||
|
|
||||||
|
if ( args.length != 4) {
|
||||||
|
System.err.println("Usage: java " + SingleConnectionPublisherNio.class.getName() + " <host> <#channels> <#messages> <payloadSize>");
|
||||||
|
System.exit(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
ConnectionFactory factory = new ConnectionFactory();
|
||||||
|
factory.setHost(args[0]);
|
||||||
|
factory.useNio();
|
||||||
|
Connection connection = factory.newConnection();
|
||||||
|
|
||||||
|
List<Worker> workers = new ArrayList<>();
|
||||||
|
|
||||||
|
int workerCount = Integer.parseInt(args[1]);
|
||||||
|
int iterations = Integer.parseInt(args[2]);
|
||||||
|
int payloadSize = Integer.parseInt(args[3]);
|
||||||
|
|
||||||
|
log.info("[I35] Creating {} worker{}...", workerCount, (workerCount > 1)?"s":"");
|
||||||
|
|
||||||
|
CountDownLatch counter = new CountDownLatch(workerCount);
|
||||||
|
|
||||||
|
for( int i = 0 ; i < workerCount ; i++ ) {
|
||||||
|
workers.add(new Worker("queue_" + i, connection, iterations, counter,payloadSize));
|
||||||
|
}
|
||||||
|
|
||||||
|
ExecutorService executor = new ThreadPoolExecutor(workerCount, workerCount, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(workerCount, true));
|
||||||
|
long start = System.currentTimeMillis();
|
||||||
|
log.info("[I61] Starting workers...");
|
||||||
|
List<Future<WorkerResult>> results = executor.invokeAll(workers);
|
||||||
|
|
||||||
|
log.info("[I55] Waiting workers to complete...");
|
||||||
|
if( counter.await(5, TimeUnit.MINUTES) ) {
|
||||||
|
long elapsed = System.currentTimeMillis() - start;
|
||||||
|
log.info("[I59] Tasks completed: #workers={}, #iterations={}, elapsed={}ms",
|
||||||
|
workerCount,
|
||||||
|
iterations,
|
||||||
|
elapsed);
|
||||||
|
|
||||||
|
LongSummaryStatistics summary = results.stream()
|
||||||
|
.map(f -> safeGet(f))
|
||||||
|
.map(r -> r.elapsed)
|
||||||
|
.collect(Collectors.summarizingLong((l) -> l));
|
||||||
|
|
||||||
|
log.info("[I74] stats={}", summary);
|
||||||
|
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
log.error("[E61] Timeout waiting workers to complete");
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
catch(Exception ex) {
|
||||||
|
throw new RuntimeException(ex);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
private static <T> T safeGet(Future<T> f) {
|
||||||
|
try {
|
||||||
|
return f.get();
|
||||||
|
}
|
||||||
|
catch(Exception ex) {
|
||||||
|
throw new RuntimeException(ex);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static class WorkerResult {
|
||||||
|
public final long elapsed;
|
||||||
|
WorkerResult(long elapsed) {
|
||||||
|
this.elapsed = elapsed;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
private static class Worker implements Callable<WorkerResult> {
|
||||||
|
|
||||||
|
private final Connection conn;
|
||||||
|
private final Channel channel;
|
||||||
|
private int iterations;
|
||||||
|
private final CountDownLatch counter;
|
||||||
|
private final String queue;
|
||||||
|
private final byte[] payload;
|
||||||
|
|
||||||
|
Worker(String queue, Connection conn, int iterations, CountDownLatch counter,int payloadSize) throws IOException {
|
||||||
|
this.conn = conn;
|
||||||
|
this.iterations = iterations;
|
||||||
|
this.counter = counter;
|
||||||
|
this.queue = queue;
|
||||||
|
|
||||||
|
channel = conn.createChannel();
|
||||||
|
channel.queueDeclare(queue, false, false, true, null);
|
||||||
|
|
||||||
|
this.payload = new byte[payloadSize];
|
||||||
|
new Random().nextBytes(payload);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public WorkerResult call() throws Exception {
|
||||||
|
|
||||||
|
try {
|
||||||
|
long start = System.currentTimeMillis();
|
||||||
|
for ( int i = 0 ; i < iterations ; i++ ) {
|
||||||
|
channel.basicPublish("", queue, null,payload);
|
||||||
|
}
|
||||||
|
|
||||||
|
long elapsed = System.currentTimeMillis() - start;
|
||||||
|
channel.queueDelete(queue);
|
||||||
|
return new WorkerResult(elapsed);
|
||||||
|
}
|
||||||
|
finally {
|
||||||
|
counter.countDown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,59 @@
|
||||||
|
package com.baeldung.benchmark;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.Random;
|
||||||
|
import java.util.concurrent.Callable;
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
|
||||||
|
|
||||||
|
import com.rabbitmq.client.Channel;
|
||||||
|
import com.rabbitmq.client.Connection;
|
||||||
|
|
||||||
|
public class Worker implements Callable<Worker.WorkerResult> {
|
||||||
|
|
||||||
|
private final Channel channel;
|
||||||
|
private int iterations;
|
||||||
|
private final CountDownLatch counter;
|
||||||
|
private final String queue;
|
||||||
|
private final byte[] payload;
|
||||||
|
|
||||||
|
Worker(String queue, Connection conn, int iterations, CountDownLatch counter, int payloadSize) throws IOException {
|
||||||
|
this.iterations = iterations;
|
||||||
|
this.counter = counter;
|
||||||
|
this.queue = queue;
|
||||||
|
|
||||||
|
channel = conn.createChannel();
|
||||||
|
channel.queueDelete(queue);
|
||||||
|
channel.queueDeclare(queue, false, false, true, null);
|
||||||
|
|
||||||
|
this.payload = new byte[payloadSize];
|
||||||
|
new Random().nextBytes(payload);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public WorkerResult call() throws Exception {
|
||||||
|
|
||||||
|
try {
|
||||||
|
long start = System.currentTimeMillis();
|
||||||
|
for (int i = 0; i < iterations; i++) {
|
||||||
|
channel.basicPublish("", queue, null, payload);
|
||||||
|
}
|
||||||
|
|
||||||
|
long elapsed = System.currentTimeMillis() - start;
|
||||||
|
channel.queueDelete(queue);
|
||||||
|
return new WorkerResult(elapsed);
|
||||||
|
} finally {
|
||||||
|
counter.countDown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public static class WorkerResult {
|
||||||
|
public final long elapsed;
|
||||||
|
|
||||||
|
WorkerResult(long elapsed) {
|
||||||
|
this.elapsed = elapsed;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,2 @@
|
||||||
|
# Memory configuration for Rabbit
|
||||||
|
vm_memory_high_watermark.relative = 0.8
|
|
@ -0,0 +1,18 @@
|
||||||
|
package com.baeldung.benchmark;
|
||||||
|
|
||||||
|
import java.util.Arrays;
|
||||||
|
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
|
||||||
|
class ConnectionPerChannelPublisherLiveTest {
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void whenConnectionPerChannel_thenRunBenchmark() throws Exception {
|
||||||
|
// host, workerCount, iterations, payloadSize
|
||||||
|
Arrays.asList(1,5,10,20,50,100,150).stream()
|
||||||
|
.forEach(workers -> {
|
||||||
|
ConnectionPerChannelPublisher.main(new String[]{"192.168.99.100", Integer.toString(workers), "1000", "4096"});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,18 @@
|
||||||
|
package com.baeldung.benchmark;
|
||||||
|
|
||||||
|
import java.util.Arrays;
|
||||||
|
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
|
||||||
|
class SingleConnectionPublisherLiveTest {
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void whenSingleChannel_thenRunBenchmark() throws Exception {
|
||||||
|
// host, workerCount, iterations, payloadSize
|
||||||
|
Arrays.asList(1,5,10,20,50,100,150).stream()
|
||||||
|
.forEach(workers -> {
|
||||||
|
SingleConnectionPublisher.main(new String[]{"192.168.99.100", Integer.toString(workers), "1000", "4096"});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -47,7 +47,7 @@
|
||||||
<module>spring-security-web-thymeleaf</module>
|
<module>spring-security-web-thymeleaf</module>
|
||||||
<module>spring-security-web-x509</module>
|
<module>spring-security-web-x509</module>
|
||||||
<module>spring-security-opa</module>
|
<module>spring-security-opa</module>
|
||||||
<module>spring-security-pkce</module>
|
<module>spring-security-pkce</module>
|
||||||
</modules>
|
</modules>
|
||||||
|
|
||||||
</project>
|
</project>
|
Loading…
Reference in New Issue