diff --git a/spring-5-reactive-client/pom.xml b/spring-5-reactive-client/pom.xml index 9e574b2196..994239fa9b 100644 --- a/spring-5-reactive-client/pom.xml +++ b/spring-5-reactive-client/pom.xml @@ -88,6 +88,12 @@ spring-boot-starter-test test + + com.github.tomakehurst + wiremock + 2.24.1 + test + org.apache.commons diff --git a/spring-5-reactive-client/src/main/java/com/baeldung/reactive/webclient/simultaneous/Client.java b/spring-5-reactive-client/src/main/java/com/baeldung/reactive/webclient/simultaneous/Client.java new file mode 100644 index 0000000000..3c6623cb02 --- /dev/null +++ b/spring-5-reactive-client/src/main/java/com/baeldung/reactive/webclient/simultaneous/Client.java @@ -0,0 +1,63 @@ +package com.baeldung.reactive.webclient.simultaneous; + +import org.springframework.web.reactive.function.client.WebClient; +import reactor.core.publisher.Mono; +import reactor.core.publisher.Flux; +import reactor.core.scheduler.Schedulers; + +import java.util.List; + +public class Client { + + private WebClient webClient; + + public Client(String uri) { + this.webClient = WebClient.create(uri); + } + + public Mono getUser(int id) { + return webClient.get() + .uri("/user/{id}", id) + .retrieve() + .bodyToMono(User.class); + } + + public Mono getItem(int id) { + return webClient.get() + .uri("/item/{id}", id) + .retrieve() + .bodyToMono(Item.class); + } + + public Mono getOtherUser(int id) { + return webClient.get() + .uri("/otheruser/{id}", id) + .retrieve() + .bodyToMono(User.class); + } + + public List fetchUsers(List userIds) { + return Flux.fromIterable(userIds) + .parallel() + .runOn(Schedulers.elastic()) + .flatMap(this::getUser) + .collectSortedList((u1, u2) -> u2.id() - u1.id()) + .block(); + } + + public List fetchUserAndOtherUser(int id) { + return Flux.merge(getUser(id), getOtherUser(id)) + .parallel() + .runOn(Schedulers.elastic()) + .collectSortedList((u1, u2) -> u2.id() - u1.id()) + .block(); + } + + public UserWithItem fetchUserAndItem(int userId, int itemId) { + Mono user = getUser(userId).subscribeOn(Schedulers.elastic()); + Mono item = getItem(itemId).subscribeOn(Schedulers.elastic()); + + return Mono.zip(user, item, UserWithItem::new) + .block(); + } +} diff --git a/spring-5-reactive-client/src/main/java/com/baeldung/reactive/webclient/simultaneous/Item.java b/spring-5-reactive-client/src/main/java/com/baeldung/reactive/webclient/simultaneous/Item.java new file mode 100644 index 0000000000..5b8260743b --- /dev/null +++ b/spring-5-reactive-client/src/main/java/com/baeldung/reactive/webclient/simultaneous/Item.java @@ -0,0 +1,17 @@ +package com.baeldung.reactive.webclient.simultaneous; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +public class Item { + private int id; + + @JsonCreator + public Item(@JsonProperty("id") int id) { + this.id = id; + } + + public int id() { + return id; + } +} diff --git a/spring-5-reactive-client/src/main/java/com/baeldung/reactive/webclient/simultaneous/User.java b/spring-5-reactive-client/src/main/java/com/baeldung/reactive/webclient/simultaneous/User.java new file mode 100644 index 0000000000..0e1cc2cd76 --- /dev/null +++ b/spring-5-reactive-client/src/main/java/com/baeldung/reactive/webclient/simultaneous/User.java @@ -0,0 +1,17 @@ +package com.baeldung.reactive.webclient.simultaneous; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +public class User { + private int id; + + @JsonCreator + public User(@JsonProperty("id") int id) { + this.id = id; + } + + public int id() { + return id; + } +} diff --git a/spring-5-reactive-client/src/main/java/com/baeldung/reactive/webclient/simultaneous/UserWithItem.java b/spring-5-reactive-client/src/main/java/com/baeldung/reactive/webclient/simultaneous/UserWithItem.java new file mode 100644 index 0000000000..96dcfe994e --- /dev/null +++ b/spring-5-reactive-client/src/main/java/com/baeldung/reactive/webclient/simultaneous/UserWithItem.java @@ -0,0 +1,19 @@ +package com.baeldung.reactive.webclient.simultaneous; + +public class UserWithItem { + private User user; + private Item item; + + public UserWithItem(User user, Item item) { + this.user = user; + this.item = item; + } + + public User user() { + return user; + } + + public Item item() { + return item; + } +} diff --git a/spring-5-reactive-client/src/test/java/com/baeldung/reactive/webclient/simultaneous/ClientIntegrationTest.java b/spring-5-reactive-client/src/test/java/com/baeldung/reactive/webclient/simultaneous/ClientIntegrationTest.java new file mode 100644 index 0000000000..99efd34f9f --- /dev/null +++ b/spring-5-reactive-client/src/test/java/com/baeldung/reactive/webclient/simultaneous/ClientIntegrationTest.java @@ -0,0 +1,68 @@ +package com.baeldung.reactive.webclient.simultaneous; + +import org.junit.Test; +import org.junit.Before; +import org.junit.After; +import org.junit.runner.RunWith; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.test.context.junit4.SpringRunner; +import com.github.tomakehurst.wiremock.WireMockServer; + +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static com.github.tomakehurst.wiremock.core.WireMockConfiguration.wireMockConfig; +import static com.github.tomakehurst.wiremock.client.WireMock.*; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertEquals; + +@RunWith(SpringRunner.class) +@SpringBootTest +public class ClientIntegrationTest { + + private WireMockServer wireMockServer; + + @Before + public void setup() { + wireMockServer = new WireMockServer(wireMockConfig().port(8089)); + wireMockServer.start(); + configureFor("localhost", wireMockServer.port()); + } + + @After + public void tearDown() { + wireMockServer.stop(); + } + + @Test + public void checkIfCallsAreExecutedSimultaneously() { + // Arrange + int requestsNumber = 5; + int singleRequestTime = 1000; + + for (int i = 1; i <= requestsNumber; i++) { + stubFor(get(urlEqualTo("/user/" + i)).willReturn(aResponse().withFixedDelay(singleRequestTime) + .withStatus(200) + .withHeader("Content-Type", "application/json") + .withBody(String.format("{ \"id\": %d }", i)))); + } + + List userIds = IntStream.rangeClosed(1, requestsNumber) + .boxed() + .collect(Collectors.toList()); + + Client client = new Client("http://localhost:8089"); + + // Act + long start = System.currentTimeMillis(); + List users = client.fetchUsers(userIds); + long end = System.currentTimeMillis(); + + // Assert + long totalExecutionTime = end - start; + + assertEquals("Unexpected number of users", requestsNumber, users.size()); + assertTrue("Execution time is too big", 2 * singleRequestTime > totalExecutionTime); + } +}