BAEL-3446 (#12872)
* BAEL-3446 a simple market data streaming app for the article. * BAEL-3446 add unit tests for the article code. * BAEL-3446 increase version, activate module in the parent pom, update readme. * BAEL-3446 improve the code semantically. * BAEL-3446 remove unnecessary files * BAEL-3446 fix continuation indentation as 2 * BAEL-3446 put new module in the correct profile Co-authored-by: Yavuz Tas <ytas@vwd.com>
This commit is contained in:
parent
2879088ee7
commit
125d9893ab
|
@ -0,0 +1,11 @@
|
|||
## Libraries-7
|
||||
|
||||
This module contains articles about various Java libraries.
|
||||
These are small libraries that are relatively easy to use and do not require any separate module of their own.
|
||||
|
||||
The code examples related to different libraries are each in their own module.
|
||||
|
||||
Remember, for advanced libraries like [Jackson](/jackson) and [JUnit](/testing-modules) we already have separate modules. Please make sure to have a look at the existing modules in such cases.
|
||||
|
||||
### Relevant articles
|
||||
- More articles [[<-- prev]](/libraries-6)
|
|
@ -0,0 +1,81 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<artifactId>libraries-7</artifactId>
|
||||
|
||||
<parent>
|
||||
<artifactId>parent-modules</artifactId>
|
||||
<groupId>com.baeldung</groupId>
|
||||
<version>1.0.0-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.agrona</groupId>
|
||||
<artifactId>agrona</artifactId>
|
||||
<version>1.17.1</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>org.codehaus.mojo</groupId>
|
||||
<artifactId>exec-maven-plugin</artifactId>
|
||||
<version>1.6.0</version>
|
||||
<executions>
|
||||
<execution>
|
||||
<phase>generate-sources</phase>
|
||||
<goals>
|
||||
<goal>java</goal>
|
||||
</goals>
|
||||
</execution>
|
||||
</executions>
|
||||
<configuration>
|
||||
<includeProjectDependencies>false</includeProjectDependencies>
|
||||
<includePluginDependencies>true</includePluginDependencies>
|
||||
<mainClass>uk.co.real_logic.sbe.SbeTool</mainClass>
|
||||
<systemProperties>
|
||||
<systemProperty>
|
||||
<key>sbe.output.dir</key>
|
||||
<value>${project.build.directory}/generated-sources/java</value>
|
||||
</systemProperty>
|
||||
</systemProperties>
|
||||
<arguments>
|
||||
<argument>${project.basedir}/src/main/resources/schema.xml</argument>
|
||||
</arguments>
|
||||
<workingDirectory>${project.build.directory}/generated-sources/java</workingDirectory>
|
||||
</configuration>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>uk.co.real-logic</groupId>
|
||||
<artifactId>sbe-tool</artifactId>
|
||||
<version>1.27.0</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<groupId>org.codehaus.mojo</groupId>
|
||||
<artifactId>build-helper-maven-plugin</artifactId>
|
||||
<version>3.0.0</version>
|
||||
<executions>
|
||||
<execution>
|
||||
<id>add-source</id>
|
||||
<phase>generate-sources</phase>
|
||||
<goals>
|
||||
<goal>add-source</goal>
|
||||
</goals>
|
||||
<configuration>
|
||||
<sources>
|
||||
<source>${project.build.directory}/generated-sources/java/</source>
|
||||
</sources>
|
||||
</configuration>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
|
||||
</project>
|
|
@ -0,0 +1,98 @@
|
|||
package com.baeldung.sbe;
|
||||
|
||||
import java.util.StringJoiner;
|
||||
|
||||
import com.baeldung.sbe.stub.Currency;
|
||||
import com.baeldung.sbe.stub.Market;
|
||||
|
||||
public class MarketData {
|
||||
|
||||
private final int amount;
|
||||
private final double price;
|
||||
private final Market market;
|
||||
private final Currency currency;
|
||||
private final String symbol;
|
||||
|
||||
public MarketData(int amount, double price, Market market, Currency currency, String symbol) {
|
||||
this.amount = amount;
|
||||
this.price = price;
|
||||
this.market = market;
|
||||
this.currency = currency;
|
||||
this.symbol = symbol;
|
||||
}
|
||||
|
||||
public static class Builder {
|
||||
private int amount;
|
||||
|
||||
public Builder amount(int amount) {
|
||||
this.amount = amount;
|
||||
return this;
|
||||
}
|
||||
|
||||
private double price;
|
||||
|
||||
public Builder price(double price) {
|
||||
this.price = price;
|
||||
return this;
|
||||
}
|
||||
|
||||
private Market market;
|
||||
|
||||
public Builder market(Market market) {
|
||||
this.market = market;
|
||||
return this;
|
||||
}
|
||||
|
||||
private Currency currency;
|
||||
|
||||
public Builder currency(Currency currency) {
|
||||
this.currency = currency;
|
||||
return this;
|
||||
}
|
||||
|
||||
private String symbol;
|
||||
|
||||
public Builder symbol(String symbol) {
|
||||
this.symbol = symbol;
|
||||
return this;
|
||||
}
|
||||
|
||||
public MarketData build() {
|
||||
return new MarketData(amount, price, market, currency, symbol);
|
||||
}
|
||||
}
|
||||
|
||||
public static Builder builder() {
|
||||
return new Builder();
|
||||
}
|
||||
|
||||
public int getAmount() {
|
||||
return amount;
|
||||
}
|
||||
|
||||
public double getPrice() {
|
||||
return price;
|
||||
}
|
||||
|
||||
public Market getMarket() {
|
||||
return market;
|
||||
}
|
||||
|
||||
public Currency getCurrency() {
|
||||
return currency;
|
||||
}
|
||||
|
||||
public String getSymbol() {
|
||||
return symbol;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return new StringJoiner(", ", MarketData.class.getSimpleName() + "[", "]").add("amount=" + amount)
|
||||
.add("price=" + price)
|
||||
.add("market=" + market)
|
||||
.add("currency=" + currency)
|
||||
.add("symbol='" + symbol + "'")
|
||||
.toString();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,48 @@
|
|||
package com.baeldung.sbe;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Iterator;
|
||||
import java.util.LinkedList;
|
||||
|
||||
import com.baeldung.sbe.stub.Currency;
|
||||
import com.baeldung.sbe.stub.Market;
|
||||
|
||||
public class MarketDataSource implements Iterator<MarketData> {
|
||||
|
||||
private final LinkedList<MarketData> dataQueue = new LinkedList<>();
|
||||
|
||||
public MarketDataSource() {
|
||||
// adding some test data into queue
|
||||
this.dataQueue.addAll(Arrays.asList(MarketData.builder()
|
||||
.amount(1)
|
||||
.market(Market.NASDAQ)
|
||||
.symbol("AAPL")
|
||||
.price(134.12)
|
||||
.currency(Currency.USD)
|
||||
.build(), MarketData.builder()
|
||||
.amount(2)
|
||||
.market(Market.NYSE)
|
||||
.symbol("IBM")
|
||||
.price(128.99)
|
||||
.currency(Currency.USD)
|
||||
.build(), MarketData.builder()
|
||||
.amount(1)
|
||||
.market(Market.NASDAQ)
|
||||
.symbol("AXP")
|
||||
.price(34.87)
|
||||
.currency(Currency.EUR)
|
||||
.build()));
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean hasNext() {
|
||||
return !this.dataQueue.isEmpty();
|
||||
}
|
||||
|
||||
@Override
|
||||
public MarketData next() {
|
||||
final MarketData data = this.dataQueue.pop();
|
||||
this.dataQueue.add(data);
|
||||
return data;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,92 @@
|
|||
package com.baeldung.sbe;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
public class MarketDataStreamServer {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(MarketDataStreamServer.class);
|
||||
|
||||
ByteBuffer buffer = ByteBuffer.allocate(128);
|
||||
|
||||
final AtomicLong writePos = new AtomicLong();
|
||||
|
||||
ScheduledExecutorService writerThread = Executors.newScheduledThreadPool(1);
|
||||
ScheduledExecutorService readerThreadPool = Executors.newScheduledThreadPool(2);
|
||||
|
||||
private class Client {
|
||||
|
||||
final String name;
|
||||
final ByteBuffer readOnlyBuffer;
|
||||
|
||||
final AtomicLong readPos = new AtomicLong();
|
||||
|
||||
Client(String name, ByteBuffer source) {
|
||||
this.name = name;
|
||||
this.readOnlyBuffer = source.asReadOnlyBuffer();
|
||||
}
|
||||
|
||||
void readTradeData() {
|
||||
while (readPos.get() < writePos.get()) {
|
||||
try {
|
||||
final int pos = this.readOnlyBuffer.position();
|
||||
final MarketData data = MarketDataUtil.readAndDecode(this.readOnlyBuffer);
|
||||
readPos.addAndGet(this.readOnlyBuffer.position() - pos);
|
||||
log.info("<readTradeData> client: {}, read/write gap: {}, data: {}", name, writePos.get() - readPos.get(), data);
|
||||
} catch (IndexOutOfBoundsException e) {
|
||||
this.readOnlyBuffer.clear(); // ring buffer
|
||||
} catch (Exception e) {
|
||||
log.error("<readTradeData> cannot read from buffer {}", readOnlyBuffer);
|
||||
}
|
||||
}
|
||||
if (this.readOnlyBuffer.remaining() == 0) {
|
||||
this.readOnlyBuffer.clear(); // ring buffer
|
||||
}
|
||||
}
|
||||
|
||||
void read() {
|
||||
readerThreadPool.scheduleAtFixedRate(this::readTradeData, 1, 1, TimeUnit.SECONDS);
|
||||
}
|
||||
}
|
||||
|
||||
private Client newClient(String name) {
|
||||
return new Client(name, buffer);
|
||||
}
|
||||
|
||||
private void writeTradeData(MarketData data) {
|
||||
try {
|
||||
final int writtenBytes = MarketDataUtil.encodeAndWrite(buffer, data);
|
||||
writePos.addAndGet(writtenBytes);
|
||||
log.info("<writeTradeData> buffer size remaining: %{}, data: {}", 100 * buffer.remaining() / buffer.capacity(), data);
|
||||
} catch (IndexOutOfBoundsException e) {
|
||||
buffer.clear(); // ring buffer
|
||||
writeTradeData(data);
|
||||
} catch (Exception e) {
|
||||
log.error("<writeTradeData> cannot write into buffer {}", buffer);
|
||||
}
|
||||
}
|
||||
|
||||
private void run(MarketDataSource source) {
|
||||
writerThread.scheduleAtFixedRate(() -> {
|
||||
if (source.hasNext()) {
|
||||
writeTradeData(source.next());
|
||||
}
|
||||
}, 1, 2, TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
public static void main(String[] args) {
|
||||
MarketDataStreamServer server = new MarketDataStreamServer();
|
||||
Client client1 = server.newClient("client1");
|
||||
client1.read();
|
||||
Client client2 = server.newClient("client2");
|
||||
client2.read();
|
||||
server.run(new MarketDataSource());
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,78 @@
|
|||
package com.baeldung.sbe;
|
||||
|
||||
import java.math.BigDecimal;
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
import org.agrona.concurrent.UnsafeBuffer;
|
||||
|
||||
import com.baeldung.sbe.stub.MessageHeaderDecoder;
|
||||
import com.baeldung.sbe.stub.MessageHeaderEncoder;
|
||||
import com.baeldung.sbe.stub.TradeDataDecoder;
|
||||
import com.baeldung.sbe.stub.TradeDataEncoder;
|
||||
|
||||
public class MarketDataUtil {
|
||||
|
||||
public static int encodeAndWrite(ByteBuffer buffer, MarketData marketData) {
|
||||
|
||||
final int pos = buffer.position();
|
||||
|
||||
final UnsafeBuffer directBuffer = new UnsafeBuffer(buffer);
|
||||
final MessageHeaderEncoder headerEncoder = new MessageHeaderEncoder();
|
||||
final TradeDataEncoder dataEncoder = new TradeDataEncoder();
|
||||
|
||||
final BigDecimal priceDecimal = BigDecimal.valueOf(marketData.getPrice());
|
||||
final int priceMantis = priceDecimal.scaleByPowerOfTen(priceDecimal.scale())
|
||||
.intValue();
|
||||
final int priceExponent = priceDecimal.scale() * -1;
|
||||
|
||||
final TradeDataEncoder encoder = dataEncoder.wrapAndApplyHeader(directBuffer, pos, headerEncoder);
|
||||
encoder.amount(marketData.getAmount());
|
||||
encoder.quote()
|
||||
.market(marketData.getMarket())
|
||||
.currency(marketData.getCurrency())
|
||||
.symbol(marketData.getSymbol())
|
||||
.price()
|
||||
.mantissa(priceMantis)
|
||||
.exponent((byte) priceExponent);
|
||||
|
||||
// set position
|
||||
final int encodedLength = headerEncoder.encodedLength() + encoder.encodedLength();
|
||||
buffer.position(pos + encodedLength);
|
||||
return encodedLength;
|
||||
}
|
||||
|
||||
public static MarketData readAndDecode(ByteBuffer buffer) {
|
||||
|
||||
final int pos = buffer.position();
|
||||
|
||||
final UnsafeBuffer directBuffer = new UnsafeBuffer(buffer);
|
||||
final MessageHeaderDecoder headerDecoder = new MessageHeaderDecoder();
|
||||
final TradeDataDecoder dataDecoder = new TradeDataDecoder();
|
||||
|
||||
dataDecoder.wrapAndApplyHeader(directBuffer, pos, headerDecoder);
|
||||
|
||||
// set position
|
||||
final int encodedLength = headerDecoder.encodedLength() + dataDecoder.encodedLength();
|
||||
buffer.position(pos + encodedLength);
|
||||
|
||||
final double price = BigDecimal.valueOf(dataDecoder.quote()
|
||||
.price()
|
||||
.mantissa())
|
||||
.scaleByPowerOfTen(dataDecoder.quote()
|
||||
.price()
|
||||
.exponent())
|
||||
.doubleValue();
|
||||
|
||||
return MarketData.builder()
|
||||
.amount(dataDecoder.amount())
|
||||
.symbol(dataDecoder.quote()
|
||||
.symbol())
|
||||
.market(dataDecoder.quote()
|
||||
.market())
|
||||
.currency(dataDecoder.quote()
|
||||
.currency())
|
||||
.price(price)
|
||||
.build();
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,35 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<sbe:messageSchema xmlns:sbe="http://fixprotocol.io/2016/sbe"
|
||||
package="com.baeldung.sbe.stub" id="1" version="0" semanticVersion="5.2" description="A schema represents stock market data.">
|
||||
<types>
|
||||
<composite name="messageHeader" description="Message identifiers and length of message root.">
|
||||
<type name="blockLength" primitiveType="uint16"/>
|
||||
<type name="templateId" primitiveType="uint16"/>
|
||||
<type name="schemaId" primitiveType="uint16"/>
|
||||
<type name="version" primitiveType="uint16"/>
|
||||
</composite>
|
||||
<enum name="Market" encodingType="uint8">
|
||||
<validValue name="NYSE" description="New York Stock Exchange">0</validValue>
|
||||
<validValue name="NASDAQ" description="National Association of Securities Dealers Automated Quotations">1</validValue>
|
||||
</enum>
|
||||
<type name="Symbol" primitiveType="char" length="4" characterEncoding="ASCII" description="Stock symbol"/>
|
||||
<composite name="Decimal">
|
||||
<type name="mantissa" primitiveType="uint64" minValue="0"/>
|
||||
<type name="exponent" primitiveType="int8"/>
|
||||
</composite>
|
||||
<enum name="Currency" encodingType="uint8">
|
||||
<validValue name="USD" description="US Dollar">0</validValue>
|
||||
<validValue name="EUR" description="Euro">1</validValue>
|
||||
</enum>
|
||||
<composite name="Quote" description="A quote represents the price of a stock in a market">
|
||||
<ref name="market" type="Market"/>
|
||||
<ref name="symbol" type="Symbol"/>
|
||||
<ref name="price" type="Decimal"/>
|
||||
<ref name="currency" type="Currency"/>
|
||||
</composite>
|
||||
</types>
|
||||
<sbe:message name="TradeData" id="1" description="Represents a quote and amount of trade">
|
||||
<field name="quote" id="1" type="Quote"/>
|
||||
<field name="amount" id="2" type="uint16"/>
|
||||
</sbe:message>
|
||||
</sbe:messageSchema>
|
|
@ -0,0 +1,75 @@
|
|||
package com.baeldung.test;
|
||||
|
||||
import java.math.BigDecimal;
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
import org.agrona.concurrent.UnsafeBuffer;
|
||||
import org.junit.jupiter.api.Assertions;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import com.baeldung.sbe.MarketData;
|
||||
import com.baeldung.sbe.stub.Currency;
|
||||
import com.baeldung.sbe.stub.Market;
|
||||
import com.baeldung.sbe.stub.MessageHeaderDecoder;
|
||||
import com.baeldung.sbe.stub.MessageHeaderEncoder;
|
||||
import com.baeldung.sbe.stub.TradeDataDecoder;
|
||||
import com.baeldung.sbe.stub.TradeDataEncoder;
|
||||
|
||||
public class EncodeDecodeMarketDataUnitTest {
|
||||
|
||||
private MarketData marketData;
|
||||
|
||||
@BeforeEach
|
||||
public void setup() {
|
||||
marketData = new MarketData(2, 128.99, Market.NYSE, Currency.USD, "IBM");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenMarketData_whenEncode_thenDecodedValuesMatch() {
|
||||
// our buffer to write encoded data, initial cap. 128 bytes
|
||||
UnsafeBuffer buffer = new UnsafeBuffer(ByteBuffer.allocate(128));
|
||||
// necessary encoders
|
||||
MessageHeaderEncoder headerEncoder = new MessageHeaderEncoder();
|
||||
TradeDataEncoder dataEncoder = new TradeDataEncoder();
|
||||
// we parse price data (double) into two parts: mantis and exponent
|
||||
BigDecimal priceDecimal = BigDecimal.valueOf(marketData.getPrice());
|
||||
int priceMantissa = priceDecimal.scaleByPowerOfTen(priceDecimal.scale())
|
||||
.intValue();
|
||||
int priceExponent = priceDecimal.scale() * -1;
|
||||
// encode data
|
||||
TradeDataEncoder encoder = dataEncoder.wrapAndApplyHeader(buffer, 0, headerEncoder);
|
||||
encoder.amount(marketData.getAmount());
|
||||
encoder.quote()
|
||||
.market(marketData.getMarket())
|
||||
.currency(marketData.getCurrency())
|
||||
.symbol(marketData.getSymbol())
|
||||
.price()
|
||||
.mantissa(priceMantissa)
|
||||
.exponent((byte) priceExponent);
|
||||
|
||||
// necessary decoders
|
||||
MessageHeaderDecoder headerDecoder = new MessageHeaderDecoder();
|
||||
TradeDataDecoder dataDecoder = new TradeDataDecoder();
|
||||
// decode data
|
||||
dataDecoder.wrapAndApplyHeader(buffer, 0, headerDecoder);
|
||||
// decode price data (from mantissa and exponent) into a double
|
||||
double price = BigDecimal.valueOf(dataDecoder.quote()
|
||||
.price()
|
||||
.mantissa())
|
||||
.scaleByPowerOfTen(dataDecoder.quote()
|
||||
.price()
|
||||
.exponent())
|
||||
.doubleValue();
|
||||
// ensure we have the exact same values
|
||||
Assertions.assertEquals(2, dataDecoder.amount());
|
||||
Assertions.assertEquals("IBM", dataDecoder.quote()
|
||||
.symbol());
|
||||
Assertions.assertEquals(Market.NYSE, dataDecoder.quote()
|
||||
.market());
|
||||
Assertions.assertEquals(Currency.USD, dataDecoder.quote()
|
||||
.currency());
|
||||
Assertions.assertEquals(128.99, price);
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue