BAEL-519 Introduction to LMAX Disruptor (#948)
* BAL-36 File size api in java and apache commons IO * BAEL-282 grep in java - fixes after code review * BAEL-519 Added support for disruptor library * BAEL-519 Added support for disruptor library * BAEL-519 Added support for disruptor library * BAEL-519 Added support for disruptor library * BAEL-519 Added support for disruptor library * BAEL-519 Added support for disruptor library * BAEL-519 Added support for disruptor * BAEL-519 Moved all supporting classes to main source * BAEL-519 Moved all supporting classes to main source * BAEL-519 Moved asserts and test classes in test folder. * BAEL-519 moved test related producer and consumer to src. * BAEL-586 Guide to Guava BiMap. * BAEL-587 formatted code. * BAEL-519 LMAX Disruptor * BAEL-587 resolved merge * BAEL-587 Resolved merge * BAEL-519 Removed disruptor link. * BAEL-519 Reverted Guava changes
This commit is contained in:
parent
d8af6f371b
commit
a8d61e77dd
|
@ -63,6 +63,12 @@
|
||||||
<artifactId>grep4j</artifactId>
|
<artifactId>grep4j</artifactId>
|
||||||
<version>${grep4j.version}</version>
|
<version>${grep4j.version}</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.lmax</groupId>
|
||||||
|
<artifactId>disruptor</artifactId>
|
||||||
|
<version>${disruptor.version}</version>
|
||||||
|
</dependency>
|
||||||
<!-- web -->
|
<!-- web -->
|
||||||
|
|
||||||
<!-- marshalling -->
|
<!-- marshalling -->
|
||||||
|
@ -363,6 +369,7 @@
|
||||||
<unix4j.version>0.4</unix4j.version>
|
<unix4j.version>0.4</unix4j.version>
|
||||||
<grep4j.version>1.8.7</grep4j.version>
|
<grep4j.version>1.8.7</grep4j.version>
|
||||||
<lombok.version>1.16.12</lombok.version>
|
<lombok.version>1.16.12</lombok.version>
|
||||||
|
<disruptor.version>3.3.6</disruptor.version>
|
||||||
|
|
||||||
<!-- testing -->
|
<!-- testing -->
|
||||||
<org.hamcrest.version>1.3</org.hamcrest.version>
|
<org.hamcrest.version>1.3</org.hamcrest.version>
|
||||||
|
|
|
@ -0,0 +1,34 @@
|
||||||
|
package com.baeldung.disruptor;
|
||||||
|
|
||||||
|
import com.lmax.disruptor.RingBuffer;
|
||||||
|
|
||||||
|
public class DelayedMultiEventProducer implements EventProducer {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void startProducing(final RingBuffer<ValueEvent> ringBuffer, final int count) {
|
||||||
|
final Runnable simpleProducer = () -> produce(ringBuffer, count, false);
|
||||||
|
final Runnable delayedProducer = () -> produce(ringBuffer, count, true);
|
||||||
|
new Thread(simpleProducer).start();
|
||||||
|
new Thread(delayedProducer).start();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void produce(final RingBuffer<ValueEvent> ringBuffer, final int count, final boolean addDelay) {
|
||||||
|
for (int i = 0; i < count; i++) {
|
||||||
|
final long seq = ringBuffer.next();
|
||||||
|
final ValueEvent valueEvent = ringBuffer.get(seq);
|
||||||
|
valueEvent.setValue(i);
|
||||||
|
ringBuffer.publish(seq);
|
||||||
|
if (addDelay) {
|
||||||
|
addDelay();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void addDelay() {
|
||||||
|
try {
|
||||||
|
Thread.sleep(1000);
|
||||||
|
} catch (InterruptedException interruptedException) {
|
||||||
|
// No-Op lets swallow it
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,13 @@
|
||||||
|
package com.baeldung.disruptor;
|
||||||
|
|
||||||
|
import com.lmax.disruptor.EventHandler;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Consumer that consumes event from ring buffer.
|
||||||
|
*/
|
||||||
|
public interface EventConsumer {
|
||||||
|
/**
|
||||||
|
* One or more event handler to handle event from ring buffer.
|
||||||
|
*/
|
||||||
|
public EventHandler<ValueEvent>[] getEventHandler();
|
||||||
|
}
|
|
@ -0,0 +1,15 @@
|
||||||
|
package com.baeldung.disruptor;
|
||||||
|
|
||||||
|
import com.lmax.disruptor.RingBuffer;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Producer that produces event for ring buffer.
|
||||||
|
*/
|
||||||
|
public interface EventProducer {
|
||||||
|
/**
|
||||||
|
* Start the producer that would start producing the values.
|
||||||
|
* @param ringBuffer
|
||||||
|
* @param count
|
||||||
|
*/
|
||||||
|
public void startProducing(final RingBuffer<ValueEvent> ringBuffer, final int count);
|
||||||
|
}
|
|
@ -0,0 +1,23 @@
|
||||||
|
package com.baeldung.disruptor;
|
||||||
|
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import com.lmax.disruptor.EventHandler;
|
||||||
|
|
||||||
|
public class MultiEventPrintConsumer implements EventConsumer {
|
||||||
|
|
||||||
|
private final Logger logger = LoggerFactory.getLogger(this.getClass());
|
||||||
|
|
||||||
|
@Override
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
public EventHandler<ValueEvent>[] getEventHandler() {
|
||||||
|
final EventHandler<ValueEvent> eventHandler = (event, sequence, endOfBatch) -> print(event.getValue(), sequence);
|
||||||
|
final EventHandler<ValueEvent> otherEventHandler = (event, sequence, endOfBatch) -> print(event.getValue(), sequence);
|
||||||
|
return new EventHandler[] { eventHandler, otherEventHandler };
|
||||||
|
}
|
||||||
|
|
||||||
|
private void print(final int id, final long sequenceId) {
|
||||||
|
logger.info("Id is " + id + " sequence id that was used is " + sequenceId);
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,22 @@
|
||||||
|
package com.baeldung.disruptor;
|
||||||
|
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import com.lmax.disruptor.EventHandler;
|
||||||
|
|
||||||
|
public class SingleEventPrintConsumer implements EventConsumer {
|
||||||
|
|
||||||
|
private final Logger logger = LoggerFactory.getLogger(this.getClass());
|
||||||
|
|
||||||
|
@Override
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
public EventHandler<ValueEvent>[] getEventHandler() {
|
||||||
|
final EventHandler<ValueEvent> eventHandler = (event, sequence, endOfBatch) -> print(event.getValue(), sequence);
|
||||||
|
return new EventHandler[] { eventHandler };
|
||||||
|
}
|
||||||
|
|
||||||
|
private void print(final int id, final long sequenceId) {
|
||||||
|
logger.info("Id is " + id + " sequence id that was used is " + sequenceId);
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,22 @@
|
||||||
|
package com.baeldung.disruptor;
|
||||||
|
|
||||||
|
import com.lmax.disruptor.RingBuffer;
|
||||||
|
|
||||||
|
public class SingleEventProducer implements EventProducer {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void startProducing(RingBuffer<ValueEvent> ringBuffer, int count) {
|
||||||
|
final Runnable producer = () -> produce(ringBuffer, count);
|
||||||
|
new Thread(producer).start();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void produce(final RingBuffer<ValueEvent> ringBuffer, final int count) {
|
||||||
|
for (int i = 0; i < count; i++) {
|
||||||
|
final long seq = ringBuffer.next();
|
||||||
|
final ValueEvent valueEvent = ringBuffer.get(seq);
|
||||||
|
valueEvent.setValue(i);
|
||||||
|
ringBuffer.publish(seq);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,25 @@
|
||||||
|
package com.baeldung.disruptor;
|
||||||
|
|
||||||
|
import org.apache.commons.lang3.builder.ToStringBuilder;
|
||||||
|
|
||||||
|
import com.lmax.disruptor.EventFactory;
|
||||||
|
|
||||||
|
public final class ValueEvent {
|
||||||
|
|
||||||
|
private int value;
|
||||||
|
|
||||||
|
public int getValue() {
|
||||||
|
return value;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setValue(int value) {
|
||||||
|
this.value = value;
|
||||||
|
}
|
||||||
|
|
||||||
|
public final static EventFactory<ValueEvent> EVENT_FACTORY = () -> new ValueEvent();
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return ToStringBuilder.reflectionToString(this);
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,83 @@
|
||||||
|
package com.baeldung.disruptor;
|
||||||
|
|
||||||
|
import java.util.concurrent.ThreadFactory;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
import com.lmax.disruptor.BusySpinWaitStrategy;
|
||||||
|
import com.lmax.disruptor.RingBuffer;
|
||||||
|
import com.lmax.disruptor.WaitStrategy;
|
||||||
|
import com.lmax.disruptor.dsl.Disruptor;
|
||||||
|
import com.lmax.disruptor.dsl.ProducerType;
|
||||||
|
import com.lmax.disruptor.util.DaemonThreadFactory;
|
||||||
|
|
||||||
|
public class DisruptorTest {
|
||||||
|
private Disruptor<ValueEvent> disruptor;
|
||||||
|
private WaitStrategy waitStrategy;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setUp() throws Exception {
|
||||||
|
waitStrategy = new BusySpinWaitStrategy();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void createDisruptor(final ProducerType producerType, final EventConsumer eventConsumer) {
|
||||||
|
final ThreadFactory threadFactory = DaemonThreadFactory.INSTANCE;
|
||||||
|
disruptor = new Disruptor<ValueEvent>(ValueEvent.EVENT_FACTORY, 16, threadFactory, producerType, waitStrategy);
|
||||||
|
disruptor.handleEventsWith(eventConsumer.getEventHandler());
|
||||||
|
}
|
||||||
|
|
||||||
|
private void startProducing(final RingBuffer<ValueEvent> ringBuffer, final int count, final EventProducer eventProducer) {
|
||||||
|
eventProducer.startProducing(ringBuffer, count);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void whenMultipleProducerSingleConsumer_thenOutputInFifoOrder() {
|
||||||
|
final EventConsumer eventConsumer = new SingleEventPrintConsumer();
|
||||||
|
final EventProducer eventProducer = new DelayedMultiEventProducer();
|
||||||
|
createDisruptor(ProducerType.MULTI, eventConsumer);
|
||||||
|
final RingBuffer<ValueEvent> ringBuffer = disruptor.start();
|
||||||
|
|
||||||
|
startProducing(ringBuffer, 32, eventProducer);
|
||||||
|
|
||||||
|
disruptor.halt();
|
||||||
|
disruptor.shutdown();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void whenSingleProducerSingleConsumer_thenOutputInFifoOrder() {
|
||||||
|
final EventConsumer eventConsumer = new SingleEventConsumer();
|
||||||
|
final EventProducer eventProducer = new SingleEventProducer();
|
||||||
|
createDisruptor(ProducerType.SINGLE, eventConsumer);
|
||||||
|
final RingBuffer<ValueEvent> ringBuffer = disruptor.start();
|
||||||
|
|
||||||
|
startProducing(ringBuffer, 32, eventProducer);
|
||||||
|
|
||||||
|
disruptor.halt();
|
||||||
|
disruptor.shutdown();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void whenSingleProducerMultipleConsumer_thenOutputInFifoOrder() {
|
||||||
|
final EventConsumer eventConsumer = new MultiEventConsumer();
|
||||||
|
final EventProducer eventProducer = new SingleEventProducer();
|
||||||
|
createDisruptor(ProducerType.SINGLE, eventConsumer);
|
||||||
|
final RingBuffer<ValueEvent> ringBuffer = disruptor.start();
|
||||||
|
|
||||||
|
startProducing(ringBuffer, 32, eventProducer);
|
||||||
|
|
||||||
|
disruptor.halt();
|
||||||
|
disruptor.shutdown();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void whenMultipleProducerMultipleConsumer_thenOutputInFifoOrder() {
|
||||||
|
final EventConsumer eventConsumer = new MultiEventPrintConsumer();
|
||||||
|
final EventProducer eventProducer = new DelayedMultiEventProducer();
|
||||||
|
createDisruptor(ProducerType.MULTI, eventConsumer);
|
||||||
|
final RingBuffer<ValueEvent> ringBuffer = disruptor.start();
|
||||||
|
|
||||||
|
startProducing(ringBuffer, 32, eventProducer);
|
||||||
|
|
||||||
|
disruptor.halt();
|
||||||
|
disruptor.shutdown();
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,27 @@
|
||||||
|
package com.baeldung.disruptor;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
|
||||||
|
import com.lmax.disruptor.EventHandler;
|
||||||
|
|
||||||
|
public class MultiEventConsumer implements EventConsumer {
|
||||||
|
|
||||||
|
private int expectedValue = -1;
|
||||||
|
private int otherExpectedValue = -1;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
public EventHandler<ValueEvent>[] getEventHandler() {
|
||||||
|
final EventHandler<ValueEvent> eventHandler = (event, sequence, endOfBatch) -> assertExpectedValue(event.getValue());
|
||||||
|
final EventHandler<ValueEvent> otherEventHandler = (event, sequence, endOfBatch) -> assertOtherExpectedValue(event.getValue());
|
||||||
|
return new EventHandler[] { eventHandler, otherEventHandler };
|
||||||
|
}
|
||||||
|
|
||||||
|
private void assertExpectedValue(final int id) {
|
||||||
|
assertEquals(++expectedValue, id);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void assertOtherExpectedValue(final int id) {
|
||||||
|
assertEquals(++otherExpectedValue, id);
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,21 @@
|
||||||
|
package com.baeldung.disruptor;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
|
||||||
|
import com.lmax.disruptor.EventHandler;
|
||||||
|
|
||||||
|
public class SingleEventConsumer implements EventConsumer {
|
||||||
|
|
||||||
|
private int expectedValue = -1;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
public EventHandler<ValueEvent>[] getEventHandler() {
|
||||||
|
final EventHandler<ValueEvent> eventHandler = (event, sequence, endOfBatch) -> assertExpectedValue(event.getValue());
|
||||||
|
return new EventHandler[] { eventHandler };
|
||||||
|
}
|
||||||
|
|
||||||
|
private void assertExpectedValue(final int id) {
|
||||||
|
assertEquals(++expectedValue, id);
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue