From a8d61e77ddb05e262e923a95567c428d0b9600b9 Mon Sep 17 00:00:00 2001 From: Muhammed Almas Date: Mon, 16 Jan 2017 00:18:42 +0530 Subject: [PATCH] BAEL-519 Introduction to LMAX Disruptor (#948) * BAL-36 File size api in java and apache commons IO * BAEL-282 grep in java - fixes after code review * BAEL-519 Added support for disruptor library * BAEL-519 Added support for disruptor library * BAEL-519 Added support for disruptor library * BAEL-519 Added support for disruptor library * BAEL-519 Added support for disruptor library * BAEL-519 Added support for disruptor library * BAEL-519 Added support for disruptor * BAEL-519 Moved all supporting classes to main source * BAEL-519 Moved all supporting classes to main source * BAEL-519 Moved asserts and test classes in test folder. * BAEL-519 moved test related producer and consumer to src. * BAEL-586 Guide to Guava BiMap. * BAEL-587 formatted code. * BAEL-519 LMAX Disruptor * BAEL-587 resolved merge * BAEL-587 Resolved merge * BAEL-519 Removed disruptor link. * BAEL-519 Reverted Guava changes --- core-java/README.md | 2 +- core-java/pom.xml | 7 ++ .../disruptor/DelayedMultiEventProducer.java | 34 ++++++++ .../com/baeldung/disruptor/EventConsumer.java | 13 +++ .../com/baeldung/disruptor/EventProducer.java | 15 ++++ .../disruptor/MultiEventPrintConsumer.java | 23 +++++ .../disruptor/SingleEventPrintConsumer.java | 22 +++++ .../disruptor/SingleEventProducer.java | 22 +++++ .../com/baeldung/disruptor/ValueEvent.java | 25 ++++++ .../com/baeldung/disruptor/DisruptorTest.java | 83 +++++++++++++++++++ .../disruptor/MultiEventConsumer.java | 27 ++++++ .../disruptor/SingleEventConsumer.java | 21 +++++ 12 files changed, 293 insertions(+), 1 deletion(-) create mode 100644 core-java/src/main/java/com/baeldung/disruptor/DelayedMultiEventProducer.java create mode 100644 core-java/src/main/java/com/baeldung/disruptor/EventConsumer.java create mode 100644 core-java/src/main/java/com/baeldung/disruptor/EventProducer.java create mode 100644 core-java/src/main/java/com/baeldung/disruptor/MultiEventPrintConsumer.java create mode 100644 core-java/src/main/java/com/baeldung/disruptor/SingleEventPrintConsumer.java create mode 100644 core-java/src/main/java/com/baeldung/disruptor/SingleEventProducer.java create mode 100644 core-java/src/main/java/com/baeldung/disruptor/ValueEvent.java create mode 100644 core-java/src/test/java/com/baeldung/disruptor/DisruptorTest.java create mode 100644 core-java/src/test/java/com/baeldung/disruptor/MultiEventConsumer.java create mode 100644 core-java/src/test/java/com/baeldung/disruptor/SingleEventConsumer.java diff --git a/core-java/README.md b/core-java/README.md index 16cac81526..015e18cec1 100644 --- a/core-java/README.md +++ b/core-java/README.md @@ -52,4 +52,4 @@ - [URL Encoding and Decoding in Java](http://www.baeldung.com/java-url-encoding-decoding) - [Calculate the Size of a File in Java](http://www.baeldung.com/java-file-size) - [The Basics of Java Generics](http://www.baeldung.com/java-generics) -- [The Traveling Salesman Problem in Java](http://www.baeldung.com/java-simulated-annealing-for-traveling-salesman) +- [The Traveling Salesman Problem in Java](http://www.baeldung.com/java-simulated-annealing-for-traveling-salesman) \ No newline at end of file diff --git a/core-java/pom.xml b/core-java/pom.xml index 6979d980b7..5766224b3a 100644 --- a/core-java/pom.xml +++ b/core-java/pom.xml @@ -63,6 +63,12 @@ grep4j ${grep4j.version} + + + com.lmax + disruptor + ${disruptor.version} + @@ -363,6 +369,7 @@ 0.4 1.8.7 1.16.12 + 3.3.6 1.3 diff --git a/core-java/src/main/java/com/baeldung/disruptor/DelayedMultiEventProducer.java b/core-java/src/main/java/com/baeldung/disruptor/DelayedMultiEventProducer.java new file mode 100644 index 0000000000..bb9f91b99f --- /dev/null +++ b/core-java/src/main/java/com/baeldung/disruptor/DelayedMultiEventProducer.java @@ -0,0 +1,34 @@ +package com.baeldung.disruptor; + +import com.lmax.disruptor.RingBuffer; + +public class DelayedMultiEventProducer implements EventProducer { + + @Override + public void startProducing(final RingBuffer ringBuffer, final int count) { + final Runnable simpleProducer = () -> produce(ringBuffer, count, false); + final Runnable delayedProducer = () -> produce(ringBuffer, count, true); + new Thread(simpleProducer).start(); + new Thread(delayedProducer).start(); + } + + private void produce(final RingBuffer ringBuffer, final int count, final boolean addDelay) { + for (int i = 0; i < count; i++) { + final long seq = ringBuffer.next(); + final ValueEvent valueEvent = ringBuffer.get(seq); + valueEvent.setValue(i); + ringBuffer.publish(seq); + if (addDelay) { + addDelay(); + } + } + } + + private void addDelay() { + try { + Thread.sleep(1000); + } catch (InterruptedException interruptedException) { + // No-Op lets swallow it + } + } +} diff --git a/core-java/src/main/java/com/baeldung/disruptor/EventConsumer.java b/core-java/src/main/java/com/baeldung/disruptor/EventConsumer.java new file mode 100644 index 0000000000..20d72ff9be --- /dev/null +++ b/core-java/src/main/java/com/baeldung/disruptor/EventConsumer.java @@ -0,0 +1,13 @@ +package com.baeldung.disruptor; + +import com.lmax.disruptor.EventHandler; + +/** + * Consumer that consumes event from ring buffer. + */ +public interface EventConsumer { + /** + * One or more event handler to handle event from ring buffer. + */ + public EventHandler[] getEventHandler(); +} diff --git a/core-java/src/main/java/com/baeldung/disruptor/EventProducer.java b/core-java/src/main/java/com/baeldung/disruptor/EventProducer.java new file mode 100644 index 0000000000..ede1aef8bf --- /dev/null +++ b/core-java/src/main/java/com/baeldung/disruptor/EventProducer.java @@ -0,0 +1,15 @@ +package com.baeldung.disruptor; + +import com.lmax.disruptor.RingBuffer; + +/** + * Producer that produces event for ring buffer. + */ +public interface EventProducer { + /** + * Start the producer that would start producing the values. + * @param ringBuffer + * @param count + */ + public void startProducing(final RingBuffer ringBuffer, final int count); +} diff --git a/core-java/src/main/java/com/baeldung/disruptor/MultiEventPrintConsumer.java b/core-java/src/main/java/com/baeldung/disruptor/MultiEventPrintConsumer.java new file mode 100644 index 0000000000..67901db7d5 --- /dev/null +++ b/core-java/src/main/java/com/baeldung/disruptor/MultiEventPrintConsumer.java @@ -0,0 +1,23 @@ +package com.baeldung.disruptor; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.lmax.disruptor.EventHandler; + +public class MultiEventPrintConsumer implements EventConsumer { + + private final Logger logger = LoggerFactory.getLogger(this.getClass()); + + @Override + @SuppressWarnings("unchecked") + public EventHandler[] getEventHandler() { + final EventHandler eventHandler = (event, sequence, endOfBatch) -> print(event.getValue(), sequence); + final EventHandler otherEventHandler = (event, sequence, endOfBatch) -> print(event.getValue(), sequence); + return new EventHandler[] { eventHandler, otherEventHandler }; + } + + private void print(final int id, final long sequenceId) { + logger.info("Id is " + id + " sequence id that was used is " + sequenceId); + } +} diff --git a/core-java/src/main/java/com/baeldung/disruptor/SingleEventPrintConsumer.java b/core-java/src/main/java/com/baeldung/disruptor/SingleEventPrintConsumer.java new file mode 100644 index 0000000000..766cbcf659 --- /dev/null +++ b/core-java/src/main/java/com/baeldung/disruptor/SingleEventPrintConsumer.java @@ -0,0 +1,22 @@ +package com.baeldung.disruptor; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.lmax.disruptor.EventHandler; + +public class SingleEventPrintConsumer implements EventConsumer { + + private final Logger logger = LoggerFactory.getLogger(this.getClass()); + + @Override + @SuppressWarnings("unchecked") + public EventHandler[] getEventHandler() { + final EventHandler eventHandler = (event, sequence, endOfBatch) -> print(event.getValue(), sequence); + return new EventHandler[] { eventHandler }; + } + + private void print(final int id, final long sequenceId) { + logger.info("Id is " + id + " sequence id that was used is " + sequenceId); + } +} diff --git a/core-java/src/main/java/com/baeldung/disruptor/SingleEventProducer.java b/core-java/src/main/java/com/baeldung/disruptor/SingleEventProducer.java new file mode 100644 index 0000000000..771d1ab90d --- /dev/null +++ b/core-java/src/main/java/com/baeldung/disruptor/SingleEventProducer.java @@ -0,0 +1,22 @@ +package com.baeldung.disruptor; + +import com.lmax.disruptor.RingBuffer; + +public class SingleEventProducer implements EventProducer { + + @Override + public void startProducing(RingBuffer ringBuffer, int count) { + final Runnable producer = () -> produce(ringBuffer, count); + new Thread(producer).start(); + } + + private void produce(final RingBuffer ringBuffer, final int count) { + for (int i = 0; i < count; i++) { + final long seq = ringBuffer.next(); + final ValueEvent valueEvent = ringBuffer.get(seq); + valueEvent.setValue(i); + ringBuffer.publish(seq); + } + } + +} diff --git a/core-java/src/main/java/com/baeldung/disruptor/ValueEvent.java b/core-java/src/main/java/com/baeldung/disruptor/ValueEvent.java new file mode 100644 index 0000000000..ad466b6a31 --- /dev/null +++ b/core-java/src/main/java/com/baeldung/disruptor/ValueEvent.java @@ -0,0 +1,25 @@ +package com.baeldung.disruptor; + +import org.apache.commons.lang3.builder.ToStringBuilder; + +import com.lmax.disruptor.EventFactory; + +public final class ValueEvent { + + private int value; + + public int getValue() { + return value; + } + + public void setValue(int value) { + this.value = value; + } + + public final static EventFactory EVENT_FACTORY = () -> new ValueEvent(); + + @Override + public String toString() { + return ToStringBuilder.reflectionToString(this); + } +} diff --git a/core-java/src/test/java/com/baeldung/disruptor/DisruptorTest.java b/core-java/src/test/java/com/baeldung/disruptor/DisruptorTest.java new file mode 100644 index 0000000000..28a5ff72ce --- /dev/null +++ b/core-java/src/test/java/com/baeldung/disruptor/DisruptorTest.java @@ -0,0 +1,83 @@ +package com.baeldung.disruptor; + +import java.util.concurrent.ThreadFactory; +import org.junit.Before; +import org.junit.Test; +import com.lmax.disruptor.BusySpinWaitStrategy; +import com.lmax.disruptor.RingBuffer; +import com.lmax.disruptor.WaitStrategy; +import com.lmax.disruptor.dsl.Disruptor; +import com.lmax.disruptor.dsl.ProducerType; +import com.lmax.disruptor.util.DaemonThreadFactory; + +public class DisruptorTest { + private Disruptor disruptor; + private WaitStrategy waitStrategy; + + @Before + public void setUp() throws Exception { + waitStrategy = new BusySpinWaitStrategy(); + } + + private void createDisruptor(final ProducerType producerType, final EventConsumer eventConsumer) { + final ThreadFactory threadFactory = DaemonThreadFactory.INSTANCE; + disruptor = new Disruptor(ValueEvent.EVENT_FACTORY, 16, threadFactory, producerType, waitStrategy); + disruptor.handleEventsWith(eventConsumer.getEventHandler()); + } + + private void startProducing(final RingBuffer ringBuffer, final int count, final EventProducer eventProducer) { + eventProducer.startProducing(ringBuffer, count); + } + + @Test + public void whenMultipleProducerSingleConsumer_thenOutputInFifoOrder() { + final EventConsumer eventConsumer = new SingleEventPrintConsumer(); + final EventProducer eventProducer = new DelayedMultiEventProducer(); + createDisruptor(ProducerType.MULTI, eventConsumer); + final RingBuffer ringBuffer = disruptor.start(); + + startProducing(ringBuffer, 32, eventProducer); + + disruptor.halt(); + disruptor.shutdown(); + } + + @Test + public void whenSingleProducerSingleConsumer_thenOutputInFifoOrder() { + final EventConsumer eventConsumer = new SingleEventConsumer(); + final EventProducer eventProducer = new SingleEventProducer(); + createDisruptor(ProducerType.SINGLE, eventConsumer); + final RingBuffer ringBuffer = disruptor.start(); + + startProducing(ringBuffer, 32, eventProducer); + + disruptor.halt(); + disruptor.shutdown(); + } + + @Test + public void whenSingleProducerMultipleConsumer_thenOutputInFifoOrder() { + final EventConsumer eventConsumer = new MultiEventConsumer(); + final EventProducer eventProducer = new SingleEventProducer(); + createDisruptor(ProducerType.SINGLE, eventConsumer); + final RingBuffer ringBuffer = disruptor.start(); + + startProducing(ringBuffer, 32, eventProducer); + + disruptor.halt(); + disruptor.shutdown(); + } + + @Test + public void whenMultipleProducerMultipleConsumer_thenOutputInFifoOrder() { + final EventConsumer eventConsumer = new MultiEventPrintConsumer(); + final EventProducer eventProducer = new DelayedMultiEventProducer(); + createDisruptor(ProducerType.MULTI, eventConsumer); + final RingBuffer ringBuffer = disruptor.start(); + + startProducing(ringBuffer, 32, eventProducer); + + disruptor.halt(); + disruptor.shutdown(); + } +} diff --git a/core-java/src/test/java/com/baeldung/disruptor/MultiEventConsumer.java b/core-java/src/test/java/com/baeldung/disruptor/MultiEventConsumer.java new file mode 100644 index 0000000000..304f62c2ee --- /dev/null +++ b/core-java/src/test/java/com/baeldung/disruptor/MultiEventConsumer.java @@ -0,0 +1,27 @@ +package com.baeldung.disruptor; + +import static org.junit.Assert.assertEquals; + +import com.lmax.disruptor.EventHandler; + +public class MultiEventConsumer implements EventConsumer { + + private int expectedValue = -1; + private int otherExpectedValue = -1; + + @Override + @SuppressWarnings("unchecked") + public EventHandler[] getEventHandler() { + final EventHandler eventHandler = (event, sequence, endOfBatch) -> assertExpectedValue(event.getValue()); + final EventHandler otherEventHandler = (event, sequence, endOfBatch) -> assertOtherExpectedValue(event.getValue()); + return new EventHandler[] { eventHandler, otherEventHandler }; + } + + private void assertExpectedValue(final int id) { + assertEquals(++expectedValue, id); + } + + private void assertOtherExpectedValue(final int id) { + assertEquals(++otherExpectedValue, id); + } +} diff --git a/core-java/src/test/java/com/baeldung/disruptor/SingleEventConsumer.java b/core-java/src/test/java/com/baeldung/disruptor/SingleEventConsumer.java new file mode 100644 index 0000000000..efc7432b5a --- /dev/null +++ b/core-java/src/test/java/com/baeldung/disruptor/SingleEventConsumer.java @@ -0,0 +1,21 @@ +package com.baeldung.disruptor; + +import static org.junit.Assert.assertEquals; + +import com.lmax.disruptor.EventHandler; + +public class SingleEventConsumer implements EventConsumer { + + private int expectedValue = -1; + + @Override + @SuppressWarnings("unchecked") + public EventHandler[] getEventHandler() { + final EventHandler eventHandler = (event, sequence, endOfBatch) -> assertExpectedValue(event.getValue()); + return new EventHandler[] { eventHandler }; + } + + private void assertExpectedValue(final int id) { + assertEquals(++expectedValue, id); + } +}