From f4feab9212a4bce79c79530960a139eced155fd9 Mon Sep 17 00:00:00 2001 From: Denis Zhdanov Date: Mon, 2 Apr 2018 00:36:53 +0300 Subject: [PATCH] BAEL-1423 Java Concurrency Utility with JCTools Library --- jctools/README.md | 11 ++ jctools/pom.xml | 108 ++++++++++++++++++ .../com/baeldung/jctools/MpmcBenchmark.java | 98 ++++++++++++++++ .../com/baeldung/jctools/JCToolsUnitTest.java | 86 ++++++++++++++ 4 files changed, 303 insertions(+) create mode 100644 jctools/README.md create mode 100644 jctools/pom.xml create mode 100644 jctools/src/main/java/com/baeldung/jctools/MpmcBenchmark.java create mode 100644 jctools/src/test/java/com/baeldung/jctools/JCToolsUnitTest.java diff --git a/jctools/README.md b/jctools/README.md new file mode 100644 index 0000000000..181e45c1ca --- /dev/null +++ b/jctools/README.md @@ -0,0 +1,11 @@ +## Overview + +This project holds a [couple of tests](./src/test/java/com/baeldung/jctools/JCToolsUnitTest.java) which illustrate JCTools specifics and a [benchmark](./src/main/java/com/baeldung/jctools/MpmcBenchmark.java) in the [JMH](http://openjdk.java.net/projects/code-tools/jmh/) format. + +## How to build and run the JMH benchmark + +Execute the following from the project's root: +```bash +mvn clean install +java -jar ./target/benchmarks.jar MpmcBenchmark -si true +``` \ No newline at end of file diff --git a/jctools/pom.xml b/jctools/pom.xml new file mode 100644 index 0000000000..8a21c610de --- /dev/null +++ b/jctools/pom.xml @@ -0,0 +1,108 @@ + + 4.0.0 + + com.baeldung + jctools + 0.0.1-SNAPSHOT + + jctools + + + + org.jctools + jctools-core + ${jctools.version} + + + + org.openjdk.jmh + jmh-core + ${jmh.version} + + + + org.openjdk.jmh + jmh-generator-annprocess + ${jmh.version} + provided + + + + junit + junit + 4.12 + test + + + + org.assertj + assertj-core + ${assertj.version} + test + + + + + 2.1.2 + 3.9.1 + 1.20 + + 1.8 + + benchmarks + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.1 + + ${javac.target} + ${javac.target} + ${javac.target} + + + + + + org.apache.maven.plugins + maven-shade-plugin + 2.2 + + + package + + shade + + + ${uberjar.name} + + + org.openjdk.jmh.Main + + + + + + *:* + + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + + + + + + + + + + + \ No newline at end of file diff --git a/jctools/src/main/java/com/baeldung/jctools/MpmcBenchmark.java b/jctools/src/main/java/com/baeldung/jctools/MpmcBenchmark.java new file mode 100644 index 0000000000..0e543fa609 --- /dev/null +++ b/jctools/src/main/java/com/baeldung/jctools/MpmcBenchmark.java @@ -0,0 +1,98 @@ +package com.baeldung.jctools; + +import org.jctools.queues.MpmcArrayQueue; +import org.jctools.queues.atomic.MpmcAtomicArrayQueue; +import org.openjdk.jmh.annotations.*; +import org.openjdk.jmh.infra.Control; + +import java.util.Queue; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.TimeUnit; + +@BenchmarkMode(Mode.SampleTime) +@OutputTimeUnit(TimeUnit.NANOSECONDS) +@Fork(1) +@Warmup(iterations = 1) +@Measurement(iterations = 3) +public class MpmcBenchmark { + + public static final String GROUP_UNSAFE = "MpmcArrayQueue"; + public static final String GROUP_AFU = "MpmcAtomicArrayQueue"; + public static final String GROUP_JDK = "ArrayBlockingQueue"; + + public static final int PRODUCER_THREADS_NUMBER = 32; + public static final int CONSUMER_THREADS_NUMBER = 32; + + public static final int CAPACITY = 128; + + @State(Scope.Group) + public static class Mpmc { + public final Queue queue = new MpmcArrayQueue<>(CAPACITY); + } + + @State(Scope.Group) + public static class MpmcAtomic { + public final Queue queue = new MpmcAtomicArrayQueue<>(CAPACITY); + } + + @State(Scope.Group) + public static class Jdk { + public final Queue queue = new ArrayBlockingQueue<>(CAPACITY); + } + + @Benchmark + @Group(GROUP_UNSAFE) + @GroupThreads(PRODUCER_THREADS_NUMBER) + public void mpmcWrite(Control control, Mpmc state) { + write(control, state.queue); + } + + @Benchmark + @Group(GROUP_UNSAFE) + @GroupThreads(CONSUMER_THREADS_NUMBER) + public void mpmcRead(Control control, Mpmc state) { + read(control, state.queue); + } + + @Benchmark + @Group(GROUP_AFU) + @GroupThreads(PRODUCER_THREADS_NUMBER) + public void mpmcAtomicWrite(Control control, MpmcAtomic state) { + write(control, state.queue); + } + + @Benchmark + @Group(GROUP_AFU) + @GroupThreads(CONSUMER_THREADS_NUMBER) + public void mpmcAtomicRead(Control control, MpmcAtomic state) { + read(control, state.queue); + } + + @Benchmark + @Group(GROUP_JDK) + @GroupThreads(PRODUCER_THREADS_NUMBER) + public void jdkWrite(Control control, Jdk state) { + write(control, state.queue); + } + + @Benchmark + @Group(GROUP_JDK) + @GroupThreads(CONSUMER_THREADS_NUMBER) + public void jdkRead(Control control, Jdk state) { + read(control, state.queue); + } + + private void write(Control control, Queue queue) { + //noinspection StatementWithEmptyBody + while (!control.stopMeasurement && !queue.offer(1L)) { + // Is intentionally left blank + } + } + + private void read(Control control, Queue queue) { + //noinspection StatementWithEmptyBody + while (!control.stopMeasurement && queue.poll() == null) { + // Is intentionally left blank + } + } +} diff --git a/jctools/src/test/java/com/baeldung/jctools/JCToolsUnitTest.java b/jctools/src/test/java/com/baeldung/jctools/JCToolsUnitTest.java new file mode 100644 index 0000000000..4a9d0fadb2 --- /dev/null +++ b/jctools/src/test/java/com/baeldung/jctools/JCToolsUnitTest.java @@ -0,0 +1,86 @@ +package com.baeldung.jctools; + +import org.jctools.queues.SpscArrayQueue; +import org.jctools.queues.SpscChunkedArrayQueue; +import org.junit.Test; + +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.IntConsumer; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.fail; + +public class JCToolsUnitTest { + + @Test + public void givenMultipleProducers_whenSpscQueueUsed_thenNoWarningOccurs() throws InterruptedException { + SpscArrayQueue queue = new SpscArrayQueue(2); + + Thread producer1 = new Thread(() -> { + queue.offer(1); + }); + producer1.start(); + producer1.join(); + + Thread producer2 = new Thread(() -> { + queue.offer(2); + }); + producer2.start(); + producer2.join(); + + Set fromQueue = new HashSet<>(); + Thread consumer = new Thread(() -> queue.drain(fromQueue::add)); + consumer.start(); + consumer.join(); + + assertThat(fromQueue).containsOnly(1, 2); + } + + @Test + public void whenQueueIsFull_thenNoMoreElementsCanBeAdded() throws InterruptedException { + SpscChunkedArrayQueue queue = new SpscChunkedArrayQueue<>(8, 16); + assertThat(queue.capacity()).isEqualTo(16); + + CountDownLatch startConsuming = new CountDownLatch(1); + CountDownLatch awakeProducer = new CountDownLatch(1); + AtomicReference error = new AtomicReference<>(); + Thread producer = new Thread(() -> { + IntStream.range(0, queue.capacity()).forEach(i -> { + assertThat(queue.offer(i)).isTrue(); + }); + assertThat(queue.offer(queue.capacity())).isFalse(); + startConsuming.countDown(); + try { + awakeProducer.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + + assertThat(queue.offer(queue.capacity())).isTrue(); + }); + producer.setUncaughtExceptionHandler((t, e) -> { + error.set(e); + startConsuming.countDown(); + }); + producer.start(); + + startConsuming.await(); + + if (error.get() != null) { + fail("Producer's assertion failed", error.get()); + } + + Set fromQueue = new HashSet<>(); + queue.drain(fromQueue::add); + awakeProducer.countDown(); + producer.join(); + queue.drain(fromQueue::add); + + assertThat(fromQueue).containsAll(IntStream.range(0, 17).boxed().collect(Collectors.toSet())); + } +}