Merge pull request #12001 from hkhan/JAVA-8147-update-debugging-reactive-streams

[JAVA-8147] Update and cleanup code for debugging reactive streams
This commit is contained in:
kwoyke 2022-04-04 10:05:13 +02:00 committed by GitHub
commit d28d3f54ce
11 changed files with 227 additions and 239 deletions

View File

@ -1,4 +1,4 @@
package com.baeldung.reactive.debugging.consumer.chronjobs; package com.baeldung.reactive.debugging.consumer.cronjobs;
import com.baeldung.reactive.debugging.consumer.model.Foo; import com.baeldung.reactive.debugging.consumer.model.Foo;
import com.baeldung.reactive.debugging.consumer.model.FooDto; import com.baeldung.reactive.debugging.consumer.model.FooDto;
@ -16,10 +16,10 @@ import java.time.Duration;
import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.ThreadLocalRandom;
@Component @Component
public class ChronJobs { public class CronJobs {
private static Logger logger = LoggerFactory.getLogger(ChronJobs.class); final Logger logger = LoggerFactory.getLogger(CronJobs.class);
private WebClient client = WebClient.create("http://localhost:8081"); final WebClient client = WebClient.create("http://localhost:8081");
@Autowired @Autowired
private FooService service; private FooService service;
@ -27,17 +27,19 @@ public class ChronJobs {
@Scheduled(fixedRate = 10000) @Scheduled(fixedRate = 10000)
public void consumeInfiniteFlux() { public void consumeInfiniteFlux() {
Flux<Foo> fluxFoo = client.get() Flux<Foo> fluxFoo = client.get()
.uri("/functional-reactive/periodic-foo") .uri("/functional-reactive/periodic-foo")
.accept(MediaType.TEXT_EVENT_STREAM) .accept(MediaType.TEXT_EVENT_STREAM)
.retrieve() .retrieve()
.bodyToFlux(FooDto.class) .bodyToFlux(FooDto.class)
.delayElements(Duration.ofMillis(100)) .delayElements(Duration.ofMillis(100))
.map(dto -> { .map(dto -> {
logger.debug("process 1 with dto id {} name{}", dto.getId(), dto.getName()); logger.debug("process 1 with dto id {} name{}", dto.getId(), dto.getName());
return new Foo(dto); return new Foo(dto);
}); });
Integer random = ThreadLocalRandom.current()
.nextInt(0, 3); int random = ThreadLocalRandom.current()
.nextInt(0, 3);
switch (random) { switch (random) {
case 0: case 0:
logger.info("process 1 with approach 1"); logger.info("process 1 with approach 1");
@ -51,24 +53,25 @@ public class ChronJobs {
logger.info("process 1 with approach 2"); logger.info("process 1 with approach 2");
service.processFooInAnotherScenario(fluxFoo); service.processFooInAnotherScenario(fluxFoo);
break; break;
} }
} }
@Scheduled(fixedRate = 20000) @Scheduled(fixedRate = 20000)
public void consumeFiniteFlux2() { public void consumeFiniteFlux2() {
Flux<Foo> fluxFoo = client.get() Flux<Foo> fluxFoo = client.get()
.uri("/functional-reactive/periodic-foo-2") .uri("/functional-reactive/periodic-foo-2")
.accept(MediaType.TEXT_EVENT_STREAM) .accept(MediaType.TEXT_EVENT_STREAM)
.retrieve() .retrieve()
.bodyToFlux(FooDto.class) .bodyToFlux(FooDto.class)
.delayElements(Duration.ofMillis(100)) .delayElements(Duration.ofMillis(100))
.map(dto -> { .map(dto -> {
logger.debug("process 2 with dto id {} name{}", dto.getId(), dto.getName()); logger.debug("process 2 with dto id {} name{}", dto.getId(), dto.getName());
return new Foo(dto); return new Foo(dto);
}); });
Integer random = ThreadLocalRandom.current()
.nextInt(0, 3); int random = ThreadLocalRandom.current()
.nextInt(0, 3);
switch (random) { switch (random) {
case 0: case 0:
logger.info("process 2 with approach 1"); logger.info("process 2 with approach 1");
@ -82,22 +85,21 @@ public class ChronJobs {
logger.info("process 2 with approach 2"); logger.info("process 2 with approach 2");
service.processFooInAnotherScenario(fluxFoo); service.processFooInAnotherScenario(fluxFoo);
break; break;
} }
} }
@Scheduled(fixedRate = 20000) @Scheduled(fixedRate = 20000)
public void consumeFiniteFlux3() { public void consumeFiniteFlux3() {
Flux<Foo> fluxFoo = client.get() Flux<Foo> fluxFoo = client.get()
.uri("/functional-reactive/periodic-foo-2") .uri("/functional-reactive/periodic-foo-2")
.accept(MediaType.TEXT_EVENT_STREAM) .accept(MediaType.TEXT_EVENT_STREAM)
.retrieve() .retrieve()
.bodyToFlux(FooDto.class) .bodyToFlux(FooDto.class)
.delayElements(Duration.ofMillis(100)) .delayElements(Duration.ofMillis(100))
.map(dto -> { .map(dto -> {
logger.debug("process 3 with dto id {} name{}", dto.getId(), dto.getName()); logger.debug("process 3 with dto id {} name{}", dto.getId(), dto.getName());
return new Foo(dto); return new Foo(dto);
}); });
logger.info("process 3 with approach 3"); logger.info("process 3 with approach 3");
service.processUsingApproachThree(fluxFoo); service.processUsingApproachThree(fluxFoo);
} }
@ -105,15 +107,15 @@ public class ChronJobs {
@Scheduled(fixedRate = 20000) @Scheduled(fixedRate = 20000)
public void consumeFiniteFluxWithCheckpoint4() { public void consumeFiniteFluxWithCheckpoint4() {
Flux<Foo> fluxFoo = client.get() Flux<Foo> fluxFoo = client.get()
.uri("/functional-reactive/periodic-foo-2") .uri("/functional-reactive/periodic-foo-2")
.accept(MediaType.TEXT_EVENT_STREAM) .accept(MediaType.TEXT_EVENT_STREAM)
.retrieve() .retrieve()
.bodyToFlux(FooDto.class) .bodyToFlux(FooDto.class)
.delayElements(Duration.ofMillis(100)) .delayElements(Duration.ofMillis(100))
.map(dto -> { .map(dto -> {
logger.debug("process 4 with dto id {} name{}", dto.getId(), dto.getName()); logger.debug("process 4 with dto id {} name{}", dto.getId(), dto.getName());
return new Foo(dto); return new Foo(dto);
}); });
logger.info("process 4 with approach 4"); logger.info("process 4 with approach 4");
service.processUsingApproachFourWithCheckpoint(fluxFoo); service.processUsingApproachFourWithCheckpoint(fluxFoo);
} }
@ -121,15 +123,15 @@ public class ChronJobs {
@Scheduled(fixedRate = 20000) @Scheduled(fixedRate = 20000)
public void consumeFiniteFluxWitParallelScheduler() { public void consumeFiniteFluxWitParallelScheduler() {
Flux<Foo> fluxFoo = client.get() Flux<Foo> fluxFoo = client.get()
.uri("/functional-reactive/periodic-foo-2") .uri("/functional-reactive/periodic-foo-2")
.accept(MediaType.TEXT_EVENT_STREAM) .accept(MediaType.TEXT_EVENT_STREAM)
.retrieve() .retrieve()
.bodyToFlux(FooDto.class) .bodyToFlux(FooDto.class)
.delayElements(Duration.ofMillis(100)) .delayElements(Duration.ofMillis(100))
.map(dto -> { .map(dto -> {
logger.debug("process 5-parallel with dto id {} name{}", dto.getId(), dto.getName()); logger.debug("process 5-parallel with dto id {} name{}", dto.getId(), dto.getName());
return new Foo(dto); return new Foo(dto);
}); });
logger.info("process 5-parallel with approach 5-parallel"); logger.info("process 5-parallel with approach 5-parallel");
service.processUsingApproachFivePublishingToDifferentParallelThreads(fluxFoo); service.processUsingApproachFivePublishingToDifferentParallelThreads(fluxFoo);
} }
@ -137,15 +139,15 @@ public class ChronJobs {
@Scheduled(fixedRate = 20000) @Scheduled(fixedRate = 20000)
public void consumeFiniteFluxWithSingleSchedulers() { public void consumeFiniteFluxWithSingleSchedulers() {
Flux<Foo> fluxFoo = client.get() Flux<Foo> fluxFoo = client.get()
.uri("/functional-reactive/periodic-foo-2") .uri("/functional-reactive/periodic-foo-2")
.accept(MediaType.TEXT_EVENT_STREAM) .accept(MediaType.TEXT_EVENT_STREAM)
.retrieve() .retrieve()
.bodyToFlux(FooDto.class) .bodyToFlux(FooDto.class)
.delayElements(Duration.ofMillis(100)) .delayElements(Duration.ofMillis(100))
.map(dto -> { .map(dto -> {
logger.debug("process 5-single with dto id {} name{}", dto.getId(), dto.getName()); logger.debug("process 5-single with dto id {} name{}", dto.getId(), dto.getName());
return new Foo(dto); return new Foo(dto);
}); });
logger.info("process 5-single with approach 5-single"); logger.info("process 5-single with approach 5-single");
service.processUsingApproachFivePublishingToDifferentSingleThreads(fluxFoo); service.processUsingApproachFivePublishingToDifferentSingleThreads(fluxFoo);
} }

View File

@ -1,14 +1,12 @@
package com.baeldung.reactive.debugging.consumer.model; package com.baeldung.reactive.debugging.consumer.model;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.Getter; import lombok.Data;
import lombok.NoArgsConstructor; import lombok.NoArgsConstructor;
import lombok.Setter;
import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.ThreadLocalRandom;
@Getter @Data
@Setter
@NoArgsConstructor @NoArgsConstructor
@AllArgsConstructor @AllArgsConstructor
public class Foo { public class Foo {
@ -18,10 +16,16 @@ public class Foo {
private Integer quantity; private Integer quantity;
public Foo(FooDto dto) { public Foo(FooDto dto) {
this.id = (ThreadLocalRandom.current() this.id = randomId() == 0 ? null : dto.getId();
.nextInt(0, 100) == 0) ? null : dto.getId();
this.formattedName = dto.getName(); this.formattedName = dto.getName();
this.quantity = ThreadLocalRandom.current() this.quantity = randomQuantity();
.nextInt(0, 10); }
private static int randomId() {
return ThreadLocalRandom.current().nextInt(0, 100);
}
private static int randomQuantity() {
return ThreadLocalRandom.current().nextInt(0, 10);
} }
} }

View File

@ -1,16 +1,15 @@
package com.baeldung.reactive.debugging.consumer.model; package com.baeldung.reactive.debugging.consumer.model;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.Getter; import lombok.Data;
import lombok.NoArgsConstructor; import lombok.NoArgsConstructor;
import lombok.Setter;
@Getter @Data
@Setter
@NoArgsConstructor @NoArgsConstructor
@AllArgsConstructor @AllArgsConstructor
public class FooDto { public class FooDto {
private Integer id; private Integer id;
private String name; private String name;
} }

View File

@ -1,44 +1,40 @@
package com.baeldung.reactive.debugging.consumer.service; package com.baeldung.reactive.debugging.consumer.service;
import com.baeldung.reactive.debugging.consumer.model.Foo; import com.baeldung.reactive.debugging.consumer.model.Foo;
import reactor.core.publisher.Flux;
import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.ThreadLocalRandom;
public class FooNameHelper { public class FooNameHelper {
public static Flux<Foo> concatAndSubstringFooName(Flux<Foo> flux) { public static Foo concatAndSubstringFooName(Foo foo) {
flux = concatFooName(flux); Foo concat = concatFooName(foo);
flux = substringFooName(flux); return substringFooName(concat);
return flux;
} }
public static Flux<Foo> concatFooName(Flux<Foo> flux) { public static Foo concatFooName(Foo foo) {
flux = flux.map(foo -> {
String processedName = null; int random = ThreadLocalRandom.current()
Integer random = ThreadLocalRandom.current() .nextInt(0, 80);
.nextInt(0, 80);
processedName = (random != 0) ? foo.getFormattedName() : foo.getFormattedName() + "-bael"; String processedName = (random != 0)
foo.setFormattedName(processedName); ? foo.getFormattedName()
return foo; : foo.getFormattedName() + "-bael";
});
return flux; foo.setFormattedName(processedName);
return foo;
} }
public static Flux<Foo> substringFooName(Flux<Foo> flux) { public static Foo substringFooName(Foo foo) {
return flux.map(foo -> { int random = ThreadLocalRandom.current()
String processedName; .nextInt(0, 100);
Integer random = ThreadLocalRandom.current()
.nextInt(0, 100);
processedName = (random == 0) ? foo.getFormattedName() String processedName = (random == 0)
.substring(10, 15) ? foo.getFormattedName().substring(10, 15)
: foo.getFormattedName() : foo.getFormattedName().substring(0, 5);
.substring(0, 5);
foo.setFormattedName(processedName); foo.setFormattedName(processedName);
return foo;
}); return foo;
} }
} }

View File

@ -1,30 +1,24 @@
package com.baeldung.reactive.debugging.consumer.service; package com.baeldung.reactive.debugging.consumer.service;
import com.baeldung.reactive.debugging.consumer.model.Foo; import com.baeldung.reactive.debugging.consumer.model.Foo;
import reactor.core.publisher.Flux;
import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.ThreadLocalRandom;
public class FooQuantityHelper { public class FooQuantityHelper {
public static Flux<Foo> processFooReducingQuantity(Flux<Foo> flux) { public static Foo processFooReducingQuantity(Foo foo) {
flux = flux.map(foo -> { int random = ThreadLocalRandom.current().nextInt(0, 90);
Integer result; int result = (random == 0) ? 0 : foo.getQuantity() + 2;
Integer random = ThreadLocalRandom.current() foo.setQuantity(result);
.nextInt(0, 90);
result = (random == 0) ? result = 0 : foo.getQuantity() + 2; return divideFooQuantity(foo);
foo.setQuantity(result);
return foo;
});
return divideFooQuantity(flux);
} }
public static Flux<Foo> divideFooQuantity(Flux<Foo> flux) { public static Foo divideFooQuantity(Foo foo) {
return flux.map(foo -> {
Integer result = Math.round(5 / foo.getQuantity()); Integer result = (int) Math.round(5.0 / foo.getQuantity());
foo.setQuantity(result); foo.setQuantity(result);
return foo; return foo;
});
} }
} }

View File

@ -3,22 +3,22 @@ package com.baeldung.reactive.debugging.consumer.service;
import com.baeldung.reactive.debugging.consumer.model.Foo; import com.baeldung.reactive.debugging.consumer.model.Foo;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
public class FooReporter { public class FooReporter {
private static Logger logger = LoggerFactory.getLogger(FooReporter.class); private static final Logger LOGGER = LoggerFactory.getLogger(FooReporter.class);
public static Flux<Foo> reportResult(Flux<Foo> input, String approach) { public static Foo reportResult(Foo foo, String approach) {
return input.map(foo -> { if (foo.getId() == null) {
if (foo.getId() == null) throw new IllegalArgumentException("Null id is not valid!");
throw new IllegalArgumentException("Null id is not valid!"); }
logger.info("Reporting for approach {}: Foo with id '{}' name '{}' and quantity '{}'", approach, foo.getId(), foo.getFormattedName(), foo.getQuantity()); LOGGER.info("Reporting for approach {}: Foo with id '{}' name '{}' and quantity '{}'",
return foo; approach, foo.getId(), foo.getFormattedName(), foo.getQuantity());
});
return foo;
} }
public static Flux<Foo> reportResult(Flux<Foo> input) { public static Foo reportResult(Foo input) {
return reportResult(input, "default"); return reportResult(input, "default");
} }
} }

View File

@ -7,112 +7,107 @@ import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers; import reactor.core.scheduler.Schedulers;
import static com.baeldung.reactive.debugging.consumer.service.FooNameHelper.concatAndSubstringFooName;
import static com.baeldung.reactive.debugging.consumer.service.FooNameHelper.substringFooName;
import static com.baeldung.reactive.debugging.consumer.service.FooQuantityHelper.divideFooQuantity;
import static com.baeldung.reactive.debugging.consumer.service.FooQuantityHelper.processFooReducingQuantity;
import static com.baeldung.reactive.debugging.consumer.service.FooReporter.reportResult; import static com.baeldung.reactive.debugging.consumer.service.FooReporter.reportResult;
@Component @Component
public class FooService { public class FooService {
private static Logger logger = LoggerFactory.getLogger(FooService.class); private static final Logger LOGGER = LoggerFactory.getLogger(FooService.class);
public void processFoo(Flux<Foo> flux) { public void processFoo(Flux<Foo> flux) {
flux = FooNameHelper.concatFooName(flux); flux.map(FooNameHelper::concatFooName)
flux = FooNameHelper.substringFooName(flux); .map(FooNameHelper::substringFooName)
flux = flux.log(); .log()
flux = FooReporter.reportResult(flux); .map(FooReporter::reportResult)
flux = flux.doOnError(error -> { .doOnError(error -> LOGGER.error("The following error happened on processFoo method!", error))
logger.error("The following error happened on processFoo method!", error); .subscribe();
});
flux.subscribe();
} }
public void processFooInAnotherScenario(Flux<Foo> flux) { public void processFooInAnotherScenario(Flux<Foo> flux) {
flux = FooNameHelper.substringFooName(flux); flux.map(FooNameHelper::substringFooName)
flux = FooQuantityHelper.divideFooQuantity(flux); .map(FooQuantityHelper::divideFooQuantity)
flux.subscribe(); .subscribe();
} }
public void processUsingApproachOneWithErrorHandling(Flux<Foo> flux) { public void processUsingApproachOneWithErrorHandling(Flux<Foo> flux) {
logger.info("starting approach one w error handling!"); LOGGER.info("starting approach one w error handling!");
flux = concatAndSubstringFooName(flux);
flux = concatAndSubstringFooName(flux); flux.map(FooNameHelper::concatAndSubstringFooName)
flux = substringFooName(flux); .map(FooNameHelper::concatAndSubstringFooName)
flux = processFooReducingQuantity(flux); .map(FooNameHelper::substringFooName)
flux = processFooReducingQuantity(flux); .map(FooQuantityHelper::processFooReducingQuantity)
flux = processFooReducingQuantity(flux); .map(FooQuantityHelper::processFooReducingQuantity)
flux = reportResult(flux, "ONE w/ EH"); .map(FooQuantityHelper::processFooReducingQuantity)
flux = flux.doOnError(error -> { .map(FooReporter::reportResult)
logger.error("Approach 1 with Error Handling failed!", error); .doOnError(error -> LOGGER.error("Approach 1 with Error Handling failed!", error))
}); .subscribe();
flux.subscribe();
} }
public void processUsingApproachThree(Flux<Foo> flux) { public void processUsingApproachThree(Flux<Foo> flux) {
logger.info("starting approach three!"); LOGGER.info("starting approach three!");
flux = concatAndSubstringFooName(flux);
flux = reportResult(flux, "THREE"); flux.map(FooNameHelper::concatAndSubstringFooName)
flux = flux.doOnError(error -> { .map(foo -> reportResult(foo, "THREE"))
logger.error("Approach 3 failed!", error); .doOnError(error -> LOGGER.error("Approach 3 failed!", error))
}); .subscribe();
flux.subscribe();
} }
public void processUsingApproachFourWithCheckpoint(Flux<Foo> flux) { public void processUsingApproachFourWithCheckpoint(Flux<Foo> flux) {
logger.info("starting approach four!"); LOGGER.info("starting approach four!");
flux = concatAndSubstringFooName(flux);
flux = flux.checkpoint("CHECKPOINT 1"); flux.map(FooNameHelper::concatAndSubstringFooName)
flux = concatAndSubstringFooName(flux); .checkpoint("CHECKPOINT 1")
flux = divideFooQuantity(flux); .map(FooNameHelper::concatAndSubstringFooName)
flux = flux.checkpoint("CHECKPOINT 2", true); .map(FooQuantityHelper::divideFooQuantity)
flux = reportResult(flux, "FOUR"); .checkpoint("CHECKPOINT 2", true)
flux = concatAndSubstringFooName(flux).doOnError(error -> { .map(foo -> reportResult(foo, "FOUR"))
logger.error("Approach 4 failed!", error); .map(FooNameHelper::concatAndSubstringFooName)
}); .doOnError(error -> LOGGER.error("Approach 4 failed!", error))
flux.subscribe(); .subscribe();
} }
public void processUsingApproachFourWithInitialCheckpoint(Flux<Foo> flux) { public void processUsingApproachFourWithInitialCheckpoint(Flux<Foo> flux) {
logger.info("starting approach four!"); LOGGER.info("starting approach four!");
flux = concatAndSubstringFooName(flux);
flux = flux.checkpoint("CHECKPOINT 1", true); flux.map(FooNameHelper::concatAndSubstringFooName)
flux = concatAndSubstringFooName(flux); .checkpoint("CHECKPOINT 1", true)
flux = divideFooQuantity(flux); .map(FooNameHelper::concatAndSubstringFooName)
flux = reportResult(flux, "FOUR"); .map(FooQuantityHelper::divideFooQuantity)
flux = flux.doOnError(error -> { .map(foo -> reportResult(foo, "FOUR"))
logger.error("Approach 4-2 failed!", error); .map(FooNameHelper::concatAndSubstringFooName)
}); .doOnError(error -> LOGGER.error("Approach 4-2 failed!", error))
flux.subscribe(); .subscribe();
} }
public void processUsingApproachFivePublishingToDifferentParallelThreads(Flux<Foo> flux) { public void processUsingApproachFivePublishingToDifferentParallelThreads(Flux<Foo> flux) {
logger.info("starting approach five-parallel!"); LOGGER.info("starting approach five-parallel!");
flux = concatAndSubstringFooName(flux).publishOn(Schedulers.newParallel("five-parallel-foo"))
.log(); flux.map(FooNameHelper::concatAndSubstringFooName)
flux = concatAndSubstringFooName(flux); .publishOn(Schedulers.newParallel("five-parallel-foo"))
flux = divideFooQuantity(flux); .log()
flux = reportResult(flux, "FIVE-PARALLEL").publishOn(Schedulers.newSingle("five-parallel-bar")); .map(FooNameHelper::concatAndSubstringFooName)
flux = concatAndSubstringFooName(flux).doOnError(error -> { .map(foo -> reportResult(foo, "FIVE-PARALLEL"))
logger.error("Approach 5-parallel failed!", error); .publishOn(Schedulers.newSingle("five-parallel-bar"))
}); .map(FooNameHelper::concatAndSubstringFooName)
flux.subscribeOn(Schedulers.newParallel("five-parallel-starter")) .doOnError(error -> LOGGER.error("Approach 5-parallel failed!", error))
.subscribe(); .subscribeOn(Schedulers.newParallel("five-parallel-starter"))
.subscribe();
} }
public void processUsingApproachFivePublishingToDifferentSingleThreads(Flux<Foo> flux) { public void processUsingApproachFivePublishingToDifferentSingleThreads(Flux<Foo> flux) {
logger.info("starting approach five-single!"); LOGGER.info("starting approach five-single!");
flux = flux.log()
.subscribeOn(Schedulers.newSingle("five-single-starter")); flux.log()
flux = concatAndSubstringFooName(flux).publishOn(Schedulers.newSingle("five-single-foo")); .subscribeOn(Schedulers.newSingle("five-single-starter"))
flux = concatAndSubstringFooName(flux); .map(FooNameHelper::concatAndSubstringFooName)
flux = divideFooQuantity(flux); .publishOn(Schedulers.newSingle("five-single-foo"))
flux = reportResult(flux, "FIVE-SINGLE").publishOn(Schedulers.newSingle("five-single-bar")); .map(FooNameHelper::concatAndSubstringFooName)
flux = concatAndSubstringFooName(flux).doOnError(error -> { .map(FooQuantityHelper::divideFooQuantity)
logger.error("Approach 5-single failed!", error); .map(foo -> reportResult(foo, "FIVE-SINGLE"))
}); .publishOn(Schedulers.newSingle("five-single-bar"))
flux.subscribe(); .map(FooNameHelper::concatAndSubstringFooName)
.doOnError(error -> LOGGER.error("Approach 5-single failed!", error))
.subscribe();
} }
} }

View File

@ -16,30 +16,31 @@ import java.util.concurrent.ThreadLocalRandom;
@Component @Component
public class ServerHandler { public class ServerHandler {
private static Logger logger = LoggerFactory.getLogger(ServerHandler.class); private static final Logger LOGGER = LoggerFactory.getLogger(ServerHandler.class);
public Mono<ServerResponse> useHandler(final ServerRequest request) { public Mono<ServerResponse> useHandler(final ServerRequest request) {
// there are chances that something goes wrong here... // there are chances that something goes wrong here...
return ServerResponse.ok() return ServerResponse.ok()
.contentType(MediaType.TEXT_EVENT_STREAM) .contentType(MediaType.TEXT_EVENT_STREAM)
.body(Flux.interval(Duration.ofSeconds(1)) .body(getFlux(), Foo.class);
.map(sequence -> {
logger.info("retrieving Foo. Sequence: {}", sequence);
if (ThreadLocalRandom.current()
.nextInt(0, 50) == 1) {
throw new RuntimeException("There was an error retrieving the Foo!");
}
return new Foo(sequence, "name" + sequence);
}), Foo.class);
} }
public Mono<ServerResponse> useHandlerFinite(final ServerRequest request) { public Mono<ServerResponse> useHandlerFinite(final ServerRequest request) {
return ServerResponse.ok() return ServerResponse.ok()
.contentType(MediaType.TEXT_EVENT_STREAM) .contentType(MediaType.TEXT_EVENT_STREAM)
.body(Flux.range(0, 50) .body(Flux.range(0, 50)
.map(sequence -> { .map(sequence -> new Foo(new Long(sequence), "theFooNameNumber" + sequence)
return new Foo(new Long(sequence), "theFooNameNumber" + sequence); ), Foo.class);
}), Foo.class); }
private static Flux<Foo> getFlux() {
return Flux.interval(Duration.ofSeconds(1))
.map(sequence -> {
LOGGER.info("retrieving Foo. Sequence: {}", sequence);
if (ThreadLocalRandom.current().nextInt(0, 50) == 1) {
throw new RuntimeException("There was an error retrieving the Foo!");
}
return new Foo(sequence, "name" + sequence);
});
} }
} }

View File

@ -45,19 +45,19 @@ public class ConsumerFooServiceIntegrationTest {
Collection<String> allSuppressedEntries = ListAppender.getEvents() Collection<String> allSuppressedEntries = ListAppender.getEvents()
.stream() .stream()
.map(ILoggingEvent::getThrowableProxy) .map(ILoggingEvent::getThrowableProxy)
.flatMap(t -> { .flatMap(t -> Optional.ofNullable(t)
return Optional.ofNullable(t) .map(IThrowableProxy::getSuppressed)
.map(IThrowableProxy::getSuppressed) .map(Arrays::stream)
.map(Arrays::stream) .orElse(Stream.empty()))
.orElse(Stream.empty());
})
.map(IThrowableProxy::getClassName) .map(IThrowableProxy::getClassName)
.collect(Collectors.toList()); .collect(Collectors.toList());
assertThat(allLoggedEntries).anyMatch(entry -> entry.contains("The following error happened on processFoo method!"))
.anyMatch(entry -> entry.contains("| onSubscribe")) assertThat(allLoggedEntries)
.anyMatch(entry -> entry.contains("| cancel()")); .anyMatch(entry -> entry.contains("The following error happened on processFoo method!"))
.anyMatch(entry -> entry.contains("| onSubscribe"))
.anyMatch(entry -> entry.contains("| cancel()"));
assertThat(allSuppressedEntries) assertThat(allSuppressedEntries)
.anyMatch(entry -> entry.contains("reactor.core.publisher.FluxOnAssembly$OnAssemblyException")); .anyMatch(entry -> entry.contains("reactor.core.publisher.FluxOnAssembly$OnAssemblyException"));
} }
} }

View File

@ -1,6 +1,5 @@
package com.baeldung.reactive.debugging.consumer; package com.baeldung.reactive.debugging.consumer;
import com.baeldung.reactive.debugging.consumer.service.FooService;
import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.springframework.test.web.reactive.server.WebTestClient; import org.springframework.test.web.reactive.server.WebTestClient;
@ -13,8 +12,6 @@ import org.springframework.test.web.reactive.server.WebTestClient.ResponseSpec;
*/ */
public class ConsumerFooServiceLiveTest { public class ConsumerFooServiceLiveTest {
FooService service = new FooService();
private static final String BASE_URL = "http://localhost:8082"; private static final String BASE_URL = "http://localhost:8082";
private static final String DEBUG_HOOK_ON = BASE_URL + "/debug-hook-on"; private static final String DEBUG_HOOK_ON = BASE_URL + "/debug-hook-on";
private static final String DEBUG_HOOK_OFF = BASE_URL + "/debug-hook-off"; private static final String DEBUG_HOOK_OFF = BASE_URL + "/debug-hook-off";

View File

@ -8,18 +8,18 @@ import java.util.List;
public class ListAppender extends AppenderBase<ILoggingEvent> { public class ListAppender extends AppenderBase<ILoggingEvent> {
static private List<ILoggingEvent> events = new ArrayList<>(); private static final List<ILoggingEvent> EVENTS = new ArrayList<>();
@Override @Override
protected void append(ILoggingEvent eventObject) { protected void append(ILoggingEvent eventObject) {
events.add(eventObject); EVENTS.add(eventObject);
} }
public static List<ILoggingEvent> getEvents() { public static List<ILoggingEvent> getEvents() {
return events; return EVENTS;
} }
public static void clearEventList() { public static void clearEventList() {
events.clear(); EVENTS.clear();
} }
} }