[BAEL-2278] spring-5-reactive | debugging reactive streams (#5789)

* First approach for the scenario, not stable yet.

* Added 2 services for teh article
Consumer is the main example project, with all the debugging functionality

* * cleaned unused spring-5-data functionality

*  cleaning unused spring-5-data pom

* *fixed indentation error

* addressed Jira comments:
* Created live test, and renamed unit test as integrtion test
* Tried to fix issue in CI, couldnt reproduce locally
This commit is contained in:
Ger Roza 2018-11-27 17:32:34 -02:00 committed by maibin
parent 71bca76bf8
commit 3d2f6eff67
18 changed files with 679 additions and 2 deletions

View File

@ -0,0 +1,33 @@
package com.baeldung.debugging.consumer;
import java.util.Collections;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.mongo.MongoReactiveAutoConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.security.config.web.server.ServerHttpSecurity;
import org.springframework.security.web.server.SecurityWebFilterChain;
import reactor.core.publisher.Hooks;
@SpringBootApplication(exclude = MongoReactiveAutoConfiguration.class)
@EnableScheduling
public class ConsumerSSEApplication {
public static void main(String[] args) {
Hooks.onOperatorDebug();
SpringApplication app = new SpringApplication(ConsumerSSEApplication.class);
app.setDefaultProperties(Collections.singletonMap("server.port", "8082"));
app.run(args);
}
@Bean
public SecurityWebFilterChain springSecurityFilterChain(ServerHttpSecurity http) {
http.authorizeExchange()
.anyExchange()
.permitAll();
return http.build();
}
}

View File

@ -0,0 +1,122 @@
package com.baeldung.debugging.consumer.chronjobs;
import java.time.Duration;
import java.util.concurrent.ThreadLocalRandom;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.client.WebClient;
import com.baeldung.debugging.consumer.model.Foo;
import com.baeldung.debugging.consumer.model.FooDto;
import com.baeldung.debugging.consumer.service.FooService;
import reactor.core.publisher.Flux;
@Component
public class ChronJobs {
private static Logger logger = LoggerFactory.getLogger(ChronJobs.class);
private WebClient client = WebClient.create("http://localhost:8081");
@Autowired
private FooService service;
@Scheduled(fixedRate = 10000)
public void consumeInfiniteFlux() {
Flux<Foo> fluxFoo = client.get()
.uri("/functional-reactive/periodic-foo")
.accept(MediaType.TEXT_EVENT_STREAM)
.retrieve()
.bodyToFlux(FooDto.class)
.delayElements(Duration.ofMillis(100))
.map(dto -> {
logger.debug("process 1 with dto id {} name{}", dto.getId(), dto.getName());
return new Foo(dto);
});
Integer random = ThreadLocalRandom.current()
.nextInt(0, 3);
switch (random) {
case 0:
logger.info("process 1 with approach 1");
service.processFoo(fluxFoo);
break;
case 1:
logger.info("process 1 with approach 1 EH");
service.processUsingApproachOneWithErrorHandling(fluxFoo);
break;
default:
logger.info("process 1 with approach 2");
service.processFooInAnotherScenario(fluxFoo);
break;
}
}
@Scheduled(fixedRate = 20000)
public void consumeFiniteFlux2() {
Flux<Foo> fluxFoo = client.get()
.uri("/functional-reactive/periodic-foo-2")
.accept(MediaType.TEXT_EVENT_STREAM)
.retrieve()
.bodyToFlux(FooDto.class)
.delayElements(Duration.ofMillis(100))
.map(dto -> {
logger.debug("process 2 with dto id {} name{}", dto.getId(), dto.getName());
return new Foo(dto);
});
Integer random = ThreadLocalRandom.current()
.nextInt(0, 3);
switch (random) {
case 0:
logger.info("process 2 with approach 1");
service.processFoo(fluxFoo);
break;
case 1:
logger.info("process 2 with approach 1 EH");
service.processUsingApproachOneWithErrorHandling(fluxFoo);
break;
default:
logger.info("process 2 with approach 2");
service.processFooInAnotherScenario(fluxFoo);
break;
}
}
@Scheduled(fixedRate = 20000)
public void consumeFiniteFlux3() {
Flux<Foo> fluxFoo = client.get()
.uri("/functional-reactive/periodic-foo-2")
.accept(MediaType.TEXT_EVENT_STREAM)
.retrieve()
.bodyToFlux(FooDto.class)
.delayElements(Duration.ofMillis(100))
.map(dto -> {
logger.debug("process 3 with dto id {} name{}", dto.getId(), dto.getName());
return new Foo(dto);
});
logger.info("process 3 with approach 3");
service.processUsingApproachThree(fluxFoo);
}
@Scheduled(fixedRate = 20000)
public void consumeFiniteFluxWithCheckpoint4() {
Flux<Foo> fluxFoo = client.get()
.uri("/functional-reactive/periodic-foo-2")
.accept(MediaType.TEXT_EVENT_STREAM)
.retrieve()
.bodyToFlux(FooDto.class)
.delayElements(Duration.ofMillis(100))
.map(dto -> {
logger.debug("process 4 with dto id {} name{}", dto.getId(), dto.getName());
return new Foo(dto);
});
logger.info("process 4 with approach 4");
service.processUsingApproachFourWithCheckpoint(fluxFoo);
}
}

View File

@ -0,0 +1,23 @@
package com.baeldung.debugging.consumer.controllers;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Hooks;
@RestController
public class ReactiveConfigsToggleRestController {
@GetMapping("/debug-hook-on")
public String setReactiveDebugOn() {
Hooks.onOperatorDebug();
return "DEBUG HOOK ON";
}
@GetMapping("/debug-hook-off")
public String setReactiveDebugOff() {
Hooks.resetOnOperatorDebug();
return "DEBUG HOOK OFF";
}
}

View File

@ -0,0 +1,26 @@
package com.baeldung.debugging.consumer.model;
import java.util.concurrent.ThreadLocalRandom;
import org.springframework.data.annotation.Id;
import lombok.AllArgsConstructor;
import lombok.Data;
@Data
@AllArgsConstructor
public class Foo {
@Id
private Integer id;
private String formattedName;
private Integer quantity;
public Foo(FooDto dto) {
this.id = (ThreadLocalRandom.current()
.nextInt(0, 100) == 0) ? null : dto.getId();
this.formattedName = dto.getName();
this.quantity = ThreadLocalRandom.current()
.nextInt(0, 10);
}
}

View File

@ -0,0 +1,12 @@
package com.baeldung.debugging.consumer.model;
import lombok.AllArgsConstructor;
import lombok.Data;
@Data
@AllArgsConstructor
public class FooDto {
private Integer id;
private String name;
}

View File

@ -0,0 +1,45 @@
package com.baeldung.debugging.consumer.service;
import java.util.concurrent.ThreadLocalRandom;
import com.baeldung.debugging.consumer.model.Foo;
import reactor.core.publisher.Flux;
public class FooNameHelper {
public static Flux<Foo> concatAndSubstringFooName(Flux<Foo> flux) {
flux = concatFooName(flux);
flux = substringFooName(flux);
return flux;
}
public static Flux<Foo> concatFooName(Flux<Foo> flux) {
flux = flux.map(foo -> {
String processedName = null;
Integer random = ThreadLocalRandom.current()
.nextInt(0, 80);
processedName = (random != 0) ? foo.getFormattedName() : foo.getFormattedName() + "-bael";
foo.setFormattedName(processedName);
return foo;
});
return flux;
}
public static Flux<Foo> substringFooName(Flux<Foo> flux) {
return flux.map(foo -> {
String processedName;
Integer random = ThreadLocalRandom.current()
.nextInt(0, 100);
processedName = (random == 0) ? foo.getFormattedName()
.substring(10, 15)
: foo.getFormattedName()
.substring(0, 5);
foo.setFormattedName(processedName);
return foo;
});
}
}

View File

@ -0,0 +1,31 @@
package com.baeldung.debugging.consumer.service;
import java.util.concurrent.ThreadLocalRandom;
import com.baeldung.debugging.consumer.model.Foo;
import reactor.core.publisher.Flux;
public class FooQuantityHelper {
public static Flux<Foo> processFooReducingQuantity(Flux<Foo> flux) {
flux = flux.map(foo -> {
Integer result;
Integer random = ThreadLocalRandom.current()
.nextInt(0, 90);
result = (random == 0) ? result = 0 : foo.getQuantity() + 2;
foo.setQuantity(result);
return foo;
});
return divideFooQuantity(flux);
}
public static Flux<Foo> divideFooQuantity(Flux<Foo> flux) {
return flux.map(foo -> {
Integer result = Math.round(5 / foo.getQuantity());
foo.setQuantity(result);
return foo;
});
}
}

View File

@ -0,0 +1,26 @@
package com.baeldung.debugging.consumer.service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.baeldung.debugging.consumer.model.Foo;
import reactor.core.publisher.Flux;
public class FooReporter {
private static Logger logger = LoggerFactory.getLogger(FooReporter.class);
public static Flux<Foo> reportResult(Flux<Foo> input, String approach) {
return input.map(foo -> {
if (foo.getId() == null)
throw new IllegalArgumentException("Null id is not valid!");
logger.info("Reporting for approach {}: Foo with id '{}' name '{}' and quantity '{}'", approach, foo.getId(), foo.getFormattedName(), foo.getQuantity());
return foo;
});
}
public static Flux<Foo> reportResult(Flux<Foo> input) {
return reportResult(input, "default");
}
}

View File

@ -0,0 +1,91 @@
package com.baeldung.debugging.consumer.service;
import static com.baeldung.debugging.consumer.service.FooNameHelper.concatAndSubstringFooName;
import static com.baeldung.debugging.consumer.service.FooNameHelper.substringFooName;
import static com.baeldung.debugging.consumer.service.FooQuantityHelper.divideFooQuantity;
import static com.baeldung.debugging.consumer.service.FooQuantityHelper.processFooReducingQuantity;
import static com.baeldung.debugging.consumer.service.FooReporter.reportResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import com.baeldung.debugging.consumer.model.Foo;
import reactor.core.publisher.Flux;
@Component
public class FooService {
private static Logger logger = LoggerFactory.getLogger(FooService.class);
public void processFoo(Flux<Foo> flux) {
flux = FooNameHelper.concatFooName(flux);
flux = FooNameHelper.substringFooName(flux);
flux = flux.log();
flux = FooReporter.reportResult(flux);
flux = flux.doOnError(error -> {
logger.error("The following error happened on processFoo method!", error);
});
flux.subscribe();
}
public void processFooInAnotherScenario(Flux<Foo> flux) {
flux = FooNameHelper.substringFooName(flux);
flux = FooQuantityHelper.divideFooQuantity(flux);
flux.subscribe();
}
public void processUsingApproachOneWithErrorHandling(Flux<Foo> flux) {
logger.info("starting approach one w error handling!");
flux = concatAndSubstringFooName(flux);
flux = concatAndSubstringFooName(flux);
flux = substringFooName(flux);
flux = processFooReducingQuantity(flux);
flux = processFooReducingQuantity(flux);
flux = processFooReducingQuantity(flux);
flux = reportResult(flux, "ONE w/ EH");
flux = flux.doOnError(error -> {
logger.error("Approach 1 with Error Handling failed!", error);
});
flux.subscribe();
}
public void processUsingApproachThree(Flux<Foo> flux) {
logger.info("starting approach three!");
flux = concatAndSubstringFooName(flux);
flux = reportResult(flux, "THREE");
flux = flux.doOnError(error -> {
logger.error("Approach 3 failed!", error);
});
flux.subscribe();
}
public void processUsingApproachFourWithCheckpoint(Flux<Foo> flux) {
logger.info("starting approach four!");
flux = concatAndSubstringFooName(flux);
flux = flux.checkpoint("CHECKPOINT 1");
flux = concatAndSubstringFooName(flux);
flux = divideFooQuantity(flux);
flux = flux.checkpoint("CHECKPOINT 2", true);
flux = reportResult(flux, "FOUR");
flux = concatAndSubstringFooName(flux).doOnError(error -> {
logger.error("Approach 4 failed!", error);
});
flux.subscribe();
}
public void processUsingApproachFourWithInitialCheckpoint(Flux<Foo> flux) {
logger.info("starting approach four!");
flux = concatAndSubstringFooName(flux);
flux = flux.checkpoint("CHECKPOINT 1", true);
flux = concatAndSubstringFooName(flux);
flux = divideFooQuantity(flux);
flux = reportResult(flux, "FOUR");
flux = flux.doOnError(error -> {
logger.error("Approach 4-2 failed!", error);
});
flux.subscribe();
}
}

View File

@ -0,0 +1,29 @@
package com.baeldung.debugging.server;
import java.util.Collections;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.security.config.web.server.ServerHttpSecurity;
import org.springframework.security.web.server.SecurityWebFilterChain;
import org.springframework.web.reactive.config.EnableWebFlux;
@EnableWebFlux
@SpringBootApplication
public class ServerSSEApplication {
public static void main(String[] args) {
SpringApplication app = new SpringApplication(ServerSSEApplication.class);
app.setDefaultProperties(Collections.singletonMap("server.port", "8081"));
app.run(args);
}
@Bean
public SecurityWebFilterChain springSecurityFilterChain(ServerHttpSecurity http) {
http.authorizeExchange()
.anyExchange()
.permitAll();
return http.build();
}
}

View File

@ -0,0 +1,47 @@
package com.baeldung.debugging.server.handlers;
import java.time.Duration;
import java.util.concurrent.ThreadLocalRandom;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;
import com.baeldung.debugging.server.model.Foo;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@Component
public class ServerHandler {
private static Logger logger = LoggerFactory.getLogger(ServerHandler.class);
public Mono<ServerResponse> useHandler(final ServerRequest request) {
// there are chances that something goes wrong here...
return ServerResponse.ok()
.contentType(MediaType.TEXT_EVENT_STREAM)
.body(Flux.interval(Duration.ofSeconds(1))
.map(sequence -> {
logger.info("retrieving Foo. Sequence: {}", sequence);
if (ThreadLocalRandom.current()
.nextInt(0, 50) == 1) {
throw new RuntimeException("There was an error retrieving the Foo!");
}
return new Foo(sequence, "name" + sequence);
}), Foo.class);
}
public Mono<ServerResponse> useHandlerFinite(final ServerRequest request) {
return ServerResponse.ok()
.contentType(MediaType.TEXT_EVENT_STREAM)
.body(Flux.range(0, 50)
.map(sequence -> {
return new Foo(new Long(sequence), "theFooNameNumber" + sequence);
}), Foo.class);
}
}

View File

@ -0,0 +1,16 @@
package com.baeldung.debugging.server.model;
import org.springframework.data.annotation.Id;
import lombok.AllArgsConstructor;
import lombok.Data;
@Data
@AllArgsConstructor
public class Foo {
@Id
private Long id;
private String name;
}

View File

@ -0,0 +1,22 @@
package com.baeldung.debugging.server.routers;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.function.server.RequestPredicates;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.RouterFunctions;
import org.springframework.web.reactive.function.server.ServerResponse;
import com.baeldung.debugging.server.handlers.ServerHandler;
@Configuration
public class ServerRouter {
@Bean
public RouterFunction<ServerResponse> responseRoute(@Autowired ServerHandler handler) {
return RouterFunctions.route(RequestPredicates.GET("/functional-reactive/periodic-foo"), handler::useHandler)
.andRoute(RequestPredicates.GET("/functional-reactive/periodic-foo-2"), handler::useHandlerFinite);
}
}

View File

@ -1,2 +1 @@
logging.level.root=INFO
logging.level.root=INFO

View File

@ -0,0 +1,65 @@
package com.baeldung.debugging.consumer;
import static org.assertj.core.api.Assertions.assertThat;
import java.util.Arrays;
import java.util.Collection;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import com.baeldung.debugging.consumer.model.Foo;
import com.baeldung.debugging.consumer.service.FooService;
import com.baeldung.debugging.consumer.utils.ListAppender;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.classic.spi.IThrowableProxy;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Hooks;
public class ConsumerFooServiceIntegrationTest {
FooService service = new FooService();
@BeforeEach
public void clearLogList() {
Hooks.onOperatorDebug();
ListAppender.clearEventList();
}
@Test
public void givenFooWithNullId_whenProcessFoo_thenLogsWithDebugTrace() {
Foo one = new Foo(1, "nameverylong", 8);
Foo two = new Foo(null, "nameverylong", 4);
Flux<Foo> flux = Flux.just(one, two);
service.processFoo(flux);
Collection<String> allLoggedEntries = ListAppender.getEvents()
.stream()
.map(ILoggingEvent::getFormattedMessage)
.collect(Collectors.toList());
Collection<String> allSuppressedEntries = ListAppender.getEvents()
.stream()
.map(ILoggingEvent::getThrowableProxy)
.flatMap(t -> {
return Optional.ofNullable(t)
.map(IThrowableProxy::getSuppressed)
.map(Arrays::stream)
.orElse(Stream.empty());
})
.map(IThrowableProxy::getMessage)
.collect(Collectors.toList());
assertThat(allLoggedEntries).anyMatch(entry -> entry.contains("The following error happened on processFoo method!"))
.anyMatch(entry -> entry.contains("| onSubscribe"))
.anyMatch(entry -> entry.contains("| cancel()"));
assertThat(allSuppressedEntries).anyMatch(entry -> entry.contains("Assembly trace from producer"))
.anyMatch(entry -> entry.contains("Error has been observed by the following operator(s)"));
}
}

View File

@ -0,0 +1,49 @@
package com.baeldung.debugging.consumer;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.springframework.test.web.reactive.server.WebTestClient;
import org.springframework.test.web.reactive.server.WebTestClient.ResponseSpec;
import com.baeldung.debugging.consumer.service.FooService;
public class ConsumerFooServiceLiveTest {
FooService service = new FooService();
private static final String BASE_URL = "http://localhost:8082";
private static final String DEBUG_HOOK_ON = BASE_URL + "/debug-hook-on";
private static final String DEBUG_HOOK_OFF = BASE_URL + "/debug-hook-off";
private static WebTestClient client;
@BeforeAll
public static void setup() {
client = WebTestClient.bindToServer()
.baseUrl(BASE_URL)
.build();
}
@Test
public void whenRequestingDebugHookOn_thenObtainExpectedMessage() {
ResponseSpec response = client.get()
.uri(DEBUG_HOOK_ON)
.exchange();
response.expectStatus()
.isOk()
.expectBody(String.class)
.isEqualTo("DEBUG HOOK ON");
}
@Test
public void whenRequestingDebugHookOff_thenObtainExpectedMessage() {
ResponseSpec response = client.get()
.uri(DEBUG_HOOK_OFF)
.exchange();
response.expectStatus()
.isOk()
.expectBody(String.class)
.isEqualTo("DEBUG HOOK OFF");
}
}

View File

@ -0,0 +1,25 @@
package com.baeldung.debugging.consumer.utils;
import java.util.ArrayList;
import java.util.List;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.AppenderBase;
public class ListAppender extends AppenderBase<ILoggingEvent> {
static private List<ILoggingEvent> events = new ArrayList<>();
@Override
protected void append(ILoggingEvent eventObject) {
events.add(eventObject);
}
public static List<ILoggingEvent> getEvents() {
return events;
}
public static void clearEventList() {
events.clear();
}
}

View File

@ -0,0 +1,16 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<include
resource="org/springframework/boot/logging/logback/base.xml" />
<appender name="LISTAPPENDER"
class="com.baeldung.debugging.consumer.utils.ListAppender">
</appender>
<logger
name="com.baeldung.debugging.consumer.service.FooService">
<appender-ref ref="LISTAPPENDER" />
</logger>
<root level="info">
<appender-ref ref="CONSOLE" />
<appender-ref ref="LISTAPPENDER" />
</root>
</configuration>