BAEL-5807 Add streaming query (#12769)

This commit is contained in:
Gerard Klijs 2022-10-02 06:17:55 +02:00 committed by GitHub
parent 45d3ad42d1
commit 8c8bb9a0e1
8 changed files with 61 additions and 1 deletions

View File

@ -71,7 +71,7 @@
</dependencies> </dependencies>
<properties> <properties>
<axon-bom.version>4.5.17</axon-bom.version> <axon-bom.version>4.6.0</axon-bom.version>
</properties> </properties>
</project> </project>

View File

@ -92,6 +92,11 @@ public class OrderRestEndpoint {
return orderQueryService.findAllOrders(); return orderQueryService.findAllOrders();
} }
@GetMapping(path = "/all-orders-streaming", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<OrderResponse> allOrdersStreaming() {
return orderQueryService.allOrdersStreaming();
}
@GetMapping("/total-shipped/{product-id}") @GetMapping("/total-shipped/{product-id}")
public Integer totalShipped(@PathVariable("product-id") String productId) { public Integer totalShipped(@PathVariable("product-id") String productId) {
return orderQueryService.totalShipped(productId); return orderQueryService.totalShipped(productId);

View File

@ -17,7 +17,10 @@ import org.axonframework.config.ProcessingGroup;
import org.axonframework.eventhandling.EventHandler; import org.axonframework.eventhandling.EventHandler;
import org.axonframework.queryhandling.QueryHandler; import org.axonframework.queryhandling.QueryHandler;
import org.axonframework.queryhandling.QueryUpdateEmitter; import org.axonframework.queryhandling.QueryUpdateEmitter;
import org.reactivestreams.Publisher;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
@ -101,6 +104,11 @@ public class InMemoryOrdersEventHandler implements OrdersEventHandler {
return new ArrayList<>(orders.values()); return new ArrayList<>(orders.values());
} }
@QueryHandler
public Publisher<Order> handleStreaming(FindAllOrderedProductsQuery query) {
return Mono.fromCallable(orders::values).flatMapMany(Flux::fromIterable);
}
@QueryHandler @QueryHandler
public Integer handle(TotalProductsShippedQuery query) { public Integer handle(TotalProductsShippedQuery query) {
return orders.values() return orders.values()

View File

@ -9,6 +9,7 @@ import org.axonframework.messaging.responsetypes.ResponseType;
import org.axonframework.messaging.responsetypes.ResponseTypes; import org.axonframework.messaging.responsetypes.ResponseTypes;
import org.axonframework.queryhandling.QueryGateway; import org.axonframework.queryhandling.QueryGateway;
import org.axonframework.queryhandling.SubscriptionQueryResult; import org.axonframework.queryhandling.SubscriptionQueryResult;
import org.reactivestreams.Publisher;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
@ -34,6 +35,11 @@ public class OrderQueryService {
.collect(Collectors.toList())); .collect(Collectors.toList()));
} }
public Flux<OrderResponse> allOrdersStreaming() {
Publisher<Order> publisher = queryGateway.streamingQuery(new FindAllOrderedProductsQuery(), Order.class);
return Flux.from(publisher).map(OrderResponse::new);
}
public Integer totalShipped(String productId) { public Integer totalShipped(String productId) {
return queryGateway.scatterGather(new TotalProductsShippedQuery(productId), return queryGateway.scatterGather(new TotalProductsShippedQuery(productId),
ResponseTypes.instanceOf(Integer.class), 10L, TimeUnit.SECONDS) ResponseTypes.instanceOf(Integer.class), 10L, TimeUnit.SECONDS)

View File

@ -11,6 +11,7 @@ import com.baeldung.axon.coreapi.queries.FindAllOrderedProductsQuery;
import com.baeldung.axon.coreapi.queries.Order; import com.baeldung.axon.coreapi.queries.Order;
import com.baeldung.axon.coreapi.queries.OrderUpdatesQuery; import com.baeldung.axon.coreapi.queries.OrderUpdatesQuery;
import com.baeldung.axon.coreapi.queries.TotalProductsShippedQuery; import com.baeldung.axon.coreapi.queries.TotalProductsShippedQuery;
import org.reactivestreams.Publisher;
import java.util.List; import java.util.List;
@ -32,6 +33,8 @@ public interface OrdersEventHandler {
List<Order> handle(FindAllOrderedProductsQuery query); List<Order> handle(FindAllOrderedProductsQuery query);
Publisher<Order> handleStreaming(FindAllOrderedProductsQuery query);
Integer handle(TotalProductsShippedQuery query); Integer handle(TotalProductsShippedQuery query);
Order handle(OrderUpdatesQuery query); Order handle(OrderUpdatesQuery query);

View File

@ -10,6 +10,10 @@ POST http://localhost:8080/ship-unconfirmed-order
GET http://localhost:8080/all-orders GET http://localhost:8080/all-orders
### Receive all existing orders using a stream
GET http://localhost:8080/all-orders-streaming
### Create Order with id 666a1661-474d-4046-8b12-8b5896312768 ### Create Order with id 666a1661-474d-4046-8b12-8b5896312768
POST http://localhost:8080/order/666a1661-474d-4046-8b12-8b5896312768 POST http://localhost:8080/order/666a1661-474d-4046-8b12-8b5896312768

View File

@ -14,10 +14,13 @@ import com.baeldung.axon.coreapi.queries.OrderUpdatesQuery;
import com.baeldung.axon.coreapi.queries.TotalProductsShippedQuery; import com.baeldung.axon.coreapi.queries.TotalProductsShippedQuery;
import org.axonframework.queryhandling.QueryUpdateEmitter; import org.axonframework.queryhandling.QueryUpdateEmitter;
import org.junit.jupiter.api.*; import org.junit.jupiter.api.*;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.UUID; import java.util.UUID;
import java.util.function.Consumer;
import static org.junit.jupiter.api.Assertions.*; import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.Mockito.*; import static org.mockito.Mockito.*;
@ -68,6 +71,26 @@ public abstract class AbstractOrdersEventHandlerUnitTest {
assertEquals(orderTwo, order_2); assertEquals(orderTwo, order_2);
} }
@Test
void givenTwoOrdersPlacedOfWhichOneNotShipped_whenFindAllOrderedProductsQueryStreaming_thenCorrectOrdersAreReturned() {
resetWithTwoOrders();
final Consumer<Order> orderVerifier = order -> {
if (order.getOrderId().equals(orderOne.getOrderId())) {
assertEquals(orderOne, order);
} else if (order.getOrderId().equals(orderTwo.getOrderId())) {
assertEquals(orderTwo, order);
} else {
throw new RuntimeException("Would expect either order one or order two");
}
};
StepVerifier.create(Flux.from(handler.handleStreaming(new FindAllOrderedProductsQuery())))
.assertNext(orderVerifier)
.assertNext(orderVerifier)
.expectComplete()
.verify();
}
@Test @Test
void givenNoOrdersPlaced_whenTotalProductsShippedQuery_thenZeroReturned() { void givenNoOrdersPlaced_whenTotalProductsShippedQuery_thenZeroReturned() {
assertEquals(0, handler.handle(new TotalProductsShippedQuery(PRODUCT_ID_1))); assertEquals(0, handler.handle(new TotalProductsShippedQuery(PRODUCT_ID_1)));

View File

@ -6,6 +6,7 @@ import com.baeldung.axon.coreapi.events.OrderShippedEvent;
import com.baeldung.axon.coreapi.events.ProductAddedEvent; import com.baeldung.axon.coreapi.events.ProductAddedEvent;
import com.baeldung.axon.coreapi.events.ProductCountDecrementedEvent; import com.baeldung.axon.coreapi.events.ProductCountDecrementedEvent;
import com.baeldung.axon.coreapi.events.ProductCountIncrementedEvent; import com.baeldung.axon.coreapi.events.ProductCountIncrementedEvent;
import com.baeldung.axon.coreapi.queries.FindAllOrderedProductsQuery;
import com.baeldung.axon.coreapi.queries.Order; import com.baeldung.axon.coreapi.queries.Order;
import org.axonframework.eventhandling.gateway.EventGateway; import org.axonframework.eventhandling.gateway.EventGateway;
@ -13,6 +14,7 @@ import org.junit.jupiter.api.*;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.test.context.SpringBootTest;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier; import reactor.test.StepVerifier;
import java.util.Collections; import java.util.Collections;
@ -60,6 +62,15 @@ class OrderQueryServiceIntegrationTest {
.isEmpty()); .isEmpty());
} }
@Test
void givenOrderCreatedEventSend_whenCallingAllOrdersStreaming_thenOneOrderIsReturned() {
Flux<OrderResponse> result = queryService.allOrdersStreaming();
StepVerifier.create(result)
.assertNext(order -> assertEquals(orderId, order.getOrderId()))
.expectComplete()
.verify();
}
@Test @Test
void givenThreeDeluxeChairsShipped_whenCallingAllShippedChairs_then234PlusTreeIsReturned() { void givenThreeDeluxeChairsShipped_whenCallingAllShippedChairs_then234PlusTreeIsReturned() {
Order order = new Order(orderId); Order order = new Order(orderId);