Merge pull request #14392 from ukhan1980/BAEL-6689-use-zip-when-with-mono
[BAEL-6689] code for zipwhen
This commit is contained in:
commit
ec90772372
|
@ -0,0 +1,22 @@
|
|||
package com.baeldung.webflux.zipwhen.model;
|
||||
|
||||
public class User {
|
||||
|
||||
private final String id;
|
||||
private final String email;
|
||||
|
||||
public User(String id, String email) {
|
||||
this.id = id;
|
||||
this.email = email;
|
||||
|
||||
}
|
||||
|
||||
public String getId() {
|
||||
return id;
|
||||
}
|
||||
|
||||
public String getEmail() {
|
||||
return email;
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,24 @@
|
|||
package com.baeldung.webflux.zipwhen.service;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
import com.baeldung.webflux.zipwhen.model.User;
|
||||
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
public class DatabaseService {
|
||||
private Map<String, User> dataStore = new ConcurrentHashMap<>();
|
||||
|
||||
public Mono<Boolean> saveUserData(User user) {
|
||||
return Mono.create(sink -> {
|
||||
try {
|
||||
dataStore.put(user.getId(), user);
|
||||
sink.success(true);
|
||||
} catch (Exception e) {
|
||||
sink.success(false);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,20 @@
|
|||
package com.baeldung.webflux.zipwhen.service;
|
||||
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
public class EmailService {
|
||||
private final UserService userService;
|
||||
|
||||
public EmailService(UserService userService) {
|
||||
this.userService = userService;
|
||||
}
|
||||
|
||||
public Mono<Boolean> sendEmail(String userId) {
|
||||
return userService.getUser(userId)
|
||||
.flatMap(user -> {
|
||||
System.out.println("Sending email to: " + user.getEmail());
|
||||
return Mono.just(true);
|
||||
})
|
||||
.defaultIfEmpty(false);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,12 @@
|
|||
package com.baeldung.webflux.zipwhen.service;
|
||||
|
||||
import com.baeldung.webflux.zipwhen.model.User;
|
||||
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
public class UserService {
|
||||
public Mono<User> getUser(String userId) {
|
||||
return Mono.just(new User(userId, "john Major"));
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,43 @@
|
|||
package com.baeldung.webflux.zipwhen.web;
|
||||
|
||||
import org.springframework.http.ResponseEntity;
|
||||
import org.springframework.web.bind.annotation.GetMapping;
|
||||
import org.springframework.web.bind.annotation.PathVariable;
|
||||
|
||||
import com.baeldung.webflux.zipwhen.model.User;
|
||||
import com.baeldung.webflux.zipwhen.service.DatabaseService;
|
||||
import com.baeldung.webflux.zipwhen.service.EmailService;
|
||||
import com.baeldung.webflux.zipwhen.service.UserService;
|
||||
|
||||
import reactor.core.publisher.Mono;
|
||||
import reactor.core.scheduler.Schedulers;
|
||||
import reactor.util.function.Tuples;
|
||||
|
||||
public class UserController {
|
||||
private final UserService userService;
|
||||
private final EmailService emailService;
|
||||
private final DatabaseService databaseService;
|
||||
|
||||
public UserController(UserService userService, EmailService emailService, DatabaseService databaseService) {
|
||||
this.userService = userService;
|
||||
this.emailService = emailService;
|
||||
this.databaseService = databaseService;
|
||||
}
|
||||
|
||||
@GetMapping("/example/{userId}")
|
||||
public Mono<ResponseEntity<String>> combineAllDataFor(@PathVariable String userId) {
|
||||
Mono<User> userMono = userService.getUser(userId);
|
||||
Mono<Boolean> emailSentMono = emailService.sendEmail(userId)
|
||||
.subscribeOn(Schedulers.parallel());
|
||||
Mono<String> databaseResultMono = userMono.flatMap(user -> databaseService.saveUserData(user)
|
||||
.map(Object::toString));
|
||||
|
||||
return userMono.zipWhen(user -> emailSentMono, Tuples::of)
|
||||
.zipWhen(tuple -> databaseResultMono, (tuple, databaseResult) -> {
|
||||
User user = tuple.getT1();
|
||||
Boolean emailSent = tuple.getT2();
|
||||
return ResponseEntity.ok()
|
||||
.body("Response: " + user + ", Email Sent: " + emailSent + ", Database Result: " + databaseResult);
|
||||
});
|
||||
}
|
||||
}
|
|
@ -0,0 +1,43 @@
|
|||
package com.baeldung.webflux.zipwhen;
|
||||
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.mockito.Mockito;
|
||||
import org.springframework.http.HttpStatus;
|
||||
import org.springframework.http.ResponseEntity;
|
||||
|
||||
import com.baeldung.webflux.zipwhen.model.User;
|
||||
import com.baeldung.webflux.zipwhen.service.DatabaseService;
|
||||
import com.baeldung.webflux.zipwhen.service.EmailService;
|
||||
import com.baeldung.webflux.zipwhen.service.UserService;
|
||||
import com.baeldung.webflux.zipwhen.web.UserController;
|
||||
|
||||
import reactor.core.publisher.Mono;
|
||||
import reactor.test.StepVerifier;
|
||||
|
||||
public class UserControllerUnitTest {
|
||||
@Test
|
||||
public void givenUserId_whenCombineAllData_thenReturnsMonoWithCombinedData() {
|
||||
UserService userService = Mockito.mock(UserService.class);
|
||||
EmailService emailService = Mockito.mock(EmailService.class);
|
||||
DatabaseService databaseService = Mockito.mock(DatabaseService.class);
|
||||
|
||||
String userId = "123";
|
||||
User user = new User(userId, "John Doe");
|
||||
|
||||
Mockito.when(userService.getUser(userId))
|
||||
.thenReturn(Mono.just(user));
|
||||
Mockito.when(emailService.sendEmail(userId))
|
||||
.thenReturn(Mono.just(true));
|
||||
Mockito.when(databaseService.saveUserData(user))
|
||||
.thenReturn(Mono.just(true));
|
||||
|
||||
UserController userController = new UserController(userService, emailService, databaseService);
|
||||
|
||||
Mono<ResponseEntity<String>> responseMono = userController.combineAllDataFor(userId);
|
||||
|
||||
StepVerifier.create(responseMono)
|
||||
.expectNextMatches(responseEntity -> responseEntity.getStatusCode() == HttpStatus.OK && responseEntity.getBody()
|
||||
.equals("Response: " + user + ", Email Sent: true, Database Result: " + true))
|
||||
.verifyComplete();
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue