BAEL-1423 Java Concurrency Utility with JCTools Library
This commit is contained in:
parent
323f5bf370
commit
f4feab9212
|
@ -0,0 +1,11 @@
|
||||||
|
## Overview
|
||||||
|
|
||||||
|
This project holds a [couple of tests](./src/test/java/com/baeldung/jctools/JCToolsUnitTest.java) which illustrate JCTools specifics and a [benchmark](./src/main/java/com/baeldung/jctools/MpmcBenchmark.java) in the [JMH](http://openjdk.java.net/projects/code-tools/jmh/) format.
|
||||||
|
|
||||||
|
## How to build and run the JMH benchmark
|
||||||
|
|
||||||
|
Execute the following from the project's root:
|
||||||
|
```bash
|
||||||
|
mvn clean install
|
||||||
|
java -jar ./target/benchmarks.jar MpmcBenchmark -si true
|
||||||
|
```
|
|
@ -0,0 +1,108 @@
|
||||||
|
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||||
|
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||||
|
<modelVersion>4.0.0</modelVersion>
|
||||||
|
|
||||||
|
<groupId>com.baeldung</groupId>
|
||||||
|
<artifactId>jctools</artifactId>
|
||||||
|
<version>0.0.1-SNAPSHOT</version>
|
||||||
|
|
||||||
|
<name>jctools</name>
|
||||||
|
|
||||||
|
<dependencies>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.jctools</groupId>
|
||||||
|
<artifactId>jctools-core</artifactId>
|
||||||
|
<version>${jctools.version}</version>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.openjdk.jmh</groupId>
|
||||||
|
<artifactId>jmh-core</artifactId>
|
||||||
|
<version>${jmh.version}</version>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.openjdk.jmh</groupId>
|
||||||
|
<artifactId>jmh-generator-annprocess</artifactId>
|
||||||
|
<version>${jmh.version}</version>
|
||||||
|
<scope>provided</scope>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>junit</groupId>
|
||||||
|
<artifactId>junit</artifactId>
|
||||||
|
<version>4.12</version>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.assertj</groupId>
|
||||||
|
<artifactId>assertj-core</artifactId>
|
||||||
|
<version>${assertj.version}</version>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
|
</dependencies>
|
||||||
|
|
||||||
|
<properties>
|
||||||
|
<jctools.version>2.1.2</jctools.version>
|
||||||
|
<assertj.version>3.9.1</assertj.version>
|
||||||
|
<jmh.version>1.20</jmh.version>
|
||||||
|
|
||||||
|
<javac.target>1.8</javac.target>
|
||||||
|
|
||||||
|
<uberjar.name>benchmarks</uberjar.name>
|
||||||
|
</properties>
|
||||||
|
|
||||||
|
<build>
|
||||||
|
<plugins>
|
||||||
|
<plugin>
|
||||||
|
<groupId>org.apache.maven.plugins</groupId>
|
||||||
|
<artifactId>maven-compiler-plugin</artifactId>
|
||||||
|
<version>3.1</version>
|
||||||
|
<configuration>
|
||||||
|
<compilerVersion>${javac.target}</compilerVersion>
|
||||||
|
<source>${javac.target}</source>
|
||||||
|
<target>${javac.target}</target>
|
||||||
|
</configuration>
|
||||||
|
</plugin>
|
||||||
|
|
||||||
|
<!-- Borrowed from the 'jmh-java-benchmark-archetype' pom.xml -->
|
||||||
|
<plugin>
|
||||||
|
<groupId>org.apache.maven.plugins</groupId>
|
||||||
|
<artifactId>maven-shade-plugin</artifactId>
|
||||||
|
<version>2.2</version>
|
||||||
|
<executions>
|
||||||
|
<execution>
|
||||||
|
<phase>package</phase>
|
||||||
|
<goals>
|
||||||
|
<goal>shade</goal>
|
||||||
|
</goals>
|
||||||
|
<configuration>
|
||||||
|
<finalName>${uberjar.name}</finalName>
|
||||||
|
<transformers>
|
||||||
|
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
|
||||||
|
<mainClass>org.openjdk.jmh.Main</mainClass>
|
||||||
|
</transformer>
|
||||||
|
</transformers>
|
||||||
|
<filters>
|
||||||
|
<filter>
|
||||||
|
<!--
|
||||||
|
Shading signed JARs will fail without this.
|
||||||
|
http://stackoverflow.com/questions/999489/invalid-signature-file-when-attempting-to-run-a-jar
|
||||||
|
-->
|
||||||
|
<artifact>*:*</artifact>
|
||||||
|
<excludes>
|
||||||
|
<exclude>META-INF/*.SF</exclude>
|
||||||
|
<exclude>META-INF/*.DSA</exclude>
|
||||||
|
<exclude>META-INF/*.RSA</exclude>
|
||||||
|
</excludes>
|
||||||
|
</filter>
|
||||||
|
</filters>
|
||||||
|
</configuration>
|
||||||
|
</execution>
|
||||||
|
</executions>
|
||||||
|
</plugin>
|
||||||
|
</plugins>
|
||||||
|
</build>
|
||||||
|
|
||||||
|
</project>
|
|
@ -0,0 +1,98 @@
|
||||||
|
package com.baeldung.jctools;
|
||||||
|
|
||||||
|
import org.jctools.queues.MpmcArrayQueue;
|
||||||
|
import org.jctools.queues.atomic.MpmcAtomicArrayQueue;
|
||||||
|
import org.openjdk.jmh.annotations.*;
|
||||||
|
import org.openjdk.jmh.infra.Control;
|
||||||
|
|
||||||
|
import java.util.Queue;
|
||||||
|
import java.util.concurrent.ArrayBlockingQueue;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
@BenchmarkMode(Mode.SampleTime)
|
||||||
|
@OutputTimeUnit(TimeUnit.NANOSECONDS)
|
||||||
|
@Fork(1)
|
||||||
|
@Warmup(iterations = 1)
|
||||||
|
@Measurement(iterations = 3)
|
||||||
|
public class MpmcBenchmark {
|
||||||
|
|
||||||
|
public static final String GROUP_UNSAFE = "MpmcArrayQueue";
|
||||||
|
public static final String GROUP_AFU = "MpmcAtomicArrayQueue";
|
||||||
|
public static final String GROUP_JDK = "ArrayBlockingQueue";
|
||||||
|
|
||||||
|
public static final int PRODUCER_THREADS_NUMBER = 32;
|
||||||
|
public static final int CONSUMER_THREADS_NUMBER = 32;
|
||||||
|
|
||||||
|
public static final int CAPACITY = 128;
|
||||||
|
|
||||||
|
@State(Scope.Group)
|
||||||
|
public static class Mpmc {
|
||||||
|
public final Queue<Long> queue = new MpmcArrayQueue<>(CAPACITY);
|
||||||
|
}
|
||||||
|
|
||||||
|
@State(Scope.Group)
|
||||||
|
public static class MpmcAtomic {
|
||||||
|
public final Queue<Long> queue = new MpmcAtomicArrayQueue<>(CAPACITY);
|
||||||
|
}
|
||||||
|
|
||||||
|
@State(Scope.Group)
|
||||||
|
public static class Jdk {
|
||||||
|
public final Queue<Long> queue = new ArrayBlockingQueue<>(CAPACITY);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Benchmark
|
||||||
|
@Group(GROUP_UNSAFE)
|
||||||
|
@GroupThreads(PRODUCER_THREADS_NUMBER)
|
||||||
|
public void mpmcWrite(Control control, Mpmc state) {
|
||||||
|
write(control, state.queue);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Benchmark
|
||||||
|
@Group(GROUP_UNSAFE)
|
||||||
|
@GroupThreads(CONSUMER_THREADS_NUMBER)
|
||||||
|
public void mpmcRead(Control control, Mpmc state) {
|
||||||
|
read(control, state.queue);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Benchmark
|
||||||
|
@Group(GROUP_AFU)
|
||||||
|
@GroupThreads(PRODUCER_THREADS_NUMBER)
|
||||||
|
public void mpmcAtomicWrite(Control control, MpmcAtomic state) {
|
||||||
|
write(control, state.queue);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Benchmark
|
||||||
|
@Group(GROUP_AFU)
|
||||||
|
@GroupThreads(CONSUMER_THREADS_NUMBER)
|
||||||
|
public void mpmcAtomicRead(Control control, MpmcAtomic state) {
|
||||||
|
read(control, state.queue);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Benchmark
|
||||||
|
@Group(GROUP_JDK)
|
||||||
|
@GroupThreads(PRODUCER_THREADS_NUMBER)
|
||||||
|
public void jdkWrite(Control control, Jdk state) {
|
||||||
|
write(control, state.queue);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Benchmark
|
||||||
|
@Group(GROUP_JDK)
|
||||||
|
@GroupThreads(CONSUMER_THREADS_NUMBER)
|
||||||
|
public void jdkRead(Control control, Jdk state) {
|
||||||
|
read(control, state.queue);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void write(Control control, Queue<Long> queue) {
|
||||||
|
//noinspection StatementWithEmptyBody
|
||||||
|
while (!control.stopMeasurement && !queue.offer(1L)) {
|
||||||
|
// Is intentionally left blank
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void read(Control control, Queue<Long> queue) {
|
||||||
|
//noinspection StatementWithEmptyBody
|
||||||
|
while (!control.stopMeasurement && queue.poll() == null) {
|
||||||
|
// Is intentionally left blank
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,86 @@
|
||||||
|
package com.baeldung.jctools;
|
||||||
|
|
||||||
|
import org.jctools.queues.SpscArrayQueue;
|
||||||
|
import org.jctools.queues.SpscChunkedArrayQueue;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
import java.util.function.IntConsumer;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
import java.util.stream.IntStream;
|
||||||
|
|
||||||
|
import static org.assertj.core.api.Assertions.assertThat;
|
||||||
|
import static org.assertj.core.api.Assertions.fail;
|
||||||
|
|
||||||
|
public class JCToolsUnitTest {
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void givenMultipleProducers_whenSpscQueueUsed_thenNoWarningOccurs() throws InterruptedException {
|
||||||
|
SpscArrayQueue<Integer> queue = new SpscArrayQueue<Integer>(2);
|
||||||
|
|
||||||
|
Thread producer1 = new Thread(() -> {
|
||||||
|
queue.offer(1);
|
||||||
|
});
|
||||||
|
producer1.start();
|
||||||
|
producer1.join();
|
||||||
|
|
||||||
|
Thread producer2 = new Thread(() -> {
|
||||||
|
queue.offer(2);
|
||||||
|
});
|
||||||
|
producer2.start();
|
||||||
|
producer2.join();
|
||||||
|
|
||||||
|
Set<Integer> fromQueue = new HashSet<>();
|
||||||
|
Thread consumer = new Thread(() -> queue.drain(fromQueue::add));
|
||||||
|
consumer.start();
|
||||||
|
consumer.join();
|
||||||
|
|
||||||
|
assertThat(fromQueue).containsOnly(1, 2);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void whenQueueIsFull_thenNoMoreElementsCanBeAdded() throws InterruptedException {
|
||||||
|
SpscChunkedArrayQueue<Integer> queue = new SpscChunkedArrayQueue<>(8, 16);
|
||||||
|
assertThat(queue.capacity()).isEqualTo(16);
|
||||||
|
|
||||||
|
CountDownLatch startConsuming = new CountDownLatch(1);
|
||||||
|
CountDownLatch awakeProducer = new CountDownLatch(1);
|
||||||
|
AtomicReference<Throwable> error = new AtomicReference<>();
|
||||||
|
Thread producer = new Thread(() -> {
|
||||||
|
IntStream.range(0, queue.capacity()).forEach(i -> {
|
||||||
|
assertThat(queue.offer(i)).isTrue();
|
||||||
|
});
|
||||||
|
assertThat(queue.offer(queue.capacity())).isFalse();
|
||||||
|
startConsuming.countDown();
|
||||||
|
try {
|
||||||
|
awakeProducer.await();
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
|
||||||
|
assertThat(queue.offer(queue.capacity())).isTrue();
|
||||||
|
});
|
||||||
|
producer.setUncaughtExceptionHandler((t, e) -> {
|
||||||
|
error.set(e);
|
||||||
|
startConsuming.countDown();
|
||||||
|
});
|
||||||
|
producer.start();
|
||||||
|
|
||||||
|
startConsuming.await();
|
||||||
|
|
||||||
|
if (error.get() != null) {
|
||||||
|
fail("Producer's assertion failed", error.get());
|
||||||
|
}
|
||||||
|
|
||||||
|
Set<Integer> fromQueue = new HashSet<>();
|
||||||
|
queue.drain(fromQueue::add);
|
||||||
|
awakeProducer.countDown();
|
||||||
|
producer.join();
|
||||||
|
queue.drain(fromQueue::add);
|
||||||
|
|
||||||
|
assertThat(fromQueue).containsAll(IntStream.range(0, 17).boxed().collect(Collectors.toSet()));
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue