diff --git a/axon/README.md b/axon/README.md index 40547a68b8..5db38a7c94 100644 --- a/axon/README.md +++ b/axon/README.md @@ -2,11 +2,17 @@ This module contains articles about Axon +## Profiles + +Optionally the code can be run with the 'mongo' profile to use Mongo to store the projection. Otherwise, an in-memory +projection is used. + ## Scripts -One script is included to easily start middleware using Docker: +Two scripts are included to easily start middleware using Docker matching the properties files: - `start_axon_server.sh` to start an Axon Server instance +- `start_mongo.sh` to start a MongoDB instance ### Relevant articles @@ -14,3 +20,4 @@ One script is included to easily start middleware using Docker: - [Multi-Entity Aggregates in Axon](https://www.baeldung.com/java-axon-multi-entity-aggregates) - [Snapshotting Aggregates in Axon](https://www.baeldung.com/axon-snapshotting-aggregates) - [Dispatching Queries in Axon Framework](https://www.baeldung.com/axon-query-dispatching) +- [Persisting the Query Model](https://www.baeldung.com/persisting-the-query-model) diff --git a/axon/pom.xml b/axon/pom.xml index df14233863..cfdd24ff3c 100644 --- a/axon/pom.xml +++ b/axon/pom.xml @@ -44,6 +44,10 @@ org.springframework.boot spring-boot-starter-data-jpa + + org.axonframework.extensions.mongo + axon-mongo + io.projectreactor reactor-core @@ -68,10 +72,23 @@ reactor-test test + + de.flapdoodle.embed + de.flapdoodle.embed.mongo + ${de.flapdoodle.embed.mongo.version} + test + + + org.awaitility + awaitility + 4.2.0 + test + - 4.6.0 + 4.6.2 + 3.4.8 \ No newline at end of file diff --git a/axon/src/main/java/com/baeldung/axon/OrderApplication.java b/axon/src/main/java/com/baeldung/axon/OrderApplication.java index 8f507e141c..9e5334c3a3 100644 --- a/axon/src/main/java/com/baeldung/axon/OrderApplication.java +++ b/axon/src/main/java/com/baeldung/axon/OrderApplication.java @@ -9,5 +9,4 @@ public class OrderApplication { public static void main(String[] args) { SpringApplication.run(OrderApplication.class, args); } - } \ No newline at end of file diff --git a/axon/src/main/java/com/baeldung/axon/OrderApplicationConfiguration.java b/axon/src/main/java/com/baeldung/axon/OrderApplicationConfiguration.java index 8b743144b4..e1b7d5b14c 100644 --- a/axon/src/main/java/com/baeldung/axon/OrderApplicationConfiguration.java +++ b/axon/src/main/java/com/baeldung/axon/OrderApplicationConfiguration.java @@ -11,8 +11,7 @@ import org.springframework.context.annotation.Configuration; public class OrderApplicationConfiguration { @Bean - public SnapshotTriggerDefinition orderAggregateSnapshotTriggerDefinition(Snapshotter snapshotter, - @Value("${axon.aggregate.order.snapshot-threshold:250}") int threshold) { + public SnapshotTriggerDefinition orderAggregateSnapshotTriggerDefinition(Snapshotter snapshotter, @Value("${axon.aggregate.order.snapshot-threshold:250}") int threshold) { return new EventCountSnapshotTriggerDefinition(snapshotter, threshold); } } diff --git a/axon/src/main/java/com/baeldung/axon/commandmodel/order/OrderAggregate.java b/axon/src/main/java/com/baeldung/axon/commandmodel/order/OrderAggregate.java index 1065e9d22b..c58c0b27ac 100644 --- a/axon/src/main/java/com/baeldung/axon/commandmodel/order/OrderAggregate.java +++ b/axon/src/main/java/com/baeldung/axon/commandmodel/order/OrderAggregate.java @@ -12,6 +12,7 @@ import com.baeldung.axon.coreapi.events.ProductRemovedEvent; import com.baeldung.axon.coreapi.exceptions.DuplicateOrderLineException; import com.baeldung.axon.coreapi.exceptions.OrderAlreadyConfirmedException; import com.baeldung.axon.coreapi.exceptions.UnconfirmedOrderException; + import org.axonframework.commandhandling.CommandHandler; import org.axonframework.eventsourcing.EventSourcingHandler; import org.axonframework.modelling.command.AggregateIdentifier; diff --git a/axon/src/main/java/com/baeldung/axon/commandmodel/order/OrderLine.java b/axon/src/main/java/com/baeldung/axon/commandmodel/order/OrderLine.java index e471ecbfe0..aa774070c0 100644 --- a/axon/src/main/java/com/baeldung/axon/commandmodel/order/OrderLine.java +++ b/axon/src/main/java/com/baeldung/axon/commandmodel/order/OrderLine.java @@ -7,6 +7,7 @@ import com.baeldung.axon.coreapi.events.ProductCountDecrementedEvent; import com.baeldung.axon.coreapi.events.ProductCountIncrementedEvent; import com.baeldung.axon.coreapi.events.ProductRemovedEvent; import com.baeldung.axon.coreapi.exceptions.OrderAlreadyConfirmedException; + import org.axonframework.commandhandling.CommandHandler; import org.axonframework.eventsourcing.EventSourcingHandler; import org.axonframework.modelling.command.EntityId; diff --git a/axon/src/main/java/com/baeldung/axon/coreapi/commands/AddProductCommand.java b/axon/src/main/java/com/baeldung/axon/coreapi/commands/AddProductCommand.java index 28736aaadc..8f2babc2dc 100644 --- a/axon/src/main/java/com/baeldung/axon/coreapi/commands/AddProductCommand.java +++ b/axon/src/main/java/com/baeldung/axon/coreapi/commands/AddProductCommand.java @@ -42,9 +42,6 @@ public class AddProductCommand { @Override public String toString() { - return "AddProductCommand{" + - "orderId='" + orderId + '\'' + - ", productId='" + productId + '\'' + - '}'; + return "AddProductCommand{" + "orderId='" + orderId + '\'' + ", productId='" + productId + '\'' + '}'; } } diff --git a/axon/src/main/java/com/baeldung/axon/coreapi/commands/ConfirmOrderCommand.java b/axon/src/main/java/com/baeldung/axon/coreapi/commands/ConfirmOrderCommand.java index 244b69f3b7..37f1df2725 100644 --- a/axon/src/main/java/com/baeldung/axon/coreapi/commands/ConfirmOrderCommand.java +++ b/axon/src/main/java/com/baeldung/axon/coreapi/commands/ConfirmOrderCommand.java @@ -36,8 +36,6 @@ public class ConfirmOrderCommand { @Override public String toString() { - return "ConfirmOrderCommand{" + - "orderId='" + orderId + '\'' + - '}'; + return "ConfirmOrderCommand{" + "orderId='" + orderId + '\'' + '}'; } } diff --git a/axon/src/main/java/com/baeldung/axon/coreapi/commands/CreateOrderCommand.java b/axon/src/main/java/com/baeldung/axon/coreapi/commands/CreateOrderCommand.java index ceb7fd6a08..294d53cab8 100644 --- a/axon/src/main/java/com/baeldung/axon/coreapi/commands/CreateOrderCommand.java +++ b/axon/src/main/java/com/baeldung/axon/coreapi/commands/CreateOrderCommand.java @@ -36,8 +36,6 @@ public class CreateOrderCommand { @Override public String toString() { - return "CreateOrderCommand{" + - "orderId='" + orderId + '\'' + - '}'; + return "CreateOrderCommand{" + "orderId='" + orderId + '\'' + '}'; } } \ No newline at end of file diff --git a/axon/src/main/java/com/baeldung/axon/coreapi/commands/DecrementProductCountCommand.java b/axon/src/main/java/com/baeldung/axon/coreapi/commands/DecrementProductCountCommand.java index f6f4db00fc..0874f88df8 100644 --- a/axon/src/main/java/com/baeldung/axon/coreapi/commands/DecrementProductCountCommand.java +++ b/axon/src/main/java/com/baeldung/axon/coreapi/commands/DecrementProductCountCommand.java @@ -42,9 +42,6 @@ public class DecrementProductCountCommand { @Override public String toString() { - return "DecrementProductCountCommand{" + - "orderId='" + orderId + '\'' + - ", productId='" + productId + '\'' + - '}'; + return "DecrementProductCountCommand{" + "orderId='" + orderId + '\'' + ", productId='" + productId + '\'' + '}'; } } diff --git a/axon/src/main/java/com/baeldung/axon/coreapi/commands/IncrementProductCountCommand.java b/axon/src/main/java/com/baeldung/axon/coreapi/commands/IncrementProductCountCommand.java index 548faabe37..aceb93e478 100644 --- a/axon/src/main/java/com/baeldung/axon/coreapi/commands/IncrementProductCountCommand.java +++ b/axon/src/main/java/com/baeldung/axon/coreapi/commands/IncrementProductCountCommand.java @@ -42,9 +42,6 @@ public class IncrementProductCountCommand { @Override public String toString() { - return "IncrementProductCountCommand{" + - "orderId='" + orderId + '\'' + - ", productId='" + productId + '\'' + - '}'; + return "IncrementProductCountCommand{" + "orderId='" + orderId + '\'' + ", productId='" + productId + '\'' + '}'; } } diff --git a/axon/src/main/java/com/baeldung/axon/coreapi/commands/ShipOrderCommand.java b/axon/src/main/java/com/baeldung/axon/coreapi/commands/ShipOrderCommand.java index 7312bc1fdb..f0f3a6e071 100644 --- a/axon/src/main/java/com/baeldung/axon/coreapi/commands/ShipOrderCommand.java +++ b/axon/src/main/java/com/baeldung/axon/coreapi/commands/ShipOrderCommand.java @@ -36,8 +36,6 @@ public class ShipOrderCommand { @Override public String toString() { - return "ShipOrderCommand{" + - "orderId='" + orderId + '\'' + - '}'; + return "ShipOrderCommand{" + "orderId='" + orderId + '\'' + '}'; } } \ No newline at end of file diff --git a/axon/src/main/java/com/baeldung/axon/coreapi/events/OrderConfirmedEvent.java b/axon/src/main/java/com/baeldung/axon/coreapi/events/OrderConfirmedEvent.java index d2b7d58435..3253c2f1d5 100644 --- a/axon/src/main/java/com/baeldung/axon/coreapi/events/OrderConfirmedEvent.java +++ b/axon/src/main/java/com/baeldung/axon/coreapi/events/OrderConfirmedEvent.java @@ -33,8 +33,6 @@ public class OrderConfirmedEvent { @Override public String toString() { - return "OrderConfirmedEvent{" + - "orderId='" + orderId + '\'' + - '}'; + return "OrderConfirmedEvent{" + "orderId='" + orderId + '\'' + '}'; } } diff --git a/axon/src/main/java/com/baeldung/axon/coreapi/events/OrderCreatedEvent.java b/axon/src/main/java/com/baeldung/axon/coreapi/events/OrderCreatedEvent.java index 5d2d8b7f55..2bb86108f9 100644 --- a/axon/src/main/java/com/baeldung/axon/coreapi/events/OrderCreatedEvent.java +++ b/axon/src/main/java/com/baeldung/axon/coreapi/events/OrderCreatedEvent.java @@ -33,8 +33,6 @@ public class OrderCreatedEvent { @Override public String toString() { - return "OrderCreatedEvent{" + - "orderId='" + orderId + '\'' + - '}'; + return "OrderCreatedEvent{" + "orderId='" + orderId + '\'' + '}'; } } \ No newline at end of file diff --git a/axon/src/main/java/com/baeldung/axon/coreapi/events/OrderShippedEvent.java b/axon/src/main/java/com/baeldung/axon/coreapi/events/OrderShippedEvent.java index 76aa684629..5161bc7bf1 100644 --- a/axon/src/main/java/com/baeldung/axon/coreapi/events/OrderShippedEvent.java +++ b/axon/src/main/java/com/baeldung/axon/coreapi/events/OrderShippedEvent.java @@ -33,8 +33,6 @@ public class OrderShippedEvent { @Override public String toString() { - return "OrderShippedEvent{" + - "orderId='" + orderId + '\'' + - '}'; + return "OrderShippedEvent{" + "orderId='" + orderId + '\'' + '}'; } } \ No newline at end of file diff --git a/axon/src/main/java/com/baeldung/axon/coreapi/events/ProductAddedEvent.java b/axon/src/main/java/com/baeldung/axon/coreapi/events/ProductAddedEvent.java index 091ef2a570..14510cb161 100644 --- a/axon/src/main/java/com/baeldung/axon/coreapi/events/ProductAddedEvent.java +++ b/axon/src/main/java/com/baeldung/axon/coreapi/events/ProductAddedEvent.java @@ -39,9 +39,6 @@ public class ProductAddedEvent { @Override public String toString() { - return "ProductAddedEvent{" + - "orderId='" + orderId + '\'' + - ", productId='" + productId + '\'' + - '}'; + return "ProductAddedEvent{" + "orderId='" + orderId + '\'' + ", productId='" + productId + '\'' + '}'; } } diff --git a/axon/src/main/java/com/baeldung/axon/coreapi/events/ProductCountDecrementedEvent.java b/axon/src/main/java/com/baeldung/axon/coreapi/events/ProductCountDecrementedEvent.java index 4017916791..eec424cb24 100644 --- a/axon/src/main/java/com/baeldung/axon/coreapi/events/ProductCountDecrementedEvent.java +++ b/axon/src/main/java/com/baeldung/axon/coreapi/events/ProductCountDecrementedEvent.java @@ -39,9 +39,6 @@ public class ProductCountDecrementedEvent { @Override public String toString() { - return "ProductCountDecrementedEvent{" + - "orderId='" + orderId + '\'' + - ", productId='" + productId + '\'' + - '}'; + return "ProductCountDecrementedEvent{" + "orderId='" + orderId + '\'' + ", productId='" + productId + '\'' + '}'; } } diff --git a/axon/src/main/java/com/baeldung/axon/coreapi/events/ProductCountIncrementedEvent.java b/axon/src/main/java/com/baeldung/axon/coreapi/events/ProductCountIncrementedEvent.java index 2910a9ea6f..0d008758c7 100644 --- a/axon/src/main/java/com/baeldung/axon/coreapi/events/ProductCountIncrementedEvent.java +++ b/axon/src/main/java/com/baeldung/axon/coreapi/events/ProductCountIncrementedEvent.java @@ -39,9 +39,6 @@ public class ProductCountIncrementedEvent { @Override public String toString() { - return "ProductCountIncrementedEvent{" + - "orderId='" + orderId + '\'' + - ", productId='" + productId + '\'' + - '}'; + return "ProductCountIncrementedEvent{" + "orderId='" + orderId + '\'' + ", productId='" + productId + '\'' + '}'; } } diff --git a/axon/src/main/java/com/baeldung/axon/coreapi/events/ProductRemovedEvent.java b/axon/src/main/java/com/baeldung/axon/coreapi/events/ProductRemovedEvent.java index 7f89ccd1cc..13ff77db26 100644 --- a/axon/src/main/java/com/baeldung/axon/coreapi/events/ProductRemovedEvent.java +++ b/axon/src/main/java/com/baeldung/axon/coreapi/events/ProductRemovedEvent.java @@ -39,9 +39,6 @@ public class ProductRemovedEvent { @Override public String toString() { - return "ProductRemovedEvent{" + - "orderId='" + orderId + '\'' + - ", productId='" + productId + '\'' + - '}'; + return "ProductRemovedEvent{" + "orderId='" + orderId + '\'' + ", productId='" + productId + '\'' + '}'; } } diff --git a/axon/src/main/java/com/baeldung/axon/coreapi/queries/Order.java b/axon/src/main/java/com/baeldung/axon/coreapi/queries/Order.java index 1810a053d3..9cef3e84ee 100644 --- a/axon/src/main/java/com/baeldung/axon/coreapi/queries/Order.java +++ b/axon/src/main/java/com/baeldung/axon/coreapi/queries/Order.java @@ -40,7 +40,6 @@ public class Order { products.computeIfPresent(productId, (id, count) -> --count); } - public void removeProduct(String productId) { products.remove(productId); } @@ -62,9 +61,7 @@ public class Order { return false; } Order that = (Order) o; - return Objects.equals(orderId, that.orderId) - && Objects.equals(products, that.products) - && orderStatus == that.orderStatus; + return Objects.equals(orderId, that.orderId) && Objects.equals(products, that.products) && orderStatus == that.orderStatus; } @Override @@ -74,10 +71,6 @@ public class Order { @Override public String toString() { - return "Order{" + - "orderId='" + orderId + '\'' + - ", products=" + products + - ", orderStatus=" + orderStatus + - '}'; + return "Order{" + "orderId='" + orderId + '\'' + ", products=" + products + ", orderStatus=" + orderStatus + '}'; } } diff --git a/axon/src/main/java/com/baeldung/axon/coreapi/queries/OrderUpdatesQuery.java b/axon/src/main/java/com/baeldung/axon/coreapi/queries/OrderUpdatesQuery.java index 37d2e67445..b4e66289b4 100644 --- a/axon/src/main/java/com/baeldung/axon/coreapi/queries/OrderUpdatesQuery.java +++ b/axon/src/main/java/com/baeldung/axon/coreapi/queries/OrderUpdatesQuery.java @@ -32,8 +32,6 @@ public class OrderUpdatesQuery { @Override public String toString() { - return "OrderUpdatesQuery{" + - "orderId='" + orderId + '\'' + - '}'; + return "OrderUpdatesQuery{" + "orderId='" + orderId + '\'' + '}'; } } diff --git a/axon/src/main/java/com/baeldung/axon/coreapi/queries/TotalProductsShippedQuery.java b/axon/src/main/java/com/baeldung/axon/coreapi/queries/TotalProductsShippedQuery.java index 3a4129685b..8c3bbf3528 100644 --- a/axon/src/main/java/com/baeldung/axon/coreapi/queries/TotalProductsShippedQuery.java +++ b/axon/src/main/java/com/baeldung/axon/coreapi/queries/TotalProductsShippedQuery.java @@ -32,8 +32,6 @@ public class TotalProductsShippedQuery { @Override public String toString() { - return "TotalProductsShippedQuery{" + - "productId='" + productId + '\'' + - '}'; + return "TotalProductsShippedQuery{" + "productId='" + productId + '\'' + '}'; } } diff --git a/axon/src/main/java/com/baeldung/axon/gui/OrderRestEndpoint.java b/axon/src/main/java/com/baeldung/axon/gui/OrderRestEndpoint.java index 64058d5eca..186aab3c0c 100644 --- a/axon/src/main/java/com/baeldung/axon/gui/OrderRestEndpoint.java +++ b/axon/src/main/java/com/baeldung/axon/gui/OrderRestEndpoint.java @@ -8,12 +8,14 @@ import com.baeldung.axon.coreapi.commands.IncrementProductCountCommand; import com.baeldung.axon.coreapi.commands.ShipOrderCommand; import com.baeldung.axon.querymodel.OrderQueryService; import com.baeldung.axon.querymodel.OrderResponse; + import org.axonframework.commandhandling.gateway.CommandGateway; import org.springframework.http.MediaType; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RestController; + import reactor.core.publisher.Flux; import java.util.List; @@ -33,25 +35,28 @@ public class OrderRestEndpoint { @PostMapping("/ship-order") public CompletableFuture shipOrder() { - String orderId = UUID.randomUUID().toString(); + String orderId = UUID.randomUUID() + .toString(); return commandGateway.send(new CreateOrderCommand(orderId)) - .thenCompose(result -> commandGateway.send(new AddProductCommand(orderId, "Deluxe Chair"))) - .thenCompose(result -> commandGateway.send(new ConfirmOrderCommand(orderId))) - .thenCompose(result -> commandGateway.send(new ShipOrderCommand(orderId))); + .thenCompose(result -> commandGateway.send(new AddProductCommand(orderId, "Deluxe Chair"))) + .thenCompose(result -> commandGateway.send(new ConfirmOrderCommand(orderId))) + .thenCompose(result -> commandGateway.send(new ShipOrderCommand(orderId))); } @PostMapping("/ship-unconfirmed-order") public CompletableFuture shipUnconfirmedOrder() { - String orderId = UUID.randomUUID().toString(); + String orderId = UUID.randomUUID() + .toString(); return commandGateway.send(new CreateOrderCommand(orderId)) - .thenCompose(result -> commandGateway.send(new AddProductCommand(orderId, "Deluxe Chair"))) - // This throws an exception, as an Order cannot be shipped if it has not been confirmed yet. - .thenCompose(result -> commandGateway.send(new ShipOrderCommand(orderId))); + .thenCompose(result -> commandGateway.send(new AddProductCommand(orderId, "Deluxe Chair"))) + // This throws an exception, as an Order cannot be shipped if it has not been confirmed yet. + .thenCompose(result -> commandGateway.send(new ShipOrderCommand(orderId))); } @PostMapping("/order") public CompletableFuture createOrder() { - return createOrder(UUID.randomUUID().toString()); + return createOrder(UUID.randomUUID() + .toString()); } @PostMapping("/order/{order-id}") @@ -60,20 +65,17 @@ public class OrderRestEndpoint { } @PostMapping("/order/{order-id}/product/{product-id}") - public CompletableFuture addProduct(@PathVariable("order-id") String orderId, - @PathVariable("product-id") String productId) { + public CompletableFuture addProduct(@PathVariable("order-id") String orderId, @PathVariable("product-id") String productId) { return commandGateway.send(new AddProductCommand(orderId, productId)); } @PostMapping("/order/{order-id}/product/{product-id}/increment") - public CompletableFuture incrementProduct(@PathVariable("order-id") String orderId, - @PathVariable("product-id") String productId) { + public CompletableFuture incrementProduct(@PathVariable("order-id") String orderId, @PathVariable("product-id") String productId) { return commandGateway.send(new IncrementProductCountCommand(orderId, productId)); } @PostMapping("/order/{order-id}/product/{product-id}/decrement") - public CompletableFuture decrementProduct(@PathVariable("order-id") String orderId, - @PathVariable("product-id") String productId) { + public CompletableFuture decrementProduct(@PathVariable("order-id") String orderId, @PathVariable("product-id") String productId) { return commandGateway.send(new DecrementProductCountCommand(orderId, productId)); } diff --git a/axon/src/main/java/com/baeldung/axon/querymodel/InMemoryOrdersEventHandler.java b/axon/src/main/java/com/baeldung/axon/querymodel/InMemoryOrdersEventHandler.java index fbdf819961..6e9dbbb335 100644 --- a/axon/src/main/java/com/baeldung/axon/querymodel/InMemoryOrdersEventHandler.java +++ b/axon/src/main/java/com/baeldung/axon/querymodel/InMemoryOrdersEventHandler.java @@ -18,7 +18,9 @@ import org.axonframework.eventhandling.EventHandler; import org.axonframework.queryhandling.QueryHandler; import org.axonframework.queryhandling.QueryUpdateEmitter; import org.reactivestreams.Publisher; +import org.springframework.context.annotation.Profile; import org.springframework.stereotype.Service; + import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -30,6 +32,7 @@ import java.util.Optional; @Service @ProcessingGroup("orders") +@Profile("!mongo") public class InMemoryOrdersEventHandler implements OrdersEventHandler { private final Map orders = new HashMap<>(); @@ -106,7 +109,8 @@ public class InMemoryOrdersEventHandler implements OrdersEventHandler { @QueryHandler public Publisher handleStreaming(FindAllOrderedProductsQuery query) { - return Mono.fromCallable(orders::values).flatMapMany(Flux::fromIterable); + return Mono.fromCallable(orders::values) + .flatMapMany(Flux::fromIterable); } @QueryHandler diff --git a/axon/src/main/java/com/baeldung/axon/querymodel/MongoConfiguration.java b/axon/src/main/java/com/baeldung/axon/querymodel/MongoConfiguration.java new file mode 100644 index 0000000000..d68569efc8 --- /dev/null +++ b/axon/src/main/java/com/baeldung/axon/querymodel/MongoConfiguration.java @@ -0,0 +1,26 @@ +package com.baeldung.axon.querymodel; + +import com.mongodb.client.MongoClient; + +import org.axonframework.eventhandling.tokenstore.TokenStore; +import org.axonframework.extensions.mongo.DefaultMongoTemplate; +import org.axonframework.extensions.mongo.eventsourcing.tokenstore.MongoTokenStore; +import org.axonframework.serialization.Serializer; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Profile; + +@Configuration +@Profile("mongo") +public class MongoConfiguration { + + @Bean + public TokenStore getTokenStore(MongoClient client, Serializer serializer) { + return MongoTokenStore.builder() + .mongoTemplate(DefaultMongoTemplate.builder() + .mongoDatabase(client) + .build()) + .serializer(serializer) + .build(); + } +} diff --git a/axon/src/main/java/com/baeldung/axon/querymodel/MongoOrdersEventHandler.java b/axon/src/main/java/com/baeldung/axon/querymodel/MongoOrdersEventHandler.java new file mode 100644 index 0000000000..4da98a9f97 --- /dev/null +++ b/axon/src/main/java/com/baeldung/axon/querymodel/MongoOrdersEventHandler.java @@ -0,0 +1,194 @@ +package com.baeldung.axon.querymodel; + +import com.baeldung.axon.coreapi.events.OrderConfirmedEvent; +import com.baeldung.axon.coreapi.events.OrderCreatedEvent; +import com.baeldung.axon.coreapi.events.OrderShippedEvent; +import com.baeldung.axon.coreapi.events.ProductAddedEvent; +import com.baeldung.axon.coreapi.events.ProductCountDecrementedEvent; +import com.baeldung.axon.coreapi.events.ProductCountIncrementedEvent; +import com.baeldung.axon.coreapi.events.ProductRemovedEvent; +import com.baeldung.axon.coreapi.queries.FindAllOrderedProductsQuery; +import com.baeldung.axon.coreapi.queries.Order; +import com.baeldung.axon.coreapi.queries.OrderStatus; +import com.baeldung.axon.coreapi.queries.OrderUpdatesQuery; +import com.baeldung.axon.coreapi.queries.TotalProductsShippedQuery; +import com.mongodb.client.MongoClient; +import com.mongodb.client.MongoCollection; +import com.mongodb.client.model.IndexOptions; +import com.mongodb.client.model.Indexes; +import com.mongodb.client.result.UpdateResult; + +import groovyjarjarantlr4.v4.runtime.misc.NotNull; + +import org.axonframework.config.ProcessingGroup; +import org.axonframework.eventhandling.EventHandler; +import org.axonframework.queryhandling.QueryHandler; +import org.axonframework.queryhandling.QueryUpdateEmitter; +import org.bson.Document; +import org.bson.conversions.Bson; +import org.reactivestreams.Publisher; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.context.annotation.Profile; +import org.springframework.stereotype.Service; + +import reactor.core.publisher.Flux; + +import java.lang.invoke.MethodHandles; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; + +import static com.mongodb.client.model.Filters.*; + +@Service +@ProcessingGroup("orders") +@Profile("mongo") +public class MongoOrdersEventHandler implements OrdersEventHandler { + + static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup() + .lookupClass()); + + private final MongoCollection orders; + private final QueryUpdateEmitter emitter; + private static final String ORDER_COLLECTION_NAME = "orders"; + private static final String AXON_FRAMEWORK_DATABASE_NAME = "axonframework"; + + private static final String ORDER_ID_PROPERTY_NAME = "orderId"; + private static final String PRODUCTS_PROPERTY_NAME = "products"; + private static final String ORDER_STATUS_PROPERTY_NAME = "orderStatus"; + + public MongoOrdersEventHandler(MongoClient client, QueryUpdateEmitter emitter) { + orders = client.getDatabase(AXON_FRAMEWORK_DATABASE_NAME) + .getCollection(ORDER_COLLECTION_NAME); + orders.createIndex(Indexes.ascending(ORDER_ID_PROPERTY_NAME), new IndexOptions().unique(true)); + this.emitter = emitter; + } + + @EventHandler + public void on(OrderCreatedEvent event) { + orders.insertOne(orderToDocument(new Order(event.getOrderId()))); + } + + @EventHandler + public void on(ProductAddedEvent event) { + update(event.getOrderId(), o -> o.addProduct(event.getProductId())); + } + + @EventHandler + public void on(ProductCountIncrementedEvent event) { + update(event.getOrderId(), o -> o.incrementProductInstance(event.getProductId())); + } + + @EventHandler + public void on(ProductCountDecrementedEvent event) { + update(event.getOrderId(), o -> o.decrementProductInstance(event.getProductId())); + } + + @EventHandler + public void on(ProductRemovedEvent event) { + update(event.getOrderId(), o -> o.removeProduct(event.getProductId())); + } + + @EventHandler + public void on(OrderConfirmedEvent event) { + update(event.getOrderId(), Order::setOrderConfirmed); + } + + @EventHandler + public void on(OrderShippedEvent event) { + update(event.getOrderId(), Order::setOrderShipped); + } + + @QueryHandler + public List handle(FindAllOrderedProductsQuery query) { + List orderList = new ArrayList<>(); + orders.find() + .forEach(d -> orderList.add(documentToOrder(d))); + return orderList; + } + + @Override + public Publisher handleStreaming(FindAllOrderedProductsQuery query) { + return Flux.fromIterable(orders.find()) + .map(this::documentToOrder); + } + + @QueryHandler + public Integer handle(TotalProductsShippedQuery query) { + AtomicInteger result = new AtomicInteger(); + orders.find(shippedProductFilter(query.getProductId())) + .map(d -> d.get(PRODUCTS_PROPERTY_NAME, Document.class)) + .map(d -> d.getInteger(query.getProductId(), 0)) + .forEach(result::addAndGet); + return result.get(); + } + + @QueryHandler + public Order handle(OrderUpdatesQuery query) { + return getOrder(query.getOrderId()).orElse(null); + } + + @Override + public void reset(List orderList) { + orders.deleteMany(new Document()); + orderList.forEach(o -> orders.insertOne(orderToDocument(o))); + } + + private Optional getOrder(String orderId) { + return Optional.ofNullable(orders.find(eq(ORDER_ID_PROPERTY_NAME, orderId)) + .first()) + .map(this::documentToOrder); + } + + private Order emitUpdate(Order order) { + emitter.emit(OrderUpdatesQuery.class, q -> order.getOrderId() + .equals(q.getOrderId()), order); + return order; + } + + private Order updateOrder(Order order, Consumer updateFunction) { + updateFunction.accept(order); + return order; + } + + private UpdateResult persistUpdate(Order order) { + return orders.replaceOne(eq(ORDER_ID_PROPERTY_NAME, order.getOrderId()), orderToDocument(order)); + } + + private void update(String orderId, Consumer updateFunction) { + UpdateResult result = getOrder(orderId).map(o -> updateOrder(o, updateFunction)) + .map(this::emitUpdate) + .map(this::persistUpdate) + .orElse(null); + logger.info("Result of updating order with orderId '{}': {}", orderId, result); + } + + private Document orderToDocument(Order order) { + return new Document(ORDER_ID_PROPERTY_NAME, order.getOrderId()).append(PRODUCTS_PROPERTY_NAME, order.getProducts()) + .append(ORDER_STATUS_PROPERTY_NAME, order.getOrderStatus() + .toString()); + } + + private Order documentToOrder(@NotNull Document document) { + Order order = new Order(document.getString(ORDER_ID_PROPERTY_NAME)); + Document products = document.get(PRODUCTS_PROPERTY_NAME, Document.class); + products.forEach((k, v) -> order.getProducts() + .put(k, (Integer) v)); + String status = document.getString(ORDER_STATUS_PROPERTY_NAME); + if (OrderStatus.CONFIRMED.toString() + .equals(status)) { + order.setOrderConfirmed(); + } else if (OrderStatus.SHIPPED.toString() + .equals(status)) { + order.setOrderShipped(); + } + return order; + } + + private Bson shippedProductFilter(String productId) { + return and(eq(ORDER_STATUS_PROPERTY_NAME, OrderStatus.SHIPPED.toString()), exists(String.format(PRODUCTS_PROPERTY_NAME + ".%s", productId))); + } +} \ No newline at end of file diff --git a/axon/src/main/java/com/baeldung/axon/querymodel/OrderQueryService.java b/axon/src/main/java/com/baeldung/axon/querymodel/OrderQueryService.java index ae391c3cb1..8196a98957 100644 --- a/axon/src/main/java/com/baeldung/axon/querymodel/OrderQueryService.java +++ b/axon/src/main/java/com/baeldung/axon/querymodel/OrderQueryService.java @@ -37,12 +37,12 @@ public class OrderQueryService { public Flux allOrdersStreaming() { Publisher publisher = queryGateway.streamingQuery(new FindAllOrderedProductsQuery(), Order.class); - return Flux.from(publisher).map(OrderResponse::new); + return Flux.from(publisher) + .map(OrderResponse::new); } public Integer totalShipped(String productId) { - return queryGateway.scatterGather(new TotalProductsShippedQuery(productId), - ResponseTypes.instanceOf(Integer.class), 10L, TimeUnit.SECONDS) + return queryGateway.scatterGather(new TotalProductsShippedQuery(productId), ResponseTypes.instanceOf(Integer.class), 10L, TimeUnit.SECONDS) .reduce(0, Integer::sum); } diff --git a/axon/src/main/java/com/baeldung/axon/querymodel/OrderStatusResponse.java b/axon/src/main/java/com/baeldung/axon/querymodel/OrderStatusResponse.java index 90430b1e3d..2c4d32bff3 100644 --- a/axon/src/main/java/com/baeldung/axon/querymodel/OrderStatusResponse.java +++ b/axon/src/main/java/com/baeldung/axon/querymodel/OrderStatusResponse.java @@ -7,7 +7,8 @@ public enum OrderStatusResponse { static OrderStatusResponse toResponse(OrderStatus status) { for (OrderStatusResponse response : values()) { - if (response.toString().equals(status.toString())) { + if (response.toString() + .equals(status.toString())) { return response; } } diff --git a/axon/src/main/java/com/baeldung/axon/querymodel/OrdersEventHandler.java b/axon/src/main/java/com/baeldung/axon/querymodel/OrdersEventHandler.java index 7e49abf93b..740e00cf72 100644 --- a/axon/src/main/java/com/baeldung/axon/querymodel/OrdersEventHandler.java +++ b/axon/src/main/java/com/baeldung/axon/querymodel/OrdersEventHandler.java @@ -11,6 +11,7 @@ import com.baeldung.axon.coreapi.queries.FindAllOrderedProductsQuery; import com.baeldung.axon.coreapi.queries.Order; import com.baeldung.axon.coreapi.queries.OrderUpdatesQuery; import com.baeldung.axon.coreapi.queries.TotalProductsShippedQuery; + import org.reactivestreams.Publisher; import java.util.List; diff --git a/axon/src/main/resources/application-mongo.properties b/axon/src/main/resources/application-mongo.properties new file mode 100644 index 0000000000..f642d1591c --- /dev/null +++ b/axon/src/main/resources/application-mongo.properties @@ -0,0 +1,6 @@ +spring.data.mongodb.host=localhost +spring.data.mongodb.port=27017 +spring.data.mongodb.authentication-database=admin +spring.data.mongodb.username=admin1234 +spring.data.mongodb.password=somepassword +spring.data.mongodb.database=order-projection \ No newline at end of file diff --git a/axon/src/test/java/com/baeldung/axon/commandmodel/OrderAggregateUnitTest.java b/axon/src/test/java/com/baeldung/axon/commandmodel/OrderAggregateUnitTest.java index c1d6bdccc2..fe7f57f854 100644 --- a/axon/src/test/java/com/baeldung/axon/commandmodel/OrderAggregateUnitTest.java +++ b/axon/src/test/java/com/baeldung/axon/commandmodel/OrderAggregateUnitTest.java @@ -17,6 +17,7 @@ import com.baeldung.axon.coreapi.events.ProductRemovedEvent; import com.baeldung.axon.coreapi.exceptions.DuplicateOrderLineException; import com.baeldung.axon.coreapi.exceptions.OrderAlreadyConfirmedException; import com.baeldung.axon.coreapi.exceptions.UnconfirmedOrderException; + import org.axonframework.test.aggregate.AggregateTestFixture; import org.axonframework.test.aggregate.FixtureConfiguration; import org.axonframework.test.matchers.Matchers; @@ -26,8 +27,10 @@ import java.util.UUID; class OrderAggregateUnitTest { - private static final String ORDER_ID = UUID.randomUUID().toString(); - private static final String PRODUCT_ID = UUID.randomUUID().toString(); + private static final String ORDER_ID = UUID.randomUUID() + .toString(); + private static final String PRODUCT_ID = UUID.randomUUID() + .toString(); private FixtureConfiguration fixture; @@ -39,101 +42,95 @@ class OrderAggregateUnitTest { @Test void giveNoPriorActivity_whenCreateOrderCommand_thenShouldPublishOrderCreatedEvent() { fixture.givenNoPriorActivity() - .when(new CreateOrderCommand(ORDER_ID)) - .expectEvents(new OrderCreatedEvent(ORDER_ID)); + .when(new CreateOrderCommand(ORDER_ID)) + .expectEvents(new OrderCreatedEvent(ORDER_ID)); } @Test void givenOrderCreatedEvent_whenAddProductCommand_thenShouldPublishProductAddedEvent() { fixture.given(new OrderCreatedEvent(ORDER_ID)) - .when(new AddProductCommand(ORDER_ID, PRODUCT_ID)) - .expectEvents(new ProductAddedEvent(ORDER_ID, PRODUCT_ID)); + .when(new AddProductCommand(ORDER_ID, PRODUCT_ID)) + .expectEvents(new ProductAddedEvent(ORDER_ID, PRODUCT_ID)); } @Test void givenOrderCreatedEventAndProductAddedEvent_whenAddProductCommandForSameProductId_thenShouldThrowDuplicateOrderLineException() { fixture.given(new OrderCreatedEvent(ORDER_ID), new ProductAddedEvent(ORDER_ID, PRODUCT_ID)) - .when(new AddProductCommand(ORDER_ID, PRODUCT_ID)) - .expectException(DuplicateOrderLineException.class) - .expectExceptionMessage(Matchers.predicate(message -> ((String) message).contains(PRODUCT_ID))); + .when(new AddProductCommand(ORDER_ID, PRODUCT_ID)) + .expectException(DuplicateOrderLineException.class) + .expectExceptionMessage(Matchers.predicate(message -> ((String) message).contains(PRODUCT_ID))); } @Test void givenOrderCreatedEventAndProductAddedEvent_whenIncrementProductCountCommand_thenShouldPublishProductCountIncrementedEvent() { fixture.given(new OrderCreatedEvent(ORDER_ID), new ProductAddedEvent(ORDER_ID, PRODUCT_ID)) - .when(new IncrementProductCountCommand(ORDER_ID, PRODUCT_ID)) - .expectEvents(new ProductCountIncrementedEvent(ORDER_ID, PRODUCT_ID)); + .when(new IncrementProductCountCommand(ORDER_ID, PRODUCT_ID)) + .expectEvents(new ProductCountIncrementedEvent(ORDER_ID, PRODUCT_ID)); } @Test void givenOrderCreatedEventProductAddedEventAndProductCountIncrementedEvent_whenDecrementProductCountCommand_thenShouldPublishProductCountDecrementedEvent() { - fixture.given(new OrderCreatedEvent(ORDER_ID), - new ProductAddedEvent(ORDER_ID, PRODUCT_ID), - new ProductCountIncrementedEvent(ORDER_ID, PRODUCT_ID)) - .when(new DecrementProductCountCommand(ORDER_ID, PRODUCT_ID)) - .expectEvents(new ProductCountDecrementedEvent(ORDER_ID, PRODUCT_ID)); + fixture.given(new OrderCreatedEvent(ORDER_ID), new ProductAddedEvent(ORDER_ID, PRODUCT_ID), new ProductCountIncrementedEvent(ORDER_ID, PRODUCT_ID)) + .when(new DecrementProductCountCommand(ORDER_ID, PRODUCT_ID)) + .expectEvents(new ProductCountDecrementedEvent(ORDER_ID, PRODUCT_ID)); } @Test void givenOrderCreatedEventAndProductAddedEvent_whenDecrementProductCountCommand_thenShouldPublishProductRemovedEvent() { fixture.given(new OrderCreatedEvent(ORDER_ID), new ProductAddedEvent(ORDER_ID, PRODUCT_ID)) - .when(new DecrementProductCountCommand(ORDER_ID, PRODUCT_ID)) - .expectEvents(new ProductRemovedEvent(ORDER_ID, PRODUCT_ID)); + .when(new DecrementProductCountCommand(ORDER_ID, PRODUCT_ID)) + .expectEvents(new ProductRemovedEvent(ORDER_ID, PRODUCT_ID)); } @Test void givenOrderCreatedEvent_whenConfirmOrderCommand_thenShouldPublishOrderConfirmedEvent() { fixture.given(new OrderCreatedEvent(ORDER_ID)) - .when(new ConfirmOrderCommand(ORDER_ID)) - .expectEvents(new OrderConfirmedEvent(ORDER_ID)); + .when(new ConfirmOrderCommand(ORDER_ID)) + .expectEvents(new OrderConfirmedEvent(ORDER_ID)); } @Test void givenOrderCreatedEventAndOrderConfirmedEvent_whenConfirmOrderCommand_thenExpectNoEvents() { fixture.given(new OrderCreatedEvent(ORDER_ID), new OrderConfirmedEvent(ORDER_ID)) - .when(new ConfirmOrderCommand(ORDER_ID)) - .expectNoEvents(); + .when(new ConfirmOrderCommand(ORDER_ID)) + .expectNoEvents(); } @Test void givenOrderCreatedEvent_whenShipOrderCommand_thenShouldThrowUnconfirmedOrderException() { fixture.given(new OrderCreatedEvent(ORDER_ID)) - .when(new ShipOrderCommand(ORDER_ID)) - .expectException(UnconfirmedOrderException.class); + .when(new ShipOrderCommand(ORDER_ID)) + .expectException(UnconfirmedOrderException.class); } @Test void givenOrderCreatedEventAndOrderConfirmedEvent_whenShipOrderCommand_thenShouldPublishOrderShippedEvent() { fixture.given(new OrderCreatedEvent(ORDER_ID), new OrderConfirmedEvent(ORDER_ID)) - .when(new ShipOrderCommand(ORDER_ID)) - .expectEvents(new OrderShippedEvent(ORDER_ID)); + .when(new ShipOrderCommand(ORDER_ID)) + .expectEvents(new OrderShippedEvent(ORDER_ID)); } @Test void givenOrderCreatedEventProductAndOrderConfirmedEvent_whenAddProductCommand_thenShouldThrowOrderAlreadyConfirmedException() { fixture.given(new OrderCreatedEvent(ORDER_ID), new OrderConfirmedEvent(ORDER_ID)) - .when(new AddProductCommand(ORDER_ID, PRODUCT_ID)) - .expectException(OrderAlreadyConfirmedException.class) - .expectExceptionMessage(Matchers.predicate(message -> ((String) message).contains(ORDER_ID))); + .when(new AddProductCommand(ORDER_ID, PRODUCT_ID)) + .expectException(OrderAlreadyConfirmedException.class) + .expectExceptionMessage(Matchers.predicate(message -> ((String) message).contains(ORDER_ID))); } @Test void givenOrderCreatedEventProductAddedEventAndOrderConfirmedEvent_whenIncrementProductCountCommand_thenShouldThrowOrderAlreadyConfirmedException() { - fixture.given(new OrderCreatedEvent(ORDER_ID), - new ProductAddedEvent(ORDER_ID, PRODUCT_ID), - new OrderConfirmedEvent(ORDER_ID)) - .when(new IncrementProductCountCommand(ORDER_ID, PRODUCT_ID)) - .expectException(OrderAlreadyConfirmedException.class) - .expectExceptionMessage(Matchers.predicate(message -> ((String) message).contains(ORDER_ID))); + fixture.given(new OrderCreatedEvent(ORDER_ID), new ProductAddedEvent(ORDER_ID, PRODUCT_ID), new OrderConfirmedEvent(ORDER_ID)) + .when(new IncrementProductCountCommand(ORDER_ID, PRODUCT_ID)) + .expectException(OrderAlreadyConfirmedException.class) + .expectExceptionMessage(Matchers.predicate(message -> ((String) message).contains(ORDER_ID))); } @Test void givenOrderCreatedEventProductAddedEventAndOrderConfirmedEvent_whenDecrementProductCountCommand_thenShouldThrowOrderAlreadyConfirmedException() { - fixture.given(new OrderCreatedEvent(ORDER_ID), - new ProductAddedEvent(ORDER_ID, PRODUCT_ID), - new OrderConfirmedEvent(ORDER_ID)) - .when(new DecrementProductCountCommand(ORDER_ID, PRODUCT_ID)) - .expectException(OrderAlreadyConfirmedException.class) - .expectExceptionMessage(Matchers.predicate(message -> ((String) message).contains(ORDER_ID))); + fixture.given(new OrderCreatedEvent(ORDER_ID), new ProductAddedEvent(ORDER_ID, PRODUCT_ID), new OrderConfirmedEvent(ORDER_ID)) + .when(new DecrementProductCountCommand(ORDER_ID, PRODUCT_ID)) + .expectException(OrderAlreadyConfirmedException.class) + .expectExceptionMessage(Matchers.predicate(message -> ((String) message).contains(ORDER_ID))); } } \ No newline at end of file diff --git a/axon/src/test/java/com/baeldung/axon/querymodel/AbstractOrdersEventHandlerUnitTest.java b/axon/src/test/java/com/baeldung/axon/querymodel/AbstractOrdersEventHandlerUnitTest.java index 2396d0f10b..fb7833a3f6 100644 --- a/axon/src/test/java/com/baeldung/axon/querymodel/AbstractOrdersEventHandlerUnitTest.java +++ b/axon/src/test/java/com/baeldung/axon/querymodel/AbstractOrdersEventHandlerUnitTest.java @@ -12,8 +12,10 @@ import com.baeldung.axon.coreapi.queries.Order; import com.baeldung.axon.coreapi.queries.OrderStatus; import com.baeldung.axon.coreapi.queries.OrderUpdatesQuery; import com.baeldung.axon.coreapi.queries.TotalProductsShippedQuery; + import org.axonframework.queryhandling.QueryUpdateEmitter; import org.junit.jupiter.api.*; + import reactor.core.publisher.Flux; import reactor.test.StepVerifier; @@ -27,10 +29,14 @@ import static org.mockito.Mockito.*; public abstract class AbstractOrdersEventHandlerUnitTest { - private static final String ORDER_ID_1 = UUID.randomUUID().toString(); - private static final String ORDER_ID_2 = UUID.randomUUID().toString(); - private static final String PRODUCT_ID_1 = UUID.randomUUID().toString(); - private static final String PRODUCT_ID_2 = UUID.randomUUID().toString(); + private static final String ORDER_ID_1 = UUID.randomUUID() + .toString(); + private static final String ORDER_ID_2 = UUID.randomUUID() + .toString(); + private static final String PRODUCT_ID_1 = UUID.randomUUID() + .toString(); + private static final String PRODUCT_ID_2 = UUID.randomUUID() + .toString(); private OrdersEventHandler handler; private static Order orderOne; private static Order orderTwo; @@ -39,12 +45,15 @@ public abstract class AbstractOrdersEventHandlerUnitTest { @BeforeAll static void createOrders() { orderOne = new Order(ORDER_ID_1); - orderOne.getProducts().put(PRODUCT_ID_1, 3); + orderOne.getProducts() + .put(PRODUCT_ID_1, 3); orderOne.setOrderShipped(); orderTwo = new Order(ORDER_ID_2); - orderTwo.getProducts().put(PRODUCT_ID_1, 1); - orderTwo.getProducts().put(PRODUCT_ID_2, 1); + orderTwo.getProducts() + .put(PRODUCT_ID_1, 1); + orderTwo.getProducts() + .put(PRODUCT_ID_2, 1); orderTwo.setOrderConfirmed(); } @@ -64,10 +73,18 @@ public abstract class AbstractOrdersEventHandlerUnitTest { assertNotNull(result); assertEquals(2, result.size()); - Order order_1 = result.stream().filter(o -> o.getOrderId().equals(ORDER_ID_1)).findFirst().orElse(null); + Order order_1 = result.stream() + .filter(o -> o.getOrderId() + .equals(ORDER_ID_1)) + .findFirst() + .orElse(null); assertEquals(orderOne, order_1); - Order order_2 = result.stream().filter(o -> o.getOrderId().equals(ORDER_ID_2)).findFirst().orElse(null); + Order order_2 = result.stream() + .filter(o -> o.getOrderId() + .equals(ORDER_ID_2)) + .findFirst() + .orElse(null); assertEquals(orderTwo, order_2); } @@ -75,9 +92,11 @@ public abstract class AbstractOrdersEventHandlerUnitTest { void givenTwoOrdersPlacedOfWhichOneNotShipped_whenFindAllOrderedProductsQueryStreaming_thenCorrectOrdersAreReturned() { resetWithTwoOrders(); final Consumer orderVerifier = order -> { - if (order.getOrderId().equals(orderOne.getOrderId())) { + if (order.getOrderId() + .equals(orderOne.getOrderId())) { assertEquals(orderOne, order); - } else if (order.getOrderId().equals(orderTwo.getOrderId())) { + } else if (order.getOrderId() + .equals(orderTwo.getOrderId())) { assertEquals(orderTwo, order); } else { throw new RuntimeException("Would expect either order one or order two"); @@ -85,10 +104,10 @@ public abstract class AbstractOrdersEventHandlerUnitTest { }; StepVerifier.create(Flux.from(handler.handleStreaming(new FindAllOrderedProductsQuery()))) - .assertNext(orderVerifier) - .assertNext(orderVerifier) - .expectComplete() - .verify(); + .assertNext(orderVerifier) + .assertNext(orderVerifier) + .expectComplete() + .verify(); } @Test @@ -120,7 +139,8 @@ public abstract class AbstractOrdersEventHandlerUnitTest { Order result = handler.handle(new OrderUpdatesQuery(ORDER_ID_1)); assertNotNull(result); assertEquals(ORDER_ID_1, result.getOrderId()); - assertEquals(3, result.getProducts().get(PRODUCT_ID_1)); + assertEquals(3, result.getProducts() + .get(PRODUCT_ID_1)); assertEquals(OrderStatus.SHIPPED, result.getOrderStatus()); } diff --git a/axon/src/test/java/com/baeldung/axon/querymodel/MongoOrdersEventHandlerUnitTest.java b/axon/src/test/java/com/baeldung/axon/querymodel/MongoOrdersEventHandlerUnitTest.java new file mode 100644 index 0000000000..c183294dc1 --- /dev/null +++ b/axon/src/test/java/com/baeldung/axon/querymodel/MongoOrdersEventHandlerUnitTest.java @@ -0,0 +1,20 @@ +package com.baeldung.axon.querymodel; + +import com.mongodb.client.MongoClient; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.autoconfigure.data.mongo.DataMongoTest; + +@DataMongoTest +public class MongoOrdersEventHandlerUnitTest extends AbstractOrdersEventHandlerUnitTest { + + @Autowired + MongoClient mongoClient; + + @Override + protected OrdersEventHandler getHandler() { + mongoClient.getDatabase("axonframework") + .drop(); + return new MongoOrdersEventHandler(mongoClient, emitter); + } +} diff --git a/axon/src/test/java/com/baeldung/axon/querymodel/OrderQueryServiceIntegrationTest.java b/axon/src/test/java/com/baeldung/axon/querymodel/OrderQueryServiceIntegrationTest.java index dfb4881fdc..4a89fe0e4e 100644 --- a/axon/src/test/java/com/baeldung/axon/querymodel/OrderQueryServiceIntegrationTest.java +++ b/axon/src/test/java/com/baeldung/axon/querymodel/OrderQueryServiceIntegrationTest.java @@ -6,7 +6,6 @@ import com.baeldung.axon.coreapi.events.OrderShippedEvent; import com.baeldung.axon.coreapi.events.ProductAddedEvent; import com.baeldung.axon.coreapi.events.ProductCountDecrementedEvent; import com.baeldung.axon.coreapi.events.ProductCountIncrementedEvent; -import com.baeldung.axon.coreapi.queries.FindAllOrderedProductsQuery; import com.baeldung.axon.coreapi.queries.Order; import org.axonframework.eventhandling.gateway.EventGateway; @@ -66,9 +65,9 @@ class OrderQueryServiceIntegrationTest { void givenOrderCreatedEventSend_whenCallingAllOrdersStreaming_thenOneOrderIsReturned() { Flux result = queryService.allOrdersStreaming(); StepVerifier.create(result) - .assertNext(order -> assertEquals(orderId, order.getOrderId())) - .expectComplete() - .verify(); + .assertNext(order -> assertEquals(orderId, order.getOrderId())) + .expectComplete() + .verify(); } @Test diff --git a/axon/src/test/resources/application.properties b/axon/src/test/resources/application.properties index 35b5452b57..c42b7a4e90 100644 --- a/axon/src/test/resources/application.properties +++ b/axon/src/test/resources/application.properties @@ -1 +1,2 @@ +spring.mongodb.embedded.version=5.0.6 axon.axonserver.enabled=false \ No newline at end of file diff --git a/axon/start_mongo.sh b/axon/start_mongo.sh new file mode 100755 index 0000000000..2cc5a3a7cf --- /dev/null +++ b/axon/start_mongo.sh @@ -0,0 +1,7 @@ +docker run \ + -d \ + --name order_projection \ + -p 27017:27017 \ + -e MONGO_INITDB_ROOT_USERNAME=admin1234 \ + -e MONGO_INITDB_ROOT_PASSWORD=somepassword \ + mongo \ No newline at end of file