Merge pull request #3921 from denis-zhdanov/BAEL-1423-jctools
BAEL-1423 Java Concurrency Utility with JCTools Library
This commit is contained in:
commit
fa74f05d60
@ -686,6 +686,12 @@
|
||||
<artifactId>fugue</artifactId>
|
||||
<version>4.5.1</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.jctools</groupId>
|
||||
<artifactId>jctools-core</artifactId>
|
||||
<version>${jctools.version}</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<repositories>
|
||||
@ -785,6 +791,52 @@
|
||||
</archive>
|
||||
</configuration>
|
||||
</plugin>
|
||||
<!-- /Neuroph -->
|
||||
<plugin>
|
||||
<artifactId>maven-compiler-plugin</artifactId>
|
||||
<version>3.7.0</version>
|
||||
<configuration>
|
||||
<source>1.8</source>
|
||||
<target>1.8</target>
|
||||
</configuration>
|
||||
</plugin>
|
||||
|
||||
<!-- Borrowed from the 'jmh-java-benchmark-archetype' pom.xml -->
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-shade-plugin</artifactId>
|
||||
<version>2.2</version>
|
||||
<executions>
|
||||
<execution>
|
||||
<phase>package</phase>
|
||||
<goals>
|
||||
<goal>shade</goal>
|
||||
</goals>
|
||||
<configuration>
|
||||
<finalName>benchmarks</finalName>
|
||||
<transformers>
|
||||
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
|
||||
<mainClass>org.openjdk.jmh.Main</mainClass>
|
||||
</transformer>
|
||||
</transformers>
|
||||
<filters>
|
||||
<filter>
|
||||
<!--
|
||||
Shading signed JARs will fail without this.
|
||||
http://stackoverflow.com/questions/999489/invalid-signature-file-when-attempting-to-run-a-jar
|
||||
-->
|
||||
<artifact>*:*</artifact>
|
||||
<excludes>
|
||||
<exclude>META-INF/*.SF</exclude>
|
||||
<exclude>META-INF/*.DSA</exclude>
|
||||
<exclude>META-INF/*.RSA</exclude>
|
||||
</excludes>
|
||||
</filter>
|
||||
</filters>
|
||||
</configuration>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
|
||||
@ -856,6 +908,7 @@
|
||||
<infinispan.version>9.1.5.Final</infinispan.version>
|
||||
<opencsv.version>4.1</opencsv.version>
|
||||
<unirest.version>1.4.9</unirest.version>
|
||||
<jctools.version>2.1.2</jctools.version>
|
||||
<commons-codec-version>1.10.L001</commons-codec-version>
|
||||
<jets3t-version>0.9.4.0006L</jets3t-version>
|
||||
</properties>
|
||||
|
@ -0,0 +1,73 @@
|
||||
package com.baeldung.jctools;
|
||||
|
||||
import org.jctools.queues.MpmcArrayQueue;
|
||||
import org.jctools.queues.atomic.MpmcAtomicArrayQueue;
|
||||
import org.openjdk.jmh.annotations.*;
|
||||
import org.openjdk.jmh.infra.Control;
|
||||
|
||||
import java.util.Queue;
|
||||
import java.util.concurrent.ArrayBlockingQueue;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
@BenchmarkMode(Mode.SampleTime)
|
||||
@OutputTimeUnit(TimeUnit.NANOSECONDS)
|
||||
@Fork(1)
|
||||
@Warmup(iterations = 1)
|
||||
@Measurement(iterations = 3)
|
||||
@State(Scope.Group)
|
||||
public class MpmcBenchmark {
|
||||
|
||||
public static final String PARAM_UNSAFE = "MpmcArrayQueue";
|
||||
public static final String PARAM_AFU = "MpmcAtomicArrayQueue";
|
||||
public static final String PARAM_JDK = "ArrayBlockingQueue";
|
||||
|
||||
public static final int PRODUCER_THREADS_NUMBER = 32;
|
||||
public static final int CONSUMER_THREADS_NUMBER = 32;
|
||||
|
||||
public static final String GROUP_NAME = "MyGroup";
|
||||
|
||||
public static final int CAPACITY = 128;
|
||||
|
||||
@Param({PARAM_UNSAFE, PARAM_AFU, PARAM_JDK})
|
||||
public volatile String implementation;
|
||||
|
||||
public volatile Queue<Long> queue;
|
||||
|
||||
@Setup(Level.Trial)
|
||||
public void setUp() {
|
||||
switch (implementation) {
|
||||
case PARAM_UNSAFE:
|
||||
queue = new MpmcArrayQueue<>(CAPACITY);
|
||||
break;
|
||||
case PARAM_AFU:
|
||||
queue = new MpmcAtomicArrayQueue<>(CAPACITY);
|
||||
break;
|
||||
case PARAM_JDK:
|
||||
queue = new ArrayBlockingQueue<>(CAPACITY);
|
||||
break;
|
||||
default:
|
||||
throw new UnsupportedOperationException("Unsupported implementation " + implementation);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Benchmark
|
||||
@Group(GROUP_NAME)
|
||||
@GroupThreads(PRODUCER_THREADS_NUMBER)
|
||||
public void write(Control control) {
|
||||
//noinspection StatementWithEmptyBody
|
||||
while (!control.stopMeasurement && !queue.offer(1L)) {
|
||||
// Is intentionally left blank
|
||||
}
|
||||
}
|
||||
|
||||
@Benchmark
|
||||
@Group(GROUP_NAME)
|
||||
@GroupThreads(CONSUMER_THREADS_NUMBER)
|
||||
public void read(Control control) {
|
||||
//noinspection StatementWithEmptyBody
|
||||
while (!control.stopMeasurement && queue.poll() == null) {
|
||||
// Is intentionally left blank
|
||||
}
|
||||
}
|
||||
}
|
7
libraries/src/main/java/com/baeldung/jctools/README.md
Normal file
7
libraries/src/main/java/com/baeldung/jctools/README.md
Normal file
@ -0,0 +1,7 @@
|
||||
## How to build and run the JMH benchmark
|
||||
|
||||
Execute the following from the project's root:
|
||||
```bash
|
||||
mvn clean install
|
||||
java -jar ./target/benchmarks.jar MpmcBenchmark -si true
|
||||
```
|
@ -0,0 +1,86 @@
|
||||
package com.baeldung.jctools;
|
||||
|
||||
import org.jctools.queues.SpscArrayQueue;
|
||||
import org.jctools.queues.SpscChunkedArrayQueue;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.HashSet;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.function.IntConsumer;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.IntStream;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.assertj.core.api.Assertions.fail;
|
||||
|
||||
public class JCToolsUnitTest {
|
||||
|
||||
@Test
|
||||
public void givenMultipleProducers_whenSpscQueueUsed_thenNoWarningOccurs() throws InterruptedException {
|
||||
SpscArrayQueue<Integer> queue = new SpscArrayQueue<Integer>(2);
|
||||
|
||||
Thread producer1 = new Thread(() -> {
|
||||
queue.offer(1);
|
||||
});
|
||||
producer1.start();
|
||||
producer1.join();
|
||||
|
||||
Thread producer2 = new Thread(() -> {
|
||||
queue.offer(2);
|
||||
});
|
||||
producer2.start();
|
||||
producer2.join();
|
||||
|
||||
Set<Integer> fromQueue = new HashSet<>();
|
||||
Thread consumer = new Thread(() -> queue.drain(fromQueue::add));
|
||||
consumer.start();
|
||||
consumer.join();
|
||||
|
||||
assertThat(fromQueue).containsOnly(1, 2);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void whenQueueIsFull_thenNoMoreElementsCanBeAdded() throws InterruptedException {
|
||||
SpscChunkedArrayQueue<Integer> queue = new SpscChunkedArrayQueue<>(8, 16);
|
||||
assertThat(queue.capacity()).isEqualTo(16);
|
||||
|
||||
CountDownLatch startConsuming = new CountDownLatch(1);
|
||||
CountDownLatch awakeProducer = new CountDownLatch(1);
|
||||
AtomicReference<Throwable> error = new AtomicReference<>();
|
||||
Thread producer = new Thread(() -> {
|
||||
IntStream.range(0, queue.capacity()).forEach(i -> {
|
||||
assertThat(queue.offer(i)).isTrue();
|
||||
});
|
||||
assertThat(queue.offer(queue.capacity())).isFalse();
|
||||
startConsuming.countDown();
|
||||
try {
|
||||
awakeProducer.await();
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
||||
assertThat(queue.offer(queue.capacity())).isTrue();
|
||||
});
|
||||
producer.setUncaughtExceptionHandler((t, e) -> {
|
||||
error.set(e);
|
||||
startConsuming.countDown();
|
||||
});
|
||||
producer.start();
|
||||
|
||||
startConsuming.await();
|
||||
|
||||
if (error.get() != null) {
|
||||
fail("Producer's assertion failed", error.get());
|
||||
}
|
||||
|
||||
Set<Integer> fromQueue = new HashSet<>();
|
||||
queue.drain(fromQueue::add);
|
||||
awakeProducer.countDown();
|
||||
producer.join();
|
||||
queue.drain(fromQueue::add);
|
||||
|
||||
assertThat(fromQueue).containsAll(IntStream.range(0, 17).boxed().collect(Collectors.toSet()));
|
||||
}
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user