Merge pull request #7884 from macieg/bael-3285

BAEL-3285 | Simultaneous Spring WebClient calls
This commit is contained in:
Jonathan Cook 2019-10-02 00:46:39 +02:00 committed by GitHub
commit e1a6b9f63b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 190 additions and 0 deletions

View File

@ -88,6 +88,12 @@
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.github.tomakehurst</groupId>
<artifactId>wiremock</artifactId>
<version>2.24.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>

View File

@ -0,0 +1,63 @@
package com.baeldung.reactive.webclient.simultaneous;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
import java.util.List;
public class Client {
private WebClient webClient;
public Client(String uri) {
this.webClient = WebClient.create(uri);
}
public Mono<User> getUser(int id) {
return webClient.get()
.uri("/user/{id}", id)
.retrieve()
.bodyToMono(User.class);
}
public Mono<Item> getItem(int id) {
return webClient.get()
.uri("/item/{id}", id)
.retrieve()
.bodyToMono(Item.class);
}
public Mono<User> getOtherUser(int id) {
return webClient.get()
.uri("/otheruser/{id}", id)
.retrieve()
.bodyToMono(User.class);
}
public List<User> fetchUsers(List<Integer> userIds) {
return Flux.fromIterable(userIds)
.parallel()
.runOn(Schedulers.elastic())
.flatMap(this::getUser)
.collectSortedList((u1, u2) -> u2.id() - u1.id())
.block();
}
public List<User> fetchUserAndOtherUser(int id) {
return Flux.merge(getUser(id), getOtherUser(id))
.parallel()
.runOn(Schedulers.elastic())
.collectSortedList((u1, u2) -> u2.id() - u1.id())
.block();
}
public UserWithItem fetchUserAndItem(int userId, int itemId) {
Mono<User> user = getUser(userId).subscribeOn(Schedulers.elastic());
Mono<Item> item = getItem(itemId).subscribeOn(Schedulers.elastic());
return Mono.zip(user, item, UserWithItem::new)
.block();
}
}

View File

@ -0,0 +1,17 @@
package com.baeldung.reactive.webclient.simultaneous;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
public class Item {
private int id;
@JsonCreator
public Item(@JsonProperty("id") int id) {
this.id = id;
}
public int id() {
return id;
}
}

View File

@ -0,0 +1,17 @@
package com.baeldung.reactive.webclient.simultaneous;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
public class User {
private int id;
@JsonCreator
public User(@JsonProperty("id") int id) {
this.id = id;
}
public int id() {
return id;
}
}

View File

@ -0,0 +1,19 @@
package com.baeldung.reactive.webclient.simultaneous;
public class UserWithItem {
private User user;
private Item item;
public UserWithItem(User user, Item item) {
this.user = user;
this.item = item;
}
public User user() {
return user;
}
public Item item() {
return item;
}
}

View File

@ -0,0 +1,68 @@
package com.baeldung.reactive.webclient.simultaneous;
import org.junit.Test;
import org.junit.Before;
import org.junit.After;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import com.github.tomakehurst.wiremock.WireMockServer;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static com.github.tomakehurst.wiremock.core.WireMockConfiguration.wireMockConfig;
import static com.github.tomakehurst.wiremock.client.WireMock.*;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertEquals;
@RunWith(SpringRunner.class)
@SpringBootTest
public class ClientIntegrationTest {
private WireMockServer wireMockServer;
@Before
public void setup() {
wireMockServer = new WireMockServer(wireMockConfig().port(8089));
wireMockServer.start();
configureFor("localhost", wireMockServer.port());
}
@After
public void tearDown() {
wireMockServer.stop();
}
@Test
public void checkIfCallsAreExecutedSimultaneously() {
// Arrange
int requestsNumber = 5;
int singleRequestTime = 1000;
for (int i = 1; i <= requestsNumber; i++) {
stubFor(get(urlEqualTo("/user/" + i)).willReturn(aResponse().withFixedDelay(singleRequestTime)
.withStatus(200)
.withHeader("Content-Type", "application/json")
.withBody(String.format("{ \"id\": %d }", i))));
}
List<Integer> userIds = IntStream.rangeClosed(1, requestsNumber)
.boxed()
.collect(Collectors.toList());
Client client = new Client("http://localhost:8089");
// Act
long start = System.currentTimeMillis();
List<User> users = client.fetchUsers(userIds);
long end = System.currentTimeMillis();
// Assert
long totalExecutionTime = end - start;
assertEquals("Unexpected number of users", requestsNumber, users.size());
assertTrue("Execution time is too big", 2 * singleRequestTime > totalExecutionTime);
}
}