BAEL-5855 Persisting the Query Model (#12976)
* BAEL-5855 Persisting the Query Model * BAEL-5855 Persisting the Query Model, apply baeldung code style.
This commit is contained in:
parent
30eccf2ae8
commit
d54d4768c0
|
@ -2,11 +2,17 @@
|
|||
|
||||
This module contains articles about Axon
|
||||
|
||||
## Profiles
|
||||
|
||||
Optionally the code can be run with the 'mongo' profile to use Mongo to store the projection. Otherwise, an in-memory
|
||||
projection is used.
|
||||
|
||||
## Scripts
|
||||
|
||||
One script is included to easily start middleware using Docker:
|
||||
Two scripts are included to easily start middleware using Docker matching the properties files:
|
||||
|
||||
- `start_axon_server.sh` to start an Axon Server instance
|
||||
- `start_mongo.sh` to start a MongoDB instance
|
||||
|
||||
### Relevant articles
|
||||
|
||||
|
@ -14,3 +20,4 @@ One script is included to easily start middleware using Docker:
|
|||
- [Multi-Entity Aggregates in Axon](https://www.baeldung.com/java-axon-multi-entity-aggregates)
|
||||
- [Snapshotting Aggregates in Axon](https://www.baeldung.com/axon-snapshotting-aggregates)
|
||||
- [Dispatching Queries in Axon Framework](https://www.baeldung.com/axon-query-dispatching)
|
||||
- [Persisting the Query Model](https://www.baeldung.com/persisting-the-query-model)
|
||||
|
|
19
axon/pom.xml
19
axon/pom.xml
|
@ -44,6 +44,10 @@
|
|||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-data-jpa</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.axonframework.extensions.mongo</groupId>
|
||||
<artifactId>axon-mongo</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.projectreactor</groupId>
|
||||
<artifactId>reactor-core</artifactId>
|
||||
|
@ -68,10 +72,23 @@
|
|||
<artifactId>reactor-test</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>de.flapdoodle.embed</groupId>
|
||||
<artifactId>de.flapdoodle.embed.mongo</artifactId>
|
||||
<version>${de.flapdoodle.embed.mongo.version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.awaitility</groupId>
|
||||
<artifactId>awaitility</artifactId>
|
||||
<version>4.2.0</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<properties>
|
||||
<axon-bom.version>4.6.0</axon-bom.version>
|
||||
<axon-bom.version>4.6.2</axon-bom.version>
|
||||
<de.flapdoodle.embed.mongo.version>3.4.8</de.flapdoodle.embed.mongo.version>
|
||||
</properties>
|
||||
|
||||
</project>
|
|
@ -9,5 +9,4 @@ public class OrderApplication {
|
|||
public static void main(String[] args) {
|
||||
SpringApplication.run(OrderApplication.class, args);
|
||||
}
|
||||
|
||||
}
|
|
@ -11,8 +11,7 @@ import org.springframework.context.annotation.Configuration;
|
|||
public class OrderApplicationConfiguration {
|
||||
|
||||
@Bean
|
||||
public SnapshotTriggerDefinition orderAggregateSnapshotTriggerDefinition(Snapshotter snapshotter,
|
||||
@Value("${axon.aggregate.order.snapshot-threshold:250}") int threshold) {
|
||||
public SnapshotTriggerDefinition orderAggregateSnapshotTriggerDefinition(Snapshotter snapshotter, @Value("${axon.aggregate.order.snapshot-threshold:250}") int threshold) {
|
||||
return new EventCountSnapshotTriggerDefinition(snapshotter, threshold);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -12,6 +12,7 @@ import com.baeldung.axon.coreapi.events.ProductRemovedEvent;
|
|||
import com.baeldung.axon.coreapi.exceptions.DuplicateOrderLineException;
|
||||
import com.baeldung.axon.coreapi.exceptions.OrderAlreadyConfirmedException;
|
||||
import com.baeldung.axon.coreapi.exceptions.UnconfirmedOrderException;
|
||||
|
||||
import org.axonframework.commandhandling.CommandHandler;
|
||||
import org.axonframework.eventsourcing.EventSourcingHandler;
|
||||
import org.axonframework.modelling.command.AggregateIdentifier;
|
||||
|
|
|
@ -7,6 +7,7 @@ import com.baeldung.axon.coreapi.events.ProductCountDecrementedEvent;
|
|||
import com.baeldung.axon.coreapi.events.ProductCountIncrementedEvent;
|
||||
import com.baeldung.axon.coreapi.events.ProductRemovedEvent;
|
||||
import com.baeldung.axon.coreapi.exceptions.OrderAlreadyConfirmedException;
|
||||
|
||||
import org.axonframework.commandhandling.CommandHandler;
|
||||
import org.axonframework.eventsourcing.EventSourcingHandler;
|
||||
import org.axonframework.modelling.command.EntityId;
|
||||
|
|
|
@ -42,9 +42,6 @@ public class AddProductCommand {
|
|||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "AddProductCommand{" +
|
||||
"orderId='" + orderId + '\'' +
|
||||
", productId='" + productId + '\'' +
|
||||
'}';
|
||||
return "AddProductCommand{" + "orderId='" + orderId + '\'' + ", productId='" + productId + '\'' + '}';
|
||||
}
|
||||
}
|
||||
|
|
|
@ -36,8 +36,6 @@ public class ConfirmOrderCommand {
|
|||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "ConfirmOrderCommand{" +
|
||||
"orderId='" + orderId + '\'' +
|
||||
'}';
|
||||
return "ConfirmOrderCommand{" + "orderId='" + orderId + '\'' + '}';
|
||||
}
|
||||
}
|
||||
|
|
|
@ -36,8 +36,6 @@ public class CreateOrderCommand {
|
|||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "CreateOrderCommand{" +
|
||||
"orderId='" + orderId + '\'' +
|
||||
'}';
|
||||
return "CreateOrderCommand{" + "orderId='" + orderId + '\'' + '}';
|
||||
}
|
||||
}
|
|
@ -42,9 +42,6 @@ public class DecrementProductCountCommand {
|
|||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "DecrementProductCountCommand{" +
|
||||
"orderId='" + orderId + '\'' +
|
||||
", productId='" + productId + '\'' +
|
||||
'}';
|
||||
return "DecrementProductCountCommand{" + "orderId='" + orderId + '\'' + ", productId='" + productId + '\'' + '}';
|
||||
}
|
||||
}
|
||||
|
|
|
@ -42,9 +42,6 @@ public class IncrementProductCountCommand {
|
|||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "IncrementProductCountCommand{" +
|
||||
"orderId='" + orderId + '\'' +
|
||||
", productId='" + productId + '\'' +
|
||||
'}';
|
||||
return "IncrementProductCountCommand{" + "orderId='" + orderId + '\'' + ", productId='" + productId + '\'' + '}';
|
||||
}
|
||||
}
|
||||
|
|
|
@ -36,8 +36,6 @@ public class ShipOrderCommand {
|
|||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "ShipOrderCommand{" +
|
||||
"orderId='" + orderId + '\'' +
|
||||
'}';
|
||||
return "ShipOrderCommand{" + "orderId='" + orderId + '\'' + '}';
|
||||
}
|
||||
}
|
|
@ -33,8 +33,6 @@ public class OrderConfirmedEvent {
|
|||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "OrderConfirmedEvent{" +
|
||||
"orderId='" + orderId + '\'' +
|
||||
'}';
|
||||
return "OrderConfirmedEvent{" + "orderId='" + orderId + '\'' + '}';
|
||||
}
|
||||
}
|
||||
|
|
|
@ -33,8 +33,6 @@ public class OrderCreatedEvent {
|
|||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "OrderCreatedEvent{" +
|
||||
"orderId='" + orderId + '\'' +
|
||||
'}';
|
||||
return "OrderCreatedEvent{" + "orderId='" + orderId + '\'' + '}';
|
||||
}
|
||||
}
|
|
@ -33,8 +33,6 @@ public class OrderShippedEvent {
|
|||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "OrderShippedEvent{" +
|
||||
"orderId='" + orderId + '\'' +
|
||||
'}';
|
||||
return "OrderShippedEvent{" + "orderId='" + orderId + '\'' + '}';
|
||||
}
|
||||
}
|
|
@ -39,9 +39,6 @@ public class ProductAddedEvent {
|
|||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "ProductAddedEvent{" +
|
||||
"orderId='" + orderId + '\'' +
|
||||
", productId='" + productId + '\'' +
|
||||
'}';
|
||||
return "ProductAddedEvent{" + "orderId='" + orderId + '\'' + ", productId='" + productId + '\'' + '}';
|
||||
}
|
||||
}
|
||||
|
|
|
@ -39,9 +39,6 @@ public class ProductCountDecrementedEvent {
|
|||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "ProductCountDecrementedEvent{" +
|
||||
"orderId='" + orderId + '\'' +
|
||||
", productId='" + productId + '\'' +
|
||||
'}';
|
||||
return "ProductCountDecrementedEvent{" + "orderId='" + orderId + '\'' + ", productId='" + productId + '\'' + '}';
|
||||
}
|
||||
}
|
||||
|
|
|
@ -39,9 +39,6 @@ public class ProductCountIncrementedEvent {
|
|||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "ProductCountIncrementedEvent{" +
|
||||
"orderId='" + orderId + '\'' +
|
||||
", productId='" + productId + '\'' +
|
||||
'}';
|
||||
return "ProductCountIncrementedEvent{" + "orderId='" + orderId + '\'' + ", productId='" + productId + '\'' + '}';
|
||||
}
|
||||
}
|
||||
|
|
|
@ -39,9 +39,6 @@ public class ProductRemovedEvent {
|
|||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "ProductRemovedEvent{" +
|
||||
"orderId='" + orderId + '\'' +
|
||||
", productId='" + productId + '\'' +
|
||||
'}';
|
||||
return "ProductRemovedEvent{" + "orderId='" + orderId + '\'' + ", productId='" + productId + '\'' + '}';
|
||||
}
|
||||
}
|
||||
|
|
|
@ -40,7 +40,6 @@ public class Order {
|
|||
products.computeIfPresent(productId, (id, count) -> --count);
|
||||
}
|
||||
|
||||
|
||||
public void removeProduct(String productId) {
|
||||
products.remove(productId);
|
||||
}
|
||||
|
@ -62,9 +61,7 @@ public class Order {
|
|||
return false;
|
||||
}
|
||||
Order that = (Order) o;
|
||||
return Objects.equals(orderId, that.orderId)
|
||||
&& Objects.equals(products, that.products)
|
||||
&& orderStatus == that.orderStatus;
|
||||
return Objects.equals(orderId, that.orderId) && Objects.equals(products, that.products) && orderStatus == that.orderStatus;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -74,10 +71,6 @@ public class Order {
|
|||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "Order{" +
|
||||
"orderId='" + orderId + '\'' +
|
||||
", products=" + products +
|
||||
", orderStatus=" + orderStatus +
|
||||
'}';
|
||||
return "Order{" + "orderId='" + orderId + '\'' + ", products=" + products + ", orderStatus=" + orderStatus + '}';
|
||||
}
|
||||
}
|
||||
|
|
|
@ -32,8 +32,6 @@ public class OrderUpdatesQuery {
|
|||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "OrderUpdatesQuery{" +
|
||||
"orderId='" + orderId + '\'' +
|
||||
'}';
|
||||
return "OrderUpdatesQuery{" + "orderId='" + orderId + '\'' + '}';
|
||||
}
|
||||
}
|
||||
|
|
|
@ -32,8 +32,6 @@ public class TotalProductsShippedQuery {
|
|||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "TotalProductsShippedQuery{" +
|
||||
"productId='" + productId + '\'' +
|
||||
'}';
|
||||
return "TotalProductsShippedQuery{" + "productId='" + productId + '\'' + '}';
|
||||
}
|
||||
}
|
||||
|
|
|
@ -8,12 +8,14 @@ import com.baeldung.axon.coreapi.commands.IncrementProductCountCommand;
|
|||
import com.baeldung.axon.coreapi.commands.ShipOrderCommand;
|
||||
import com.baeldung.axon.querymodel.OrderQueryService;
|
||||
import com.baeldung.axon.querymodel.OrderResponse;
|
||||
|
||||
import org.axonframework.commandhandling.gateway.CommandGateway;
|
||||
import org.springframework.http.MediaType;
|
||||
import org.springframework.web.bind.annotation.GetMapping;
|
||||
import org.springframework.web.bind.annotation.PathVariable;
|
||||
import org.springframework.web.bind.annotation.PostMapping;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
|
||||
import reactor.core.publisher.Flux;
|
||||
|
||||
import java.util.List;
|
||||
|
@ -33,7 +35,8 @@ public class OrderRestEndpoint {
|
|||
|
||||
@PostMapping("/ship-order")
|
||||
public CompletableFuture<Void> shipOrder() {
|
||||
String orderId = UUID.randomUUID().toString();
|
||||
String orderId = UUID.randomUUID()
|
||||
.toString();
|
||||
return commandGateway.send(new CreateOrderCommand(orderId))
|
||||
.thenCompose(result -> commandGateway.send(new AddProductCommand(orderId, "Deluxe Chair")))
|
||||
.thenCompose(result -> commandGateway.send(new ConfirmOrderCommand(orderId)))
|
||||
|
@ -42,7 +45,8 @@ public class OrderRestEndpoint {
|
|||
|
||||
@PostMapping("/ship-unconfirmed-order")
|
||||
public CompletableFuture<Void> shipUnconfirmedOrder() {
|
||||
String orderId = UUID.randomUUID().toString();
|
||||
String orderId = UUID.randomUUID()
|
||||
.toString();
|
||||
return commandGateway.send(new CreateOrderCommand(orderId))
|
||||
.thenCompose(result -> commandGateway.send(new AddProductCommand(orderId, "Deluxe Chair")))
|
||||
// This throws an exception, as an Order cannot be shipped if it has not been confirmed yet.
|
||||
|
@ -51,7 +55,8 @@ public class OrderRestEndpoint {
|
|||
|
||||
@PostMapping("/order")
|
||||
public CompletableFuture<String> createOrder() {
|
||||
return createOrder(UUID.randomUUID().toString());
|
||||
return createOrder(UUID.randomUUID()
|
||||
.toString());
|
||||
}
|
||||
|
||||
@PostMapping("/order/{order-id}")
|
||||
|
@ -60,20 +65,17 @@ public class OrderRestEndpoint {
|
|||
}
|
||||
|
||||
@PostMapping("/order/{order-id}/product/{product-id}")
|
||||
public CompletableFuture<Void> addProduct(@PathVariable("order-id") String orderId,
|
||||
@PathVariable("product-id") String productId) {
|
||||
public CompletableFuture<Void> addProduct(@PathVariable("order-id") String orderId, @PathVariable("product-id") String productId) {
|
||||
return commandGateway.send(new AddProductCommand(orderId, productId));
|
||||
}
|
||||
|
||||
@PostMapping("/order/{order-id}/product/{product-id}/increment")
|
||||
public CompletableFuture<Void> incrementProduct(@PathVariable("order-id") String orderId,
|
||||
@PathVariable("product-id") String productId) {
|
||||
public CompletableFuture<Void> incrementProduct(@PathVariable("order-id") String orderId, @PathVariable("product-id") String productId) {
|
||||
return commandGateway.send(new IncrementProductCountCommand(orderId, productId));
|
||||
}
|
||||
|
||||
@PostMapping("/order/{order-id}/product/{product-id}/decrement")
|
||||
public CompletableFuture<Void> decrementProduct(@PathVariable("order-id") String orderId,
|
||||
@PathVariable("product-id") String productId) {
|
||||
public CompletableFuture<Void> decrementProduct(@PathVariable("order-id") String orderId, @PathVariable("product-id") String productId) {
|
||||
return commandGateway.send(new DecrementProductCountCommand(orderId, productId));
|
||||
}
|
||||
|
||||
|
|
|
@ -18,7 +18,9 @@ import org.axonframework.eventhandling.EventHandler;
|
|||
import org.axonframework.queryhandling.QueryHandler;
|
||||
import org.axonframework.queryhandling.QueryUpdateEmitter;
|
||||
import org.reactivestreams.Publisher;
|
||||
import org.springframework.context.annotation.Profile;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
|
@ -30,6 +32,7 @@ import java.util.Optional;
|
|||
|
||||
@Service
|
||||
@ProcessingGroup("orders")
|
||||
@Profile("!mongo")
|
||||
public class InMemoryOrdersEventHandler implements OrdersEventHandler {
|
||||
|
||||
private final Map<String, Order> orders = new HashMap<>();
|
||||
|
@ -106,7 +109,8 @@ public class InMemoryOrdersEventHandler implements OrdersEventHandler {
|
|||
|
||||
@QueryHandler
|
||||
public Publisher<Order> handleStreaming(FindAllOrderedProductsQuery query) {
|
||||
return Mono.fromCallable(orders::values).flatMapMany(Flux::fromIterable);
|
||||
return Mono.fromCallable(orders::values)
|
||||
.flatMapMany(Flux::fromIterable);
|
||||
}
|
||||
|
||||
@QueryHandler
|
||||
|
|
|
@ -0,0 +1,26 @@
|
|||
package com.baeldung.axon.querymodel;
|
||||
|
||||
import com.mongodb.client.MongoClient;
|
||||
|
||||
import org.axonframework.eventhandling.tokenstore.TokenStore;
|
||||
import org.axonframework.extensions.mongo.DefaultMongoTemplate;
|
||||
import org.axonframework.extensions.mongo.eventsourcing.tokenstore.MongoTokenStore;
|
||||
import org.axonframework.serialization.Serializer;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.context.annotation.Profile;
|
||||
|
||||
@Configuration
|
||||
@Profile("mongo")
|
||||
public class MongoConfiguration {
|
||||
|
||||
@Bean
|
||||
public TokenStore getTokenStore(MongoClient client, Serializer serializer) {
|
||||
return MongoTokenStore.builder()
|
||||
.mongoTemplate(DefaultMongoTemplate.builder()
|
||||
.mongoDatabase(client)
|
||||
.build())
|
||||
.serializer(serializer)
|
||||
.build();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,194 @@
|
|||
package com.baeldung.axon.querymodel;
|
||||
|
||||
import com.baeldung.axon.coreapi.events.OrderConfirmedEvent;
|
||||
import com.baeldung.axon.coreapi.events.OrderCreatedEvent;
|
||||
import com.baeldung.axon.coreapi.events.OrderShippedEvent;
|
||||
import com.baeldung.axon.coreapi.events.ProductAddedEvent;
|
||||
import com.baeldung.axon.coreapi.events.ProductCountDecrementedEvent;
|
||||
import com.baeldung.axon.coreapi.events.ProductCountIncrementedEvent;
|
||||
import com.baeldung.axon.coreapi.events.ProductRemovedEvent;
|
||||
import com.baeldung.axon.coreapi.queries.FindAllOrderedProductsQuery;
|
||||
import com.baeldung.axon.coreapi.queries.Order;
|
||||
import com.baeldung.axon.coreapi.queries.OrderStatus;
|
||||
import com.baeldung.axon.coreapi.queries.OrderUpdatesQuery;
|
||||
import com.baeldung.axon.coreapi.queries.TotalProductsShippedQuery;
|
||||
import com.mongodb.client.MongoClient;
|
||||
import com.mongodb.client.MongoCollection;
|
||||
import com.mongodb.client.model.IndexOptions;
|
||||
import com.mongodb.client.model.Indexes;
|
||||
import com.mongodb.client.result.UpdateResult;
|
||||
|
||||
import groovyjarjarantlr4.v4.runtime.misc.NotNull;
|
||||
|
||||
import org.axonframework.config.ProcessingGroup;
|
||||
import org.axonframework.eventhandling.EventHandler;
|
||||
import org.axonframework.queryhandling.QueryHandler;
|
||||
import org.axonframework.queryhandling.QueryUpdateEmitter;
|
||||
import org.bson.Document;
|
||||
import org.bson.conversions.Bson;
|
||||
import org.reactivestreams.Publisher;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.context.annotation.Profile;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import reactor.core.publisher.Flux;
|
||||
|
||||
import java.lang.invoke.MethodHandles;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
import static com.mongodb.client.model.Filters.*;
|
||||
|
||||
@Service
|
||||
@ProcessingGroup("orders")
|
||||
@Profile("mongo")
|
||||
public class MongoOrdersEventHandler implements OrdersEventHandler {
|
||||
|
||||
static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup()
|
||||
.lookupClass());
|
||||
|
||||
private final MongoCollection<Document> orders;
|
||||
private final QueryUpdateEmitter emitter;
|
||||
private static final String ORDER_COLLECTION_NAME = "orders";
|
||||
private static final String AXON_FRAMEWORK_DATABASE_NAME = "axonframework";
|
||||
|
||||
private static final String ORDER_ID_PROPERTY_NAME = "orderId";
|
||||
private static final String PRODUCTS_PROPERTY_NAME = "products";
|
||||
private static final String ORDER_STATUS_PROPERTY_NAME = "orderStatus";
|
||||
|
||||
public MongoOrdersEventHandler(MongoClient client, QueryUpdateEmitter emitter) {
|
||||
orders = client.getDatabase(AXON_FRAMEWORK_DATABASE_NAME)
|
||||
.getCollection(ORDER_COLLECTION_NAME);
|
||||
orders.createIndex(Indexes.ascending(ORDER_ID_PROPERTY_NAME), new IndexOptions().unique(true));
|
||||
this.emitter = emitter;
|
||||
}
|
||||
|
||||
@EventHandler
|
||||
public void on(OrderCreatedEvent event) {
|
||||
orders.insertOne(orderToDocument(new Order(event.getOrderId())));
|
||||
}
|
||||
|
||||
@EventHandler
|
||||
public void on(ProductAddedEvent event) {
|
||||
update(event.getOrderId(), o -> o.addProduct(event.getProductId()));
|
||||
}
|
||||
|
||||
@EventHandler
|
||||
public void on(ProductCountIncrementedEvent event) {
|
||||
update(event.getOrderId(), o -> o.incrementProductInstance(event.getProductId()));
|
||||
}
|
||||
|
||||
@EventHandler
|
||||
public void on(ProductCountDecrementedEvent event) {
|
||||
update(event.getOrderId(), o -> o.decrementProductInstance(event.getProductId()));
|
||||
}
|
||||
|
||||
@EventHandler
|
||||
public void on(ProductRemovedEvent event) {
|
||||
update(event.getOrderId(), o -> o.removeProduct(event.getProductId()));
|
||||
}
|
||||
|
||||
@EventHandler
|
||||
public void on(OrderConfirmedEvent event) {
|
||||
update(event.getOrderId(), Order::setOrderConfirmed);
|
||||
}
|
||||
|
||||
@EventHandler
|
||||
public void on(OrderShippedEvent event) {
|
||||
update(event.getOrderId(), Order::setOrderShipped);
|
||||
}
|
||||
|
||||
@QueryHandler
|
||||
public List<Order> handle(FindAllOrderedProductsQuery query) {
|
||||
List<Order> orderList = new ArrayList<>();
|
||||
orders.find()
|
||||
.forEach(d -> orderList.add(documentToOrder(d)));
|
||||
return orderList;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Publisher<Order> handleStreaming(FindAllOrderedProductsQuery query) {
|
||||
return Flux.fromIterable(orders.find())
|
||||
.map(this::documentToOrder);
|
||||
}
|
||||
|
||||
@QueryHandler
|
||||
public Integer handle(TotalProductsShippedQuery query) {
|
||||
AtomicInteger result = new AtomicInteger();
|
||||
orders.find(shippedProductFilter(query.getProductId()))
|
||||
.map(d -> d.get(PRODUCTS_PROPERTY_NAME, Document.class))
|
||||
.map(d -> d.getInteger(query.getProductId(), 0))
|
||||
.forEach(result::addAndGet);
|
||||
return result.get();
|
||||
}
|
||||
|
||||
@QueryHandler
|
||||
public Order handle(OrderUpdatesQuery query) {
|
||||
return getOrder(query.getOrderId()).orElse(null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void reset(List<Order> orderList) {
|
||||
orders.deleteMany(new Document());
|
||||
orderList.forEach(o -> orders.insertOne(orderToDocument(o)));
|
||||
}
|
||||
|
||||
private Optional<Order> getOrder(String orderId) {
|
||||
return Optional.ofNullable(orders.find(eq(ORDER_ID_PROPERTY_NAME, orderId))
|
||||
.first())
|
||||
.map(this::documentToOrder);
|
||||
}
|
||||
|
||||
private Order emitUpdate(Order order) {
|
||||
emitter.emit(OrderUpdatesQuery.class, q -> order.getOrderId()
|
||||
.equals(q.getOrderId()), order);
|
||||
return order;
|
||||
}
|
||||
|
||||
private Order updateOrder(Order order, Consumer<Order> updateFunction) {
|
||||
updateFunction.accept(order);
|
||||
return order;
|
||||
}
|
||||
|
||||
private UpdateResult persistUpdate(Order order) {
|
||||
return orders.replaceOne(eq(ORDER_ID_PROPERTY_NAME, order.getOrderId()), orderToDocument(order));
|
||||
}
|
||||
|
||||
private void update(String orderId, Consumer<Order> updateFunction) {
|
||||
UpdateResult result = getOrder(orderId).map(o -> updateOrder(o, updateFunction))
|
||||
.map(this::emitUpdate)
|
||||
.map(this::persistUpdate)
|
||||
.orElse(null);
|
||||
logger.info("Result of updating order with orderId '{}': {}", orderId, result);
|
||||
}
|
||||
|
||||
private Document orderToDocument(Order order) {
|
||||
return new Document(ORDER_ID_PROPERTY_NAME, order.getOrderId()).append(PRODUCTS_PROPERTY_NAME, order.getProducts())
|
||||
.append(ORDER_STATUS_PROPERTY_NAME, order.getOrderStatus()
|
||||
.toString());
|
||||
}
|
||||
|
||||
private Order documentToOrder(@NotNull Document document) {
|
||||
Order order = new Order(document.getString(ORDER_ID_PROPERTY_NAME));
|
||||
Document products = document.get(PRODUCTS_PROPERTY_NAME, Document.class);
|
||||
products.forEach((k, v) -> order.getProducts()
|
||||
.put(k, (Integer) v));
|
||||
String status = document.getString(ORDER_STATUS_PROPERTY_NAME);
|
||||
if (OrderStatus.CONFIRMED.toString()
|
||||
.equals(status)) {
|
||||
order.setOrderConfirmed();
|
||||
} else if (OrderStatus.SHIPPED.toString()
|
||||
.equals(status)) {
|
||||
order.setOrderShipped();
|
||||
}
|
||||
return order;
|
||||
}
|
||||
|
||||
private Bson shippedProductFilter(String productId) {
|
||||
return and(eq(ORDER_STATUS_PROPERTY_NAME, OrderStatus.SHIPPED.toString()), exists(String.format(PRODUCTS_PROPERTY_NAME + ".%s", productId)));
|
||||
}
|
||||
}
|
|
@ -37,12 +37,12 @@ public class OrderQueryService {
|
|||
|
||||
public Flux<OrderResponse> allOrdersStreaming() {
|
||||
Publisher<Order> publisher = queryGateway.streamingQuery(new FindAllOrderedProductsQuery(), Order.class);
|
||||
return Flux.from(publisher).map(OrderResponse::new);
|
||||
return Flux.from(publisher)
|
||||
.map(OrderResponse::new);
|
||||
}
|
||||
|
||||
public Integer totalShipped(String productId) {
|
||||
return queryGateway.scatterGather(new TotalProductsShippedQuery(productId),
|
||||
ResponseTypes.instanceOf(Integer.class), 10L, TimeUnit.SECONDS)
|
||||
return queryGateway.scatterGather(new TotalProductsShippedQuery(productId), ResponseTypes.instanceOf(Integer.class), 10L, TimeUnit.SECONDS)
|
||||
.reduce(0, Integer::sum);
|
||||
}
|
||||
|
||||
|
|
|
@ -7,7 +7,8 @@ public enum OrderStatusResponse {
|
|||
|
||||
static OrderStatusResponse toResponse(OrderStatus status) {
|
||||
for (OrderStatusResponse response : values()) {
|
||||
if (response.toString().equals(status.toString())) {
|
||||
if (response.toString()
|
||||
.equals(status.toString())) {
|
||||
return response;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -11,6 +11,7 @@ import com.baeldung.axon.coreapi.queries.FindAllOrderedProductsQuery;
|
|||
import com.baeldung.axon.coreapi.queries.Order;
|
||||
import com.baeldung.axon.coreapi.queries.OrderUpdatesQuery;
|
||||
import com.baeldung.axon.coreapi.queries.TotalProductsShippedQuery;
|
||||
|
||||
import org.reactivestreams.Publisher;
|
||||
|
||||
import java.util.List;
|
||||
|
|
|
@ -0,0 +1,6 @@
|
|||
spring.data.mongodb.host=localhost
|
||||
spring.data.mongodb.port=27017
|
||||
spring.data.mongodb.authentication-database=admin
|
||||
spring.data.mongodb.username=admin1234
|
||||
spring.data.mongodb.password=somepassword
|
||||
spring.data.mongodb.database=order-projection
|
|
@ -17,6 +17,7 @@ import com.baeldung.axon.coreapi.events.ProductRemovedEvent;
|
|||
import com.baeldung.axon.coreapi.exceptions.DuplicateOrderLineException;
|
||||
import com.baeldung.axon.coreapi.exceptions.OrderAlreadyConfirmedException;
|
||||
import com.baeldung.axon.coreapi.exceptions.UnconfirmedOrderException;
|
||||
|
||||
import org.axonframework.test.aggregate.AggregateTestFixture;
|
||||
import org.axonframework.test.aggregate.FixtureConfiguration;
|
||||
import org.axonframework.test.matchers.Matchers;
|
||||
|
@ -26,8 +27,10 @@ import java.util.UUID;
|
|||
|
||||
class OrderAggregateUnitTest {
|
||||
|
||||
private static final String ORDER_ID = UUID.randomUUID().toString();
|
||||
private static final String PRODUCT_ID = UUID.randomUUID().toString();
|
||||
private static final String ORDER_ID = UUID.randomUUID()
|
||||
.toString();
|
||||
private static final String PRODUCT_ID = UUID.randomUUID()
|
||||
.toString();
|
||||
|
||||
private FixtureConfiguration<OrderAggregate> fixture;
|
||||
|
||||
|
@ -67,9 +70,7 @@ class OrderAggregateUnitTest {
|
|||
|
||||
@Test
|
||||
void givenOrderCreatedEventProductAddedEventAndProductCountIncrementedEvent_whenDecrementProductCountCommand_thenShouldPublishProductCountDecrementedEvent() {
|
||||
fixture.given(new OrderCreatedEvent(ORDER_ID),
|
||||
new ProductAddedEvent(ORDER_ID, PRODUCT_ID),
|
||||
new ProductCountIncrementedEvent(ORDER_ID, PRODUCT_ID))
|
||||
fixture.given(new OrderCreatedEvent(ORDER_ID), new ProductAddedEvent(ORDER_ID, PRODUCT_ID), new ProductCountIncrementedEvent(ORDER_ID, PRODUCT_ID))
|
||||
.when(new DecrementProductCountCommand(ORDER_ID, PRODUCT_ID))
|
||||
.expectEvents(new ProductCountDecrementedEvent(ORDER_ID, PRODUCT_ID));
|
||||
}
|
||||
|
@ -119,9 +120,7 @@ class OrderAggregateUnitTest {
|
|||
|
||||
@Test
|
||||
void givenOrderCreatedEventProductAddedEventAndOrderConfirmedEvent_whenIncrementProductCountCommand_thenShouldThrowOrderAlreadyConfirmedException() {
|
||||
fixture.given(new OrderCreatedEvent(ORDER_ID),
|
||||
new ProductAddedEvent(ORDER_ID, PRODUCT_ID),
|
||||
new OrderConfirmedEvent(ORDER_ID))
|
||||
fixture.given(new OrderCreatedEvent(ORDER_ID), new ProductAddedEvent(ORDER_ID, PRODUCT_ID), new OrderConfirmedEvent(ORDER_ID))
|
||||
.when(new IncrementProductCountCommand(ORDER_ID, PRODUCT_ID))
|
||||
.expectException(OrderAlreadyConfirmedException.class)
|
||||
.expectExceptionMessage(Matchers.predicate(message -> ((String) message).contains(ORDER_ID)));
|
||||
|
@ -129,9 +128,7 @@ class OrderAggregateUnitTest {
|
|||
|
||||
@Test
|
||||
void givenOrderCreatedEventProductAddedEventAndOrderConfirmedEvent_whenDecrementProductCountCommand_thenShouldThrowOrderAlreadyConfirmedException() {
|
||||
fixture.given(new OrderCreatedEvent(ORDER_ID),
|
||||
new ProductAddedEvent(ORDER_ID, PRODUCT_ID),
|
||||
new OrderConfirmedEvent(ORDER_ID))
|
||||
fixture.given(new OrderCreatedEvent(ORDER_ID), new ProductAddedEvent(ORDER_ID, PRODUCT_ID), new OrderConfirmedEvent(ORDER_ID))
|
||||
.when(new DecrementProductCountCommand(ORDER_ID, PRODUCT_ID))
|
||||
.expectException(OrderAlreadyConfirmedException.class)
|
||||
.expectExceptionMessage(Matchers.predicate(message -> ((String) message).contains(ORDER_ID)));
|
||||
|
|
|
@ -12,8 +12,10 @@ import com.baeldung.axon.coreapi.queries.Order;
|
|||
import com.baeldung.axon.coreapi.queries.OrderStatus;
|
||||
import com.baeldung.axon.coreapi.queries.OrderUpdatesQuery;
|
||||
import com.baeldung.axon.coreapi.queries.TotalProductsShippedQuery;
|
||||
|
||||
import org.axonframework.queryhandling.QueryUpdateEmitter;
|
||||
import org.junit.jupiter.api.*;
|
||||
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.test.StepVerifier;
|
||||
|
||||
|
@ -27,10 +29,14 @@ import static org.mockito.Mockito.*;
|
|||
|
||||
public abstract class AbstractOrdersEventHandlerUnitTest {
|
||||
|
||||
private static final String ORDER_ID_1 = UUID.randomUUID().toString();
|
||||
private static final String ORDER_ID_2 = UUID.randomUUID().toString();
|
||||
private static final String PRODUCT_ID_1 = UUID.randomUUID().toString();
|
||||
private static final String PRODUCT_ID_2 = UUID.randomUUID().toString();
|
||||
private static final String ORDER_ID_1 = UUID.randomUUID()
|
||||
.toString();
|
||||
private static final String ORDER_ID_2 = UUID.randomUUID()
|
||||
.toString();
|
||||
private static final String PRODUCT_ID_1 = UUID.randomUUID()
|
||||
.toString();
|
||||
private static final String PRODUCT_ID_2 = UUID.randomUUID()
|
||||
.toString();
|
||||
private OrdersEventHandler handler;
|
||||
private static Order orderOne;
|
||||
private static Order orderTwo;
|
||||
|
@ -39,12 +45,15 @@ public abstract class AbstractOrdersEventHandlerUnitTest {
|
|||
@BeforeAll
|
||||
static void createOrders() {
|
||||
orderOne = new Order(ORDER_ID_1);
|
||||
orderOne.getProducts().put(PRODUCT_ID_1, 3);
|
||||
orderOne.getProducts()
|
||||
.put(PRODUCT_ID_1, 3);
|
||||
orderOne.setOrderShipped();
|
||||
|
||||
orderTwo = new Order(ORDER_ID_2);
|
||||
orderTwo.getProducts().put(PRODUCT_ID_1, 1);
|
||||
orderTwo.getProducts().put(PRODUCT_ID_2, 1);
|
||||
orderTwo.getProducts()
|
||||
.put(PRODUCT_ID_1, 1);
|
||||
orderTwo.getProducts()
|
||||
.put(PRODUCT_ID_2, 1);
|
||||
orderTwo.setOrderConfirmed();
|
||||
}
|
||||
|
||||
|
@ -64,10 +73,18 @@ public abstract class AbstractOrdersEventHandlerUnitTest {
|
|||
assertNotNull(result);
|
||||
assertEquals(2, result.size());
|
||||
|
||||
Order order_1 = result.stream().filter(o -> o.getOrderId().equals(ORDER_ID_1)).findFirst().orElse(null);
|
||||
Order order_1 = result.stream()
|
||||
.filter(o -> o.getOrderId()
|
||||
.equals(ORDER_ID_1))
|
||||
.findFirst()
|
||||
.orElse(null);
|
||||
assertEquals(orderOne, order_1);
|
||||
|
||||
Order order_2 = result.stream().filter(o -> o.getOrderId().equals(ORDER_ID_2)).findFirst().orElse(null);
|
||||
Order order_2 = result.stream()
|
||||
.filter(o -> o.getOrderId()
|
||||
.equals(ORDER_ID_2))
|
||||
.findFirst()
|
||||
.orElse(null);
|
||||
assertEquals(orderTwo, order_2);
|
||||
}
|
||||
|
||||
|
@ -75,9 +92,11 @@ public abstract class AbstractOrdersEventHandlerUnitTest {
|
|||
void givenTwoOrdersPlacedOfWhichOneNotShipped_whenFindAllOrderedProductsQueryStreaming_thenCorrectOrdersAreReturned() {
|
||||
resetWithTwoOrders();
|
||||
final Consumer<Order> orderVerifier = order -> {
|
||||
if (order.getOrderId().equals(orderOne.getOrderId())) {
|
||||
if (order.getOrderId()
|
||||
.equals(orderOne.getOrderId())) {
|
||||
assertEquals(orderOne, order);
|
||||
} else if (order.getOrderId().equals(orderTwo.getOrderId())) {
|
||||
} else if (order.getOrderId()
|
||||
.equals(orderTwo.getOrderId())) {
|
||||
assertEquals(orderTwo, order);
|
||||
} else {
|
||||
throw new RuntimeException("Would expect either order one or order two");
|
||||
|
@ -120,7 +139,8 @@ public abstract class AbstractOrdersEventHandlerUnitTest {
|
|||
Order result = handler.handle(new OrderUpdatesQuery(ORDER_ID_1));
|
||||
assertNotNull(result);
|
||||
assertEquals(ORDER_ID_1, result.getOrderId());
|
||||
assertEquals(3, result.getProducts().get(PRODUCT_ID_1));
|
||||
assertEquals(3, result.getProducts()
|
||||
.get(PRODUCT_ID_1));
|
||||
assertEquals(OrderStatus.SHIPPED, result.getOrderStatus());
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,20 @@
|
|||
package com.baeldung.axon.querymodel;
|
||||
|
||||
import com.mongodb.client.MongoClient;
|
||||
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.test.autoconfigure.data.mongo.DataMongoTest;
|
||||
|
||||
@DataMongoTest
|
||||
public class MongoOrdersEventHandlerUnitTest extends AbstractOrdersEventHandlerUnitTest {
|
||||
|
||||
@Autowired
|
||||
MongoClient mongoClient;
|
||||
|
||||
@Override
|
||||
protected OrdersEventHandler getHandler() {
|
||||
mongoClient.getDatabase("axonframework")
|
||||
.drop();
|
||||
return new MongoOrdersEventHandler(mongoClient, emitter);
|
||||
}
|
||||
}
|
|
@ -6,7 +6,6 @@ import com.baeldung.axon.coreapi.events.OrderShippedEvent;
|
|||
import com.baeldung.axon.coreapi.events.ProductAddedEvent;
|
||||
import com.baeldung.axon.coreapi.events.ProductCountDecrementedEvent;
|
||||
import com.baeldung.axon.coreapi.events.ProductCountIncrementedEvent;
|
||||
import com.baeldung.axon.coreapi.queries.FindAllOrderedProductsQuery;
|
||||
import com.baeldung.axon.coreapi.queries.Order;
|
||||
|
||||
import org.axonframework.eventhandling.gateway.EventGateway;
|
||||
|
|
|
@ -1 +1,2 @@
|
|||
spring.mongodb.embedded.version=5.0.6
|
||||
axon.axonserver.enabled=false
|
|
@ -0,0 +1,7 @@
|
|||
docker run \
|
||||
-d \
|
||||
--name order_projection \
|
||||
-p 27017:27017 \
|
||||
-e MONGO_INITDB_ROOT_USERNAME=admin1234 \
|
||||
-e MONGO_INITDB_ROOT_PASSWORD=somepassword \
|
||||
mongo
|
Loading…
Reference in New Issue