diff --git a/pom.xml b/pom.xml index 3770c05b49..1560e99d98 100644 --- a/pom.xml +++ b/pom.xml @@ -557,7 +557,6 @@ atomikos reactive-systems slack - spring-webflux-threads @@ -1037,7 +1036,6 @@ atomikos reactive-systems slack - spring-webflux-threads diff --git a/reactor-core/README.md b/reactor-core/README.md index a4ced49096..7ca3b5773f 100644 --- a/reactor-core/README.md +++ b/reactor-core/README.md @@ -4,7 +4,6 @@ This module contains articles about Reactor Core. ### Relevant articles -- [Intro To Reactor Core](https://www.baeldung.com/reactor-core) - [Combining Publishers in Project Reactor](https://www.baeldung.com/reactor-combine-streams) - [Programmatically Creating Sequences with Project Reactor](https://www.baeldung.com/flux-sequences-reactor) - [How to Extract a Mono’s Content in Java](https://www.baeldung.com/java-string-from-mono) diff --git a/reactor-core/src/test/java/com/baeldung/reactor/introduction/ReactorIntegrationTest.java b/reactor-core/src/test/java/com/baeldung/reactor/introduction/ReactorIntegrationTest.java deleted file mode 100644 index a1acffac91..0000000000 --- a/reactor-core/src/test/java/com/baeldung/reactor/introduction/ReactorIntegrationTest.java +++ /dev/null @@ -1,119 +0,0 @@ -package com.baeldung.reactor.introduction; - -import org.junit.Test; -import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; -import reactor.core.publisher.ConnectableFlux; -import reactor.core.publisher.Flux; -import reactor.core.scheduler.Schedulers; - -import java.util.ArrayList; -import java.util.List; - -import static org.assertj.core.api.Assertions.assertThat; - -public class ReactorIntegrationTest { - - @Test - public void givenFlux_whenSubscribing_thenStream() { - - List elements = new ArrayList<>(); - - Flux.just(1, 2, 3, 4) - .log() - .map(i -> { - System.out.println(i + ":" + Thread.currentThread()); - return i * 2; - }) - .subscribe(elements::add); - - assertThat(elements).containsExactly(2, 4, 6, 8); - } - - @Test - public void givenFlux_whenZipping_thenCombine() { - List elements = new ArrayList<>(); - - Flux.just(1, 2, 3, 4) - .log() - .map(i -> i * 2) - .zipWith(Flux.range(0, Integer.MAX_VALUE).log(), (one, two) -> String.format("First Flux: %d, Second Flux: %d", one, two)) - .subscribe(elements::add); - - assertThat(elements).containsExactly( - "First Flux: 2, Second Flux: 0", - "First Flux: 4, Second Flux: 1", - "First Flux: 6, Second Flux: 2", - "First Flux: 8, Second Flux: 3"); - } - - @Test - public void givenFlux_whenApplyingBackPressure_thenPushElementsInBatches() { - - List elements = new ArrayList<>(); - - Flux.just(1, 2, 3, 4) - .log() - .subscribe(new Subscriber() { - private Subscription s; - int onNextAmount; - - @Override - public void onSubscribe(final Subscription s) { - this.s = s; - s.request(2); - } - - @Override - public void onNext(final Integer integer) { - elements.add(integer); - onNextAmount++; - if (onNextAmount % 2 == 0) { - s.request(2); - } - } - - @Override - public void onError(final Throwable t) { - } - - @Override - public void onComplete() { - } - }); - - assertThat(elements).containsExactly(1, 2, 3, 4); - } - - @Test - public void givenFlux_whenInParallel_thenSubscribeInDifferentThreads() throws InterruptedException { - List threadNames = new ArrayList<>(); - - Flux.just(1, 2, 3, 4) - .log() - .map(i -> Thread.currentThread().getName()) - .subscribeOn(Schedulers.parallel()) - .subscribe(threadNames::add); - - Thread.sleep(1000); - - assertThat(threadNames).containsExactly("parallel-1", "parallel-1", "parallel-1", "parallel-1"); - } - - @Test - public void givenConnectableFlux_whenConnected_thenShouldStream() { - - List elements = new ArrayList<>(); - - final ConnectableFlux publish = Flux.just(1, 2, 3, 4).publish(); - - publish.subscribe(elements::add); - - assertThat(elements).isEmpty(); - - publish.connect(); - - assertThat(elements).containsExactly(1, 2, 3, 4); - } - -} diff --git a/spring-5-reactive-2/README.md b/spring-5-reactive-2/README.md index 98a5f26433..39752c0cf1 100644 --- a/spring-5-reactive-2/README.md +++ b/spring-5-reactive-2/README.md @@ -2,11 +2,9 @@ This module contains articles about reactive Spring 5 -- [Spring WebClient vs. RestTemplate](https://www.baeldung.com/spring-webclient-resttemplate) - [Validation for Functional Endpoints in Spring 5](https://www.baeldung.com/spring-functional-endpoints-validation) - [Logging a Reactive Sequence](https://www.baeldung.com/spring-reactive-sequence-logging) - [Testing Reactive Streams Using StepVerifier and TestPublisher](https://www.baeldung.com/reactive-streams-step-verifier-test-publisher) -- [Debugging Reactive Streams in Java](https://www.baeldung.com/spring-debugging-reactive-streams) - [Static Content in Spring WebFlux](https://www.baeldung.com/spring-webflux-static-content) - [Server-Sent Events in Spring](https://www.baeldung.com/spring-server-sent-events) - [Backpressure Mechanism in Spring WebFlux](https://www.baeldung.com/spring-webflux-backpressure) diff --git a/spring-5-reactive-2/src/main/java/com/baeldung/debugging/consumer/ConsumerDebuggingApplication.java b/spring-5-reactive-2/src/main/java/com/baeldung/debugging/consumer/ConsumerDebuggingApplication.java deleted file mode 100644 index 3f01310006..0000000000 --- a/spring-5-reactive-2/src/main/java/com/baeldung/debugging/consumer/ConsumerDebuggingApplication.java +++ /dev/null @@ -1,34 +0,0 @@ -package com.baeldung.debugging.consumer; - -import java.util.Collections; - -import org.springframework.boot.SpringApplication; -import org.springframework.boot.autoconfigure.SpringBootApplication; -import org.springframework.boot.autoconfigure.mongo.MongoReactiveAutoConfiguration; -import org.springframework.context.annotation.Bean; -import org.springframework.scheduling.annotation.EnableScheduling; -import org.springframework.security.config.web.server.ServerHttpSecurity; -import org.springframework.security.web.server.SecurityWebFilterChain; - -import reactor.core.publisher.Hooks; - -@SpringBootApplication(exclude = MongoReactiveAutoConfiguration.class) -@EnableScheduling -public class ConsumerDebuggingApplication { - - public static void main(String[] args) { - Hooks.onOperatorDebug(); - SpringApplication app = new SpringApplication(ConsumerDebuggingApplication.class); - app.setDefaultProperties(Collections.singletonMap("server.port", "8082")); - app.run(args); - } - - @Bean - public SecurityWebFilterChain debuggingConsumerSpringSecurityFilterChain(ServerHttpSecurity http) { - http.authorizeExchange() - .anyExchange() - .permitAll(); - http.csrf().disable(); - return http.build(); - } -} diff --git a/spring-5-reactive-2/src/main/java/com/baeldung/debugging/consumer/chronjobs/ChronJobs.java b/spring-5-reactive-2/src/main/java/com/baeldung/debugging/consumer/chronjobs/ChronJobs.java deleted file mode 100644 index bf96ab56d6..0000000000 --- a/spring-5-reactive-2/src/main/java/com/baeldung/debugging/consumer/chronjobs/ChronJobs.java +++ /dev/null @@ -1,154 +0,0 @@ -package com.baeldung.debugging.consumer.chronjobs; - -import java.time.Duration; -import java.util.concurrent.ThreadLocalRandom; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.http.MediaType; -import org.springframework.scheduling.annotation.Scheduled; -import org.springframework.stereotype.Component; -import org.springframework.web.reactive.function.client.WebClient; - -import com.baeldung.debugging.consumer.model.Foo; -import com.baeldung.debugging.consumer.model.FooDto; -import com.baeldung.debugging.consumer.service.FooService; - -import reactor.core.publisher.Flux; - -@Component -public class ChronJobs { - - private static Logger logger = LoggerFactory.getLogger(ChronJobs.class); - private WebClient client = WebClient.create("http://localhost:8081"); - - @Autowired - private FooService service; - - @Scheduled(fixedRate = 10000) - public void consumeInfiniteFlux() { - Flux fluxFoo = client.get() - .uri("/functional-reactive/periodic-foo") - .accept(MediaType.TEXT_EVENT_STREAM) - .retrieve() - .bodyToFlux(FooDto.class) - .delayElements(Duration.ofMillis(100)) - .map(dto -> { - logger.debug("process 1 with dto id {} name{}", dto.getId(), dto.getName()); - return new Foo(dto); - }); - Integer random = ThreadLocalRandom.current() - .nextInt(0, 3); - switch (random) { - case 0: - logger.info("process 1 with approach 1"); - service.processFoo(fluxFoo); - break; - case 1: - logger.info("process 1 with approach 1 EH"); - service.processUsingApproachOneWithErrorHandling(fluxFoo); - break; - default: - logger.info("process 1 with approach 2"); - service.processFooInAnotherScenario(fluxFoo); - break; - - } - } - - @Scheduled(fixedRate = 20000) - public void consumeFiniteFlux2() { - Flux fluxFoo = client.get() - .uri("/functional-reactive/periodic-foo-2") - .accept(MediaType.TEXT_EVENT_STREAM) - .retrieve() - .bodyToFlux(FooDto.class) - .delayElements(Duration.ofMillis(100)) - .map(dto -> { - logger.debug("process 2 with dto id {} name{}", dto.getId(), dto.getName()); - return new Foo(dto); - }); - Integer random = ThreadLocalRandom.current() - .nextInt(0, 3); - switch (random) { - case 0: - logger.info("process 2 with approach 1"); - service.processFoo(fluxFoo); - break; - case 1: - logger.info("process 2 with approach 1 EH"); - service.processUsingApproachOneWithErrorHandling(fluxFoo); - break; - default: - logger.info("process 2 with approach 2"); - service.processFooInAnotherScenario(fluxFoo); - break; - - } - } - - @Scheduled(fixedRate = 20000) - public void consumeFiniteFlux3() { - Flux fluxFoo = client.get() - .uri("/functional-reactive/periodic-foo-2") - .accept(MediaType.TEXT_EVENT_STREAM) - .retrieve() - .bodyToFlux(FooDto.class) - .delayElements(Duration.ofMillis(100)) - .map(dto -> { - logger.debug("process 3 with dto id {} name{}", dto.getId(), dto.getName()); - return new Foo(dto); - }); - logger.info("process 3 with approach 3"); - service.processUsingApproachThree(fluxFoo); - } - - @Scheduled(fixedRate = 20000) - public void consumeFiniteFluxWithCheckpoint4() { - Flux fluxFoo = client.get() - .uri("/functional-reactive/periodic-foo-2") - .accept(MediaType.TEXT_EVENT_STREAM) - .retrieve() - .bodyToFlux(FooDto.class) - .delayElements(Duration.ofMillis(100)) - .map(dto -> { - logger.debug("process 4 with dto id {} name{}", dto.getId(), dto.getName()); - return new Foo(dto); - }); - logger.info("process 4 with approach 4"); - service.processUsingApproachFourWithCheckpoint(fluxFoo); - } - - @Scheduled(fixedRate = 20000) - public void consumeFiniteFluxWitParallelScheduler() { - Flux fluxFoo = client.get() - .uri("/functional-reactive/periodic-foo-2") - .accept(MediaType.TEXT_EVENT_STREAM) - .retrieve() - .bodyToFlux(FooDto.class) - .delayElements(Duration.ofMillis(100)) - .map(dto -> { - logger.debug("process 5-parallel with dto id {} name{}", dto.getId(), dto.getName()); - return new Foo(dto); - }); - logger.info("process 5-parallel with approach 5-parallel"); - service.processUsingApproachFivePublishingToDifferentParallelThreads(fluxFoo); - } - - @Scheduled(fixedRate = 20000) - public void consumeFiniteFluxWithSingleSchedulers() { - Flux fluxFoo = client.get() - .uri("/functional-reactive/periodic-foo-2") - .accept(MediaType.TEXT_EVENT_STREAM) - .retrieve() - .bodyToFlux(FooDto.class) - .delayElements(Duration.ofMillis(100)) - .map(dto -> { - logger.debug("process 5-single with dto id {} name{}", dto.getId(), dto.getName()); - return new Foo(dto); - }); - logger.info("process 5-single with approach 5-single"); - service.processUsingApproachFivePublishingToDifferentSingleThreads(fluxFoo); - } -} diff --git a/spring-5-reactive-2/src/main/java/com/baeldung/debugging/consumer/controllers/ReactiveConfigsToggleRestController.java b/spring-5-reactive-2/src/main/java/com/baeldung/debugging/consumer/controllers/ReactiveConfigsToggleRestController.java deleted file mode 100644 index 3dcdc6c7c0..0000000000 --- a/spring-5-reactive-2/src/main/java/com/baeldung/debugging/consumer/controllers/ReactiveConfigsToggleRestController.java +++ /dev/null @@ -1,23 +0,0 @@ -package com.baeldung.debugging.consumer.controllers; - -import org.springframework.web.bind.annotation.GetMapping; -import org.springframework.web.bind.annotation.RestController; - -import reactor.core.publisher.Hooks; - -@RestController -public class ReactiveConfigsToggleRestController { - - @GetMapping("/debug-hook-on") - public String setReactiveDebugOn() { - Hooks.onOperatorDebug(); - return "DEBUG HOOK ON"; - } - - @GetMapping("/debug-hook-off") - public String setReactiveDebugOff() { - Hooks.resetOnOperatorDebug(); - return "DEBUG HOOK OFF"; - } - -} diff --git a/spring-5-reactive-2/src/main/java/com/baeldung/debugging/consumer/model/Foo.java b/spring-5-reactive-2/src/main/java/com/baeldung/debugging/consumer/model/Foo.java deleted file mode 100644 index 916ca93bfc..0000000000 --- a/spring-5-reactive-2/src/main/java/com/baeldung/debugging/consumer/model/Foo.java +++ /dev/null @@ -1,27 +0,0 @@ -package com.baeldung.debugging.consumer.model; - -import java.util.concurrent.ThreadLocalRandom; - -import lombok.AllArgsConstructor; -import lombok.Getter; -import lombok.NoArgsConstructor; -import lombok.Setter; - -@Getter -@Setter -@NoArgsConstructor -@AllArgsConstructor -public class Foo { - - private Integer id; - private String formattedName; - private Integer quantity; - - public Foo(FooDto dto) { - this.id = (ThreadLocalRandom.current() - .nextInt(0, 100) == 0) ? null : dto.getId(); - this.formattedName = dto.getName(); - this.quantity = ThreadLocalRandom.current() - .nextInt(0, 10); - } -} diff --git a/spring-5-reactive-2/src/main/java/com/baeldung/debugging/consumer/model/FooDto.java b/spring-5-reactive-2/src/main/java/com/baeldung/debugging/consumer/model/FooDto.java deleted file mode 100644 index 33f19c4e60..0000000000 --- a/spring-5-reactive-2/src/main/java/com/baeldung/debugging/consumer/model/FooDto.java +++ /dev/null @@ -1,16 +0,0 @@ -package com.baeldung.debugging.consumer.model; - -import lombok.AllArgsConstructor; -import lombok.Getter; -import lombok.NoArgsConstructor; -import lombok.Setter; - -@Getter -@Setter -@NoArgsConstructor -@AllArgsConstructor -public class FooDto { - - private Integer id; - private String name; -} diff --git a/spring-5-reactive-2/src/main/java/com/baeldung/debugging/consumer/service/FooNameHelper.java b/spring-5-reactive-2/src/main/java/com/baeldung/debugging/consumer/service/FooNameHelper.java deleted file mode 100644 index 772b360437..0000000000 --- a/spring-5-reactive-2/src/main/java/com/baeldung/debugging/consumer/service/FooNameHelper.java +++ /dev/null @@ -1,45 +0,0 @@ -package com.baeldung.debugging.consumer.service; - -import java.util.concurrent.ThreadLocalRandom; - -import com.baeldung.debugging.consumer.model.Foo; - -import reactor.core.publisher.Flux; - -public class FooNameHelper { - - public static Flux concatAndSubstringFooName(Flux flux) { - flux = concatFooName(flux); - flux = substringFooName(flux); - return flux; - } - - public static Flux concatFooName(Flux flux) { - flux = flux.map(foo -> { - String processedName = null; - Integer random = ThreadLocalRandom.current() - .nextInt(0, 80); - processedName = (random != 0) ? foo.getFormattedName() : foo.getFormattedName() + "-bael"; - foo.setFormattedName(processedName); - return foo; - }); - return flux; - } - - public static Flux substringFooName(Flux flux) { - return flux.map(foo -> { - String processedName; - Integer random = ThreadLocalRandom.current() - .nextInt(0, 100); - - processedName = (random == 0) ? foo.getFormattedName() - .substring(10, 15) - : foo.getFormattedName() - .substring(0, 5); - - foo.setFormattedName(processedName); - return foo; - }); - } - -} diff --git a/spring-5-reactive-2/src/main/java/com/baeldung/debugging/consumer/service/FooQuantityHelper.java b/spring-5-reactive-2/src/main/java/com/baeldung/debugging/consumer/service/FooQuantityHelper.java deleted file mode 100644 index 615239313d..0000000000 --- a/spring-5-reactive-2/src/main/java/com/baeldung/debugging/consumer/service/FooQuantityHelper.java +++ /dev/null @@ -1,31 +0,0 @@ -package com.baeldung.debugging.consumer.service; - -import java.util.concurrent.ThreadLocalRandom; - -import com.baeldung.debugging.consumer.model.Foo; - -import reactor.core.publisher.Flux; - -public class FooQuantityHelper { - - public static Flux processFooReducingQuantity(Flux flux) { - flux = flux.map(foo -> { - Integer result; - Integer random = ThreadLocalRandom.current() - .nextInt(0, 90); - result = (random == 0) ? result = 0 : foo.getQuantity() + 2; - foo.setQuantity(result); - return foo; - }); - return divideFooQuantity(flux); - } - - public static Flux divideFooQuantity(Flux flux) { - return flux.map(foo -> { - Integer result = Math.round(5 / foo.getQuantity()); - foo.setQuantity(result); - return foo; - }); - } - -} diff --git a/spring-5-reactive-2/src/main/java/com/baeldung/debugging/consumer/service/FooReporter.java b/spring-5-reactive-2/src/main/java/com/baeldung/debugging/consumer/service/FooReporter.java deleted file mode 100644 index f53cd238e0..0000000000 --- a/spring-5-reactive-2/src/main/java/com/baeldung/debugging/consumer/service/FooReporter.java +++ /dev/null @@ -1,26 +0,0 @@ -package com.baeldung.debugging.consumer.service; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.baeldung.debugging.consumer.model.Foo; - -import reactor.core.publisher.Flux; - -public class FooReporter { - - private static Logger logger = LoggerFactory.getLogger(FooReporter.class); - - public static Flux reportResult(Flux input, String approach) { - return input.map(foo -> { - if (foo.getId() == null) - throw new IllegalArgumentException("Null id is not valid!"); - logger.info("Reporting for approach {}: Foo with id '{}' name '{}' and quantity '{}'", approach, foo.getId(), foo.getFormattedName(), foo.getQuantity()); - return foo; - }); - } - - public static Flux reportResult(Flux input) { - return reportResult(input, "default"); - } -} diff --git a/spring-5-reactive-2/src/main/java/com/baeldung/debugging/consumer/service/FooService.java b/spring-5-reactive-2/src/main/java/com/baeldung/debugging/consumer/service/FooService.java deleted file mode 100644 index 438f6d473c..0000000000 --- a/spring-5-reactive-2/src/main/java/com/baeldung/debugging/consumer/service/FooService.java +++ /dev/null @@ -1,120 +0,0 @@ -package com.baeldung.debugging.consumer.service; - -import static com.baeldung.debugging.consumer.service.FooNameHelper.concatAndSubstringFooName; -import static com.baeldung.debugging.consumer.service.FooNameHelper.substringFooName; -import static com.baeldung.debugging.consumer.service.FooQuantityHelper.divideFooQuantity; -import static com.baeldung.debugging.consumer.service.FooQuantityHelper.processFooReducingQuantity; -import static com.baeldung.debugging.consumer.service.FooReporter.reportResult; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.stereotype.Component; - -import com.baeldung.debugging.consumer.model.Foo; - -import reactor.core.publisher.Flux; -import reactor.core.scheduler.Schedulers; - -@Component -public class FooService { - - private static Logger logger = LoggerFactory.getLogger(FooService.class); - - public void processFoo(Flux flux) { - flux = FooNameHelper.concatFooName(flux); - flux = FooNameHelper.substringFooName(flux); - flux = flux.log(); - flux = FooReporter.reportResult(flux); - flux = flux.doOnError(error -> { - logger.error("The following error happened on processFoo method!", error); - }); - flux.subscribe(); - } - - public void processFooInAnotherScenario(Flux flux) { - flux = FooNameHelper.substringFooName(flux); - flux = FooQuantityHelper.divideFooQuantity(flux); - flux.subscribe(); - } - - public void processUsingApproachOneWithErrorHandling(Flux flux) { - logger.info("starting approach one w error handling!"); - flux = concatAndSubstringFooName(flux); - flux = concatAndSubstringFooName(flux); - flux = substringFooName(flux); - flux = processFooReducingQuantity(flux); - flux = processFooReducingQuantity(flux); - flux = processFooReducingQuantity(flux); - flux = reportResult(flux, "ONE w/ EH"); - flux = flux.doOnError(error -> { - logger.error("Approach 1 with Error Handling failed!", error); - }); - flux.subscribe(); - } - - public void processUsingApproachThree(Flux flux) { - logger.info("starting approach three!"); - flux = concatAndSubstringFooName(flux); - flux = reportResult(flux, "THREE"); - flux = flux.doOnError(error -> { - logger.error("Approach 3 failed!", error); - }); - flux.subscribe(); - } - - public void processUsingApproachFourWithCheckpoint(Flux flux) { - logger.info("starting approach four!"); - flux = concatAndSubstringFooName(flux); - flux = flux.checkpoint("CHECKPOINT 1"); - flux = concatAndSubstringFooName(flux); - flux = divideFooQuantity(flux); - flux = flux.checkpoint("CHECKPOINT 2", true); - flux = reportResult(flux, "FOUR"); - flux = concatAndSubstringFooName(flux).doOnError(error -> { - logger.error("Approach 4 failed!", error); - }); - flux.subscribe(); - } - - public void processUsingApproachFourWithInitialCheckpoint(Flux flux) { - logger.info("starting approach four!"); - flux = concatAndSubstringFooName(flux); - flux = flux.checkpoint("CHECKPOINT 1", true); - flux = concatAndSubstringFooName(flux); - flux = divideFooQuantity(flux); - flux = reportResult(flux, "FOUR"); - flux = flux.doOnError(error -> { - logger.error("Approach 4-2 failed!", error); - }); - flux.subscribe(); - } - - public void processUsingApproachFivePublishingToDifferentParallelThreads(Flux flux) { - logger.info("starting approach five-parallel!"); - flux = concatAndSubstringFooName(flux).publishOn(Schedulers.newParallel("five-parallel-foo")) - .log(); - flux = concatAndSubstringFooName(flux); - flux = divideFooQuantity(flux); - flux = reportResult(flux, "FIVE-PARALLEL").publishOn(Schedulers.newSingle("five-parallel-bar")); - flux = concatAndSubstringFooName(flux).doOnError(error -> { - logger.error("Approach 5-parallel failed!", error); - }); - flux.subscribeOn(Schedulers.newParallel("five-parallel-starter")) - .subscribe(); - } - - public void processUsingApproachFivePublishingToDifferentSingleThreads(Flux flux) { - logger.info("starting approach five-single!"); - flux = flux.log() - .subscribeOn(Schedulers.newSingle("five-single-starter")); - flux = concatAndSubstringFooName(flux).publishOn(Schedulers.newSingle("five-single-foo")); - flux = concatAndSubstringFooName(flux); - flux = divideFooQuantity(flux); - flux = reportResult(flux, "FIVE-SINGLE").publishOn(Schedulers.newSingle("five-single-bar")); - flux = concatAndSubstringFooName(flux).doOnError(error -> { - logger.error("Approach 5-single failed!", error); - }); - flux.subscribe(); - } - -} diff --git a/spring-5-reactive-2/src/main/java/com/baeldung/debugging/server/ServerDebuggingApplication.java b/spring-5-reactive-2/src/main/java/com/baeldung/debugging/server/ServerDebuggingApplication.java deleted file mode 100644 index 4fdc1dd137..0000000000 --- a/spring-5-reactive-2/src/main/java/com/baeldung/debugging/server/ServerDebuggingApplication.java +++ /dev/null @@ -1,29 +0,0 @@ -package com.baeldung.debugging.server; - -import java.util.Collections; - -import org.springframework.boot.SpringApplication; -import org.springframework.boot.autoconfigure.SpringBootApplication; -import org.springframework.context.annotation.Bean; -import org.springframework.security.config.web.server.ServerHttpSecurity; -import org.springframework.security.web.server.SecurityWebFilterChain; -import org.springframework.web.reactive.config.EnableWebFlux; - -@EnableWebFlux -@SpringBootApplication -public class ServerDebuggingApplication { - - public static void main(String[] args) { - SpringApplication app = new SpringApplication(ServerDebuggingApplication.class); - app.setDefaultProperties(Collections.singletonMap("server.port", "8081")); - app.run(args); - } - - @Bean - public SecurityWebFilterChain debuggingServerSpringSecurityFilterChain(ServerHttpSecurity http) { - http.authorizeExchange() - .anyExchange() - .permitAll(); - return http.build(); - } -} diff --git a/spring-5-reactive-2/src/main/java/com/baeldung/debugging/server/handlers/ServerHandler.java b/spring-5-reactive-2/src/main/java/com/baeldung/debugging/server/handlers/ServerHandler.java deleted file mode 100644 index 759cd9b01d..0000000000 --- a/spring-5-reactive-2/src/main/java/com/baeldung/debugging/server/handlers/ServerHandler.java +++ /dev/null @@ -1,47 +0,0 @@ -package com.baeldung.debugging.server.handlers; - -import java.time.Duration; -import java.util.concurrent.ThreadLocalRandom; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.http.MediaType; -import org.springframework.stereotype.Component; -import org.springframework.web.reactive.function.server.ServerRequest; -import org.springframework.web.reactive.function.server.ServerResponse; - -import com.baeldung.debugging.server.model.Foo; - -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; - -@Component -public class ServerHandler { - - private static Logger logger = LoggerFactory.getLogger(ServerHandler.class); - - public Mono useHandler(final ServerRequest request) { - // there are chances that something goes wrong here... - return ServerResponse.ok() - .contentType(MediaType.TEXT_EVENT_STREAM) - .body(Flux.interval(Duration.ofSeconds(1)) - .map(sequence -> { - logger.info("retrieving Foo. Sequence: {}", sequence); - if (ThreadLocalRandom.current() - .nextInt(0, 50) == 1) { - throw new RuntimeException("There was an error retrieving the Foo!"); - } - return new Foo(sequence, "name" + sequence); - - }), Foo.class); - } - - public Mono useHandlerFinite(final ServerRequest request) { - return ServerResponse.ok() - .contentType(MediaType.TEXT_EVENT_STREAM) - .body(Flux.range(0, 50) - .map(sequence -> { - return new Foo(new Long(sequence), "theFooNameNumber" + sequence); - }), Foo.class); - } -} diff --git a/spring-5-reactive-2/src/main/java/com/baeldung/debugging/server/model/Foo.java b/spring-5-reactive-2/src/main/java/com/baeldung/debugging/server/model/Foo.java deleted file mode 100644 index 2d9491f3dd..0000000000 --- a/spring-5-reactive-2/src/main/java/com/baeldung/debugging/server/model/Foo.java +++ /dev/null @@ -1,13 +0,0 @@ -package com.baeldung.debugging.server.model; - -import lombok.AllArgsConstructor; -import lombok.Data; - -@Data -@AllArgsConstructor -public class Foo { - - private Long id; - private String name; - -} diff --git a/spring-5-reactive-2/src/main/java/com/baeldung/debugging/server/routers/ServerRouter.java b/spring-5-reactive-2/src/main/java/com/baeldung/debugging/server/routers/ServerRouter.java deleted file mode 100644 index 6378b2213d..0000000000 --- a/spring-5-reactive-2/src/main/java/com/baeldung/debugging/server/routers/ServerRouter.java +++ /dev/null @@ -1,22 +0,0 @@ -package com.baeldung.debugging.server.routers; - -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.web.reactive.function.server.RequestPredicates; -import org.springframework.web.reactive.function.server.RouterFunction; -import org.springframework.web.reactive.function.server.RouterFunctions; -import org.springframework.web.reactive.function.server.ServerResponse; - -import com.baeldung.debugging.server.handlers.ServerHandler; - -@Configuration -public class ServerRouter { - - @Bean - public RouterFunction responseRoute(@Autowired ServerHandler handler) { - return RouterFunctions.route(RequestPredicates.GET("/functional-reactive/periodic-foo"), handler::useHandler) - .andRoute(RequestPredicates.GET("/functional-reactive/periodic-foo-2"), handler::useHandlerFinite); - } - -} diff --git a/spring-5-reactive-2/src/main/java/com/baeldung/webclient/Tweet.java b/spring-5-reactive-2/src/main/java/com/baeldung/webclient/Tweet.java deleted file mode 100644 index 8d294955f3..0000000000 --- a/spring-5-reactive-2/src/main/java/com/baeldung/webclient/Tweet.java +++ /dev/null @@ -1,13 +0,0 @@ -package com.baeldung.webclient; - -import lombok.AllArgsConstructor; -import lombok.Data; -import lombok.NoArgsConstructor; - -@Data -@NoArgsConstructor -@AllArgsConstructor -public class Tweet { - private String text; - private String username; -} diff --git a/spring-5-reactive-2/src/main/java/com/baeldung/webclient/TweetsSlowServiceController.java b/spring-5-reactive-2/src/main/java/com/baeldung/webclient/TweetsSlowServiceController.java deleted file mode 100644 index fecaca25ef..0000000000 --- a/spring-5-reactive-2/src/main/java/com/baeldung/webclient/TweetsSlowServiceController.java +++ /dev/null @@ -1,20 +0,0 @@ -package com.baeldung.webclient; - -import org.springframework.web.bind.annotation.GetMapping; -import org.springframework.web.bind.annotation.RestController; - -import java.util.Arrays; -import java.util.List; - -@RestController -public class TweetsSlowServiceController { - - @GetMapping("/slow-service-tweets") - private List getAllTweets() throws Exception { - Thread.sleep(2000L); // delay - return Arrays.asList( - new Tweet("RestTemplate rules", "@user1"), - new Tweet("WebClient is better", "@user2"), - new Tweet("OK, both are useful", "@user1")); - } -} diff --git a/spring-5-reactive-2/src/main/java/com/baeldung/webclient/WebClientApplication.java b/spring-5-reactive-2/src/main/java/com/baeldung/webclient/WebClientApplication.java deleted file mode 100644 index 3c53a2c1d3..0000000000 --- a/spring-5-reactive-2/src/main/java/com/baeldung/webclient/WebClientApplication.java +++ /dev/null @@ -1,23 +0,0 @@ -package com.baeldung.webclient; - -import org.springframework.boot.SpringApplication; -import org.springframework.boot.autoconfigure.SpringBootApplication; -import org.springframework.context.annotation.Bean; -import org.springframework.security.config.web.server.ServerHttpSecurity; -import org.springframework.security.web.server.SecurityWebFilterChain; - -@SpringBootApplication -public class WebClientApplication { - public static void main(String[] args) { - SpringApplication.run(WebClientApplication.class, args); - } - - @Bean - public SecurityWebFilterChain functionalValidationsSpringSecurityFilterChain(ServerHttpSecurity http) { - http.authorizeExchange() - .anyExchange() - .permitAll(); - http.csrf().disable(); - return http.build(); - } -} diff --git a/spring-5-reactive-2/src/main/java/com/baeldung/webclient/WebController.java b/spring-5-reactive-2/src/main/java/com/baeldung/webclient/WebController.java deleted file mode 100644 index 73f5724819..0000000000 --- a/spring-5-reactive-2/src/main/java/com/baeldung/webclient/WebController.java +++ /dev/null @@ -1,60 +0,0 @@ -package com.baeldung.webclient; - -import lombok.Setter; -import lombok.extern.slf4j.Slf4j; -import org.springframework.core.ParameterizedTypeReference; -import org.springframework.http.HttpMethod; -import org.springframework.http.MediaType; -import org.springframework.http.ResponseEntity; -import org.springframework.web.bind.annotation.GetMapping; -import org.springframework.web.bind.annotation.RestController; -import org.springframework.web.client.RestTemplate; -import org.springframework.web.reactive.function.client.WebClient; -import reactor.core.publisher.Flux; - -import java.util.List; - -@Slf4j -@RestController -public class WebController { - - private static final int DEFAULT_PORT = 8080; - - @Setter - private int serverPort = DEFAULT_PORT; - - @GetMapping("/tweets-blocking") - public List getTweetsBlocking() { - log.info("Starting BLOCKING Controller!"); - final String uri = getSlowServiceUri(); - - RestTemplate restTemplate = new RestTemplate(); - ResponseEntity> response = restTemplate.exchange( - uri, HttpMethod.GET, null, - new ParameterizedTypeReference>(){}); - - List result = response.getBody(); - result.forEach(tweet -> log.info(tweet.toString())); - log.info("Exiting BLOCKING Controller!"); - return result; - } - - @GetMapping(value = "/tweets-non-blocking", produces = MediaType.TEXT_EVENT_STREAM_VALUE) - public Flux getTweetsNonBlocking() { - log.info("Starting NON-BLOCKING Controller!"); - Flux tweetFlux = WebClient.create() - .get() - .uri(getSlowServiceUri()) - .retrieve() - .bodyToFlux(Tweet.class); - - tweetFlux.subscribe(tweet -> log.info(tweet.toString())); - log.info("Exiting NON-BLOCKING Controller!"); - return tweetFlux; - } - - private String getSlowServiceUri() { - return "http://localhost:" + serverPort + "/slow-service-tweets"; - } - -} diff --git a/spring-5-reactive-2/src/test/java/com/baeldung/debugging/consumer/ConsumerFooServiceIntegrationTest.java b/spring-5-reactive-2/src/test/java/com/baeldung/debugging/consumer/ConsumerFooServiceIntegrationTest.java deleted file mode 100644 index 9d04541f8d..0000000000 --- a/spring-5-reactive-2/src/test/java/com/baeldung/debugging/consumer/ConsumerFooServiceIntegrationTest.java +++ /dev/null @@ -1,64 +0,0 @@ -package com.baeldung.debugging.consumer; - -import static org.assertj.core.api.Assertions.assertThat; - -import java.util.Arrays; -import java.util.Collection; -import java.util.Optional; -import java.util.stream.Collectors; -import java.util.stream.Stream; - -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; - -import com.baeldung.debugging.consumer.model.Foo; -import com.baeldung.debugging.consumer.service.FooService; -import com.baeldung.debugging.consumer.utils.ListAppender; - -import ch.qos.logback.classic.spi.ILoggingEvent; -import ch.qos.logback.classic.spi.IThrowableProxy; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Hooks; - -public class ConsumerFooServiceIntegrationTest { - - FooService service = new FooService(); - - @BeforeEach - public void clearLogList() { - Hooks.onOperatorDebug(); - ListAppender.clearEventList(); - } - - @Test - public void givenFooWithNullId_whenProcessFoo_thenLogsWithDebugTrace() { - Foo one = new Foo(1, "nameverylong", 8); - Foo two = new Foo(null, "nameverylong", 4); - Flux flux = Flux.just(one, two); - - service.processFoo(flux); - - Collection allLoggedEntries = ListAppender.getEvents() - .stream() - .map(ILoggingEvent::getFormattedMessage) - .collect(Collectors.toList()); - - Collection allSuppressedEntries = ListAppender.getEvents() - .stream() - .map(ILoggingEvent::getThrowableProxy) - .flatMap(t -> { - return Optional.ofNullable(t) - .map(IThrowableProxy::getSuppressed) - .map(Arrays::stream) - .orElse(Stream.empty()); - }) - .map(IThrowableProxy::getClassName) - .collect(Collectors.toList()); - assertThat(allLoggedEntries).anyMatch(entry -> entry.contains("The following error happened on processFoo method!")) - .anyMatch(entry -> entry.contains("| onSubscribe")) - .anyMatch(entry -> entry.contains("| cancel()")); - - assertThat(allSuppressedEntries) - .anyMatch(entry -> entry.contains("reactor.core.publisher.FluxOnAssembly$OnAssemblyException")); - } -} diff --git a/spring-5-reactive-2/src/test/java/com/baeldung/debugging/consumer/ConsumerFooServiceLiveTest.java b/spring-5-reactive-2/src/test/java/com/baeldung/debugging/consumer/ConsumerFooServiceLiveTest.java deleted file mode 100644 index e61ea9e155..0000000000 --- a/spring-5-reactive-2/src/test/java/com/baeldung/debugging/consumer/ConsumerFooServiceLiveTest.java +++ /dev/null @@ -1,54 +0,0 @@ -package com.baeldung.debugging.consumer; - -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.Test; -import org.springframework.test.web.reactive.server.WebTestClient; -import org.springframework.test.web.reactive.server.WebTestClient.ResponseSpec; - -import com.baeldung.debugging.consumer.service.FooService; - -/** - * In order to run this live test, start the following classes: - * - com.baeldung.debugging.server.ServerDebuggingApplication - * - com.baeldung.debugging.consumer.ConsumerDebuggingApplication - */ -public class ConsumerFooServiceLiveTest { - - FooService service = new FooService(); - - private static final String BASE_URL = "http://localhost:8082"; - private static final String DEBUG_HOOK_ON = BASE_URL + "/debug-hook-on"; - private static final String DEBUG_HOOK_OFF = BASE_URL + "/debug-hook-off"; - - private static WebTestClient client; - - @BeforeAll - public static void setup() { - client = WebTestClient.bindToServer() - .baseUrl(BASE_URL) - .build(); - } - - @Test - public void whenRequestingDebugHookOn_thenObtainExpectedMessage() { - ResponseSpec response = client.get() - .uri(DEBUG_HOOK_ON) - .exchange(); - response.expectStatus() - .isOk() - .expectBody(String.class) - .isEqualTo("DEBUG HOOK ON"); - } - - @Test - public void whenRequestingDebugHookOff_thenObtainExpectedMessage() { - ResponseSpec response = client.get() - .uri(DEBUG_HOOK_OFF) - .exchange(); - response.expectStatus() - .isOk() - .expectBody(String.class) - .isEqualTo("DEBUG HOOK OFF"); - } - -} diff --git a/spring-5-reactive-2/src/test/java/com/baeldung/debugging/consumer/utils/ListAppender.java b/spring-5-reactive-2/src/test/java/com/baeldung/debugging/consumer/utils/ListAppender.java deleted file mode 100644 index c8c1c110bb..0000000000 --- a/spring-5-reactive-2/src/test/java/com/baeldung/debugging/consumer/utils/ListAppender.java +++ /dev/null @@ -1,25 +0,0 @@ -package com.baeldung.debugging.consumer.utils; - -import java.util.ArrayList; -import java.util.List; - -import ch.qos.logback.classic.spi.ILoggingEvent; -import ch.qos.logback.core.AppenderBase; - -public class ListAppender extends AppenderBase { - - static private List events = new ArrayList<>(); - - @Override - protected void append(ILoggingEvent eventObject) { - events.add(eventObject); - } - - public static List getEvents() { - return events; - } - - public static void clearEventList() { - events.clear(); - } -} diff --git a/spring-5-reactive-2/src/test/java/com/baeldung/webclient/WebControllerIntegrationTest.java b/spring-5-reactive-2/src/test/java/com/baeldung/webclient/WebControllerIntegrationTest.java deleted file mode 100644 index 09c3a5fb84..0000000000 --- a/spring-5-reactive-2/src/test/java/com/baeldung/webclient/WebControllerIntegrationTest.java +++ /dev/null @@ -1,51 +0,0 @@ -package com.baeldung.webclient; - -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.test.context.SpringBootTest; -import org.springframework.boot.web.server.LocalServerPort; -import org.springframework.test.context.junit4.SpringRunner; -import org.springframework.test.web.reactive.server.WebTestClient; - -@RunWith(SpringRunner.class) -@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, classes = WebClientApplication.class) -public class WebControllerIntegrationTest { - - @LocalServerPort - int randomServerPort; - - @Autowired - private WebTestClient testClient; - - @Autowired - private WebController webController; - - @Before - public void setup() { - webController.setServerPort(randomServerPort); - } - - @Test - public void whenEndpointWithBlockingClientIsCalled_thenThreeTweetsAreReceived() { - testClient.get() - .uri("/tweets-blocking") - .exchange() - .expectStatus() - .isOk() - .expectBodyList(Tweet.class) - .hasSize(3); - } - - @Test - public void whenEndpointWithNonBlockingClientIsCalled_thenThreeTweetsAreReceived() { - testClient.get() - .uri("/tweets-non-blocking") - .exchange() - .expectStatus() - .isOk() - .expectBodyList(Tweet.class) - .hasSize(3); - } -} \ No newline at end of file diff --git a/spring-5-reactive-security/README.md b/spring-5-reactive-security/README.md index 915f74cd78..37f999648c 100644 --- a/spring-5-reactive-security/README.md +++ b/spring-5-reactive-security/README.md @@ -8,8 +8,5 @@ The "REST With Spring" Classes: http://bit.ly/restwithspring ### Relevant Articles - [Spring Boot Actuator](https://www.baeldung.com/spring-boot-actuators) -- [Spring Security 5 for Reactive Applications](https://www.baeldung.com/spring-security-5-reactive) -- [Guide to Spring 5 WebFlux](https://www.baeldung.com/spring-webflux) -- [Introduction to the Functional Web Framework in Spring 5](https://www.baeldung.com/spring-5-functional-web) - [Guide to the AuthenticationManagerResolver in Spring Security](https://www.baeldung.com/spring-security-authenticationmanagerresolver) - [Spring Webflux and CORS](https://www.baeldung.com/spring-webflux-cors) diff --git a/spring-5-reactive-security/pom.xml b/spring-5-reactive-security/pom.xml index 140c6b8296..7b697c7b00 100644 --- a/spring-5-reactive-security/pom.xml +++ b/spring-5-reactive-security/pom.xml @@ -110,7 +110,7 @@ org.springframework.boot spring-boot-maven-plugin - com.baeldung.webflux.EmployeeSpringApplication + com.baeldung.reactive.actuator.Spring5ReactiveApplication JAR diff --git a/spring-5-reactive-security/src/main/java/com/baeldung/reactive/functional/EmployeeFunctionalConfig.java b/spring-5-reactive-security/src/main/java/com/baeldung/reactive/functional/EmployeeFunctionalConfig.java deleted file mode 100644 index 76b697c1aa..0000000000 --- a/spring-5-reactive-security/src/main/java/com/baeldung/reactive/functional/EmployeeFunctionalConfig.java +++ /dev/null @@ -1,75 +0,0 @@ -package com.baeldung.reactive.functional; - -import static org.springframework.web.reactive.function.BodyExtractors.toMono; -import static org.springframework.web.reactive.function.server.RequestPredicates.GET; -import static org.springframework.web.reactive.function.server.RequestPredicates.POST; -import static org.springframework.web.reactive.function.server.RouterFunctions.route; -import static org.springframework.web.reactive.function.server.ServerResponse.ok; - -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.security.config.web.server.ServerHttpSecurity; -import org.springframework.security.web.server.SecurityWebFilterChain; -import org.springframework.web.reactive.function.server.RouterFunction; -import org.springframework.web.reactive.function.server.ServerResponse; - -import com.baeldung.webflux.Employee; -import com.baeldung.webflux.EmployeeRepository; - -@Configuration -public class EmployeeFunctionalConfig { - - @Bean - EmployeeRepository employeeRepository() { - return new EmployeeRepository(); - } - - @Bean - RouterFunction getAllEmployeesRoute() { - return route(GET("/employees"), - req -> ok().body( - employeeRepository().findAllEmployees(), Employee.class)); - } - - @Bean - RouterFunction getEmployeeByIdRoute() { - return route(GET("/employees/{id}"), - req -> ok().body( - employeeRepository().findEmployeeById(req.pathVariable("id")), Employee.class)); - } - - @Bean - RouterFunction updateEmployeeRoute() { - return route(POST("/employees/update"), - req -> req.body(toMono(Employee.class)) - .doOnNext(employeeRepository()::updateEmployee) - .then(ok().build())); - } - - @Bean - RouterFunction composedRoutes() { - return - route(GET("/employees"), - req -> ok().body( - employeeRepository().findAllEmployees(), Employee.class)) - - .and(route(GET("/employees/{id}"), - req -> ok().body( - employeeRepository().findEmployeeById(req.pathVariable("id")), Employee.class))) - - .and(route(POST("/employees/update"), - req -> req.body(toMono(Employee.class)) - .doOnNext(employeeRepository()::updateEmployee) - .then(ok().build()))); - } - - @Bean - public SecurityWebFilterChain springSecurityFilterChain(ServerHttpSecurity http) { - http.csrf() - .disable() - .authorizeExchange() - .anyExchange() - .permitAll(); - return http.build(); - } -} diff --git a/spring-5-reactive-security/src/main/java/com/baeldung/reactive/functional/EmployeeSpringFunctionalApplication.java b/spring-5-reactive-security/src/main/java/com/baeldung/reactive/functional/EmployeeSpringFunctionalApplication.java deleted file mode 100644 index 1f2bd871fc..0000000000 --- a/spring-5-reactive-security/src/main/java/com/baeldung/reactive/functional/EmployeeSpringFunctionalApplication.java +++ /dev/null @@ -1,13 +0,0 @@ -package com.baeldung.reactive.functional; - -import org.springframework.boot.SpringApplication; -import org.springframework.boot.autoconfigure.SpringBootApplication; - -@SpringBootApplication -public class EmployeeSpringFunctionalApplication { - - public static void main(String[] args) { - SpringApplication.run(EmployeeSpringFunctionalApplication.class, args); - } - -} diff --git a/spring-5-reactive-security/src/main/java/com/baeldung/reactive/security/GreetController.java b/spring-5-reactive-security/src/main/java/com/baeldung/reactive/security/GreetController.java deleted file mode 100644 index 99b79d88ea..0000000000 --- a/spring-5-reactive-security/src/main/java/com/baeldung/reactive/security/GreetController.java +++ /dev/null @@ -1,37 +0,0 @@ -package com.baeldung.reactive.security; - -import org.springframework.web.bind.annotation.GetMapping; -import org.springframework.web.bind.annotation.RestController; -import reactor.core.publisher.Mono; - -import java.security.Principal; - -@RestController -public class GreetController { - - private GreetService greetService; - - public GreetController(GreetService greetService) { - this.greetService = greetService; - } - - @GetMapping("/") - public Mono greet(Mono principal) { - return principal - .map(Principal::getName) - .map(name -> String.format("Hello, %s", name)); - } - - @GetMapping("/admin") - public Mono greetAdmin(Mono principal) { - return principal - .map(Principal::getName) - .map(name -> String.format("Admin access: %s", name)); - } - - @GetMapping("/greetService") - public Mono greetService() { - return greetService.greet(); - } - -} diff --git a/spring-5-reactive-security/src/main/java/com/baeldung/reactive/security/GreetService.java b/spring-5-reactive-security/src/main/java/com/baeldung/reactive/security/GreetService.java deleted file mode 100644 index 93df64bced..0000000000 --- a/spring-5-reactive-security/src/main/java/com/baeldung/reactive/security/GreetService.java +++ /dev/null @@ -1,15 +0,0 @@ -package com.baeldung.reactive.security; - -import org.springframework.security.access.prepost.PreAuthorize; -import org.springframework.stereotype.Service; -import reactor.core.publisher.Mono; - -@Service -public class GreetService { - - @PreAuthorize("hasRole('ADMIN')") - public Mono greet() { - return Mono.just("Hello from service!"); - } - -} diff --git a/spring-5-reactive-security/src/main/java/com/baeldung/reactive/security/SecurityConfig.java b/spring-5-reactive-security/src/main/java/com/baeldung/reactive/security/SecurityConfig.java deleted file mode 100644 index 64e96ddae1..0000000000 --- a/spring-5-reactive-security/src/main/java/com/baeldung/reactive/security/SecurityConfig.java +++ /dev/null @@ -1,60 +0,0 @@ -package com.baeldung.reactive.security; - -import org.springframework.boot.actuate.autoconfigure.security.reactive.EndpointRequest; -import org.springframework.context.annotation.Bean; -import org.springframework.security.config.annotation.method.configuration.EnableReactiveMethodSecurity; -import org.springframework.security.config.annotation.web.reactive.EnableWebFluxSecurity; -import org.springframework.security.config.web.server.ServerHttpSecurity; -import org.springframework.security.core.userdetails.MapReactiveUserDetailsService; -import org.springframework.security.core.userdetails.User; -import org.springframework.security.core.userdetails.UserDetails; -import org.springframework.security.crypto.bcrypt.BCryptPasswordEncoder; -import org.springframework.security.crypto.password.PasswordEncoder; -import org.springframework.security.web.server.SecurityWebFilterChain; - -import com.baeldung.reactive.actuator.FeaturesEndpoint; - -@EnableWebFluxSecurity -@EnableReactiveMethodSecurity -public class SecurityConfig { - - @Bean - public SecurityWebFilterChain securitygWebFilterChain(ServerHttpSecurity http) { - return http.authorizeExchange() - .pathMatchers("/admin") - .hasAuthority("ROLE_ADMIN") - .matchers(EndpointRequest.to(FeaturesEndpoint.class)) - .permitAll() - .anyExchange() - .authenticated() - .and() - .formLogin() - .and() - .csrf() - .disable() - .build(); - } - - @Bean - public MapReactiveUserDetailsService userDetailsService() { - UserDetails user = User - .withUsername("user") - .password(passwordEncoder().encode("password")) - .roles("USER") - .build(); - - UserDetails admin = User - .withUsername("admin") - .password(passwordEncoder().encode("password")) - .roles("ADMIN") - .build(); - - return new MapReactiveUserDetailsService(user, admin); - } - - @Bean - public PasswordEncoder passwordEncoder() { - return new BCryptPasswordEncoder(); - } - -} diff --git a/spring-5-reactive-security/src/main/java/com/baeldung/reactive/security/SpringSecurity5Application.java b/spring-5-reactive-security/src/main/java/com/baeldung/reactive/security/SpringSecurity5Application.java deleted file mode 100644 index bb0f007ada..0000000000 --- a/spring-5-reactive-security/src/main/java/com/baeldung/reactive/security/SpringSecurity5Application.java +++ /dev/null @@ -1,35 +0,0 @@ -package com.baeldung.reactive.security; - -import org.springframework.context.ApplicationContext; -import org.springframework.context.annotation.AnnotationConfigApplicationContext; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.ComponentScan; -import org.springframework.http.server.reactive.HttpHandler; -import org.springframework.http.server.reactive.ReactorHttpHandlerAdapter; -import org.springframework.web.reactive.config.EnableWebFlux; -import org.springframework.web.server.adapter.WebHttpHandlerBuilder; - -import reactor.netty.DisposableServer; -import reactor.netty.http.server.HttpServer; - -@ComponentScan(basePackages = {"com.baeldung.reactive.security"}) -@EnableWebFlux -public class SpringSecurity5Application { - - public static void main(String[] args) { - try (AnnotationConfigApplicationContext context = - new AnnotationConfigApplicationContext(SpringSecurity5Application.class)) { - context.getBean(DisposableServer.class).onDispose().block(); - } - } - - @Bean - public DisposableServer disposableServer(ApplicationContext context) { - HttpHandler handler = WebHttpHandlerBuilder.applicationContext(context) - .build(); - ReactorHttpHandlerAdapter adapter = new ReactorHttpHandlerAdapter(handler); - HttpServer httpServer = HttpServer.create().host("localhost").port(8083); - return httpServer.handle(adapter).bindNow(); - } - -} diff --git a/spring-5-reactive-security/src/main/java/com/baeldung/webflux/EmployeeController.java b/spring-5-reactive-security/src/main/java/com/baeldung/webflux/EmployeeController.java deleted file mode 100644 index 34e44afc8b..0000000000 --- a/spring-5-reactive-security/src/main/java/com/baeldung/webflux/EmployeeController.java +++ /dev/null @@ -1,38 +0,0 @@ -package com.baeldung.webflux; - -import org.springframework.web.bind.annotation.GetMapping; -import org.springframework.web.bind.annotation.PathVariable; -import org.springframework.web.bind.annotation.PostMapping; -import org.springframework.web.bind.annotation.RequestBody; -import org.springframework.web.bind.annotation.RequestMapping; -import org.springframework.web.bind.annotation.RestController; - -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; - -@RestController -@RequestMapping("/employees") -public class EmployeeController { - - private EmployeeRepository employeeRepository; - - public EmployeeController(EmployeeRepository employeeRepository) { - this.employeeRepository = employeeRepository; - } - - @GetMapping("/{id}") - private Mono getEmployeeById(@PathVariable String id) { - return employeeRepository.findEmployeeById(id); - } - - @GetMapping - private Flux getAllEmployees() { - return employeeRepository.findAllEmployees(); - } - - @PostMapping("/update") - private Mono updateEmployee(@RequestBody Employee employee) { - return employeeRepository.updateEmployee(employee); - } - -} diff --git a/spring-5-reactive-security/src/main/java/com/baeldung/webflux/EmployeeRepository.java b/spring-5-reactive-security/src/main/java/com/baeldung/webflux/EmployeeRepository.java deleted file mode 100644 index d7f618f178..0000000000 --- a/spring-5-reactive-security/src/main/java/com/baeldung/webflux/EmployeeRepository.java +++ /dev/null @@ -1,64 +0,0 @@ -package com.baeldung.webflux; - -import java.util.HashMap; -import java.util.Map; - -import org.springframework.stereotype.Repository; - -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; - -@Repository -public class EmployeeRepository { - - static Map employeeData; - - static Map employeeAccessData; - - static - { - employeeData = new HashMap<>(); - employeeData.put("1",new Employee("1","Employee 1")); - employeeData.put("2",new Employee("2","Employee 2")); - employeeData.put("3",new Employee("3","Employee 3")); - employeeData.put("4",new Employee("4","Employee 4")); - employeeData.put("5",new Employee("5","Employee 5")); - employeeData.put("6",new Employee("6","Employee 6")); - employeeData.put("7",new Employee("7","Employee 7")); - employeeData.put("8",new Employee("8","Employee 8")); - employeeData.put("9",new Employee("9","Employee 9")); - employeeData.put("10",new Employee("10","Employee 10")); - - employeeAccessData=new HashMap<>(); - employeeAccessData.put("1", "Employee 1 Access Key"); - employeeAccessData.put("2", "Employee 2 Access Key"); - employeeAccessData.put("3", "Employee 3 Access Key"); - employeeAccessData.put("4", "Employee 4 Access Key"); - employeeAccessData.put("5", "Employee 5 Access Key"); - employeeAccessData.put("6", "Employee 6 Access Key"); - employeeAccessData.put("7", "Employee 7 Access Key"); - employeeAccessData.put("8", "Employee 8 Access Key"); - employeeAccessData.put("9", "Employee 9 Access Key"); - employeeAccessData.put("10", "Employee 10 Access Key"); - } - - public Mono findEmployeeById(String id) - { - return Mono.just(employeeData.get(id)); - } - - public Flux findAllEmployees() - { - return Flux.fromIterable(employeeData.values()); - } - - public Mono updateEmployee(Employee employee) - { - Employee existingEmployee=employeeData.get(employee.getId()); - if(existingEmployee!=null) - { - existingEmployee.setName(employee.getName()); - } - return Mono.just(existingEmployee); - } -} diff --git a/spring-5-reactive-security/src/main/java/com/baeldung/webflux/EmployeeSpringApplication.java b/spring-5-reactive-security/src/main/java/com/baeldung/webflux/EmployeeSpringApplication.java deleted file mode 100644 index 2652c36695..0000000000 --- a/spring-5-reactive-security/src/main/java/com/baeldung/webflux/EmployeeSpringApplication.java +++ /dev/null @@ -1,17 +0,0 @@ -package com.baeldung.webflux; - -import org.springframework.boot.SpringApplication; -import org.springframework.boot.autoconfigure.SpringBootApplication; - -@SpringBootApplication -public class EmployeeSpringApplication { - - public static void main(String[] args) { - - SpringApplication.run(EmployeeSpringApplication.class, args); - - EmployeeWebClient employeeWebClient = new EmployeeWebClient(); - employeeWebClient.consume(); - } - -} diff --git a/spring-5-reactive-security/src/main/java/com/baeldung/webflux/EmployeeWebClient.java b/spring-5-reactive-security/src/main/java/com/baeldung/webflux/EmployeeWebClient.java deleted file mode 100644 index eb32408a7f..0000000000 --- a/spring-5-reactive-security/src/main/java/com/baeldung/webflux/EmployeeWebClient.java +++ /dev/null @@ -1,28 +0,0 @@ -package com.baeldung.webflux; - -import org.springframework.web.reactive.function.client.WebClient; - -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; - -public class EmployeeWebClient { - - WebClient client = WebClient.create("http://localhost:8080"); - - public void consume() { - - Mono employeeMono = client.get() - .uri("/employees/{id}", "1") - .retrieve() - .bodyToMono(Employee.class); - - employeeMono.subscribe(System.out::println); - - Flux employeeFlux = client.get() - .uri("/employees") - .retrieve() - .bodyToFlux(Employee.class); - - employeeFlux.subscribe(System.out::println); - } -} \ No newline at end of file diff --git a/spring-5-reactive-security/src/test/java/com/baeldung/SpringContextTest.java b/spring-5-reactive-security/src/test/java/com/baeldung/SpringContextTest.java index b4f4118527..e6123de118 100644 --- a/spring-5-reactive-security/src/test/java/com/baeldung/SpringContextTest.java +++ b/spring-5-reactive-security/src/test/java/com/baeldung/SpringContextTest.java @@ -1,14 +1,13 @@ package com.baeldung; +import com.baeldung.reactive.actuator.Spring5ReactiveApplication; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; -import com.baeldung.reactive.security.SpringSecurity5Application; - @RunWith(SpringRunner.class) -@SpringBootTest(classes = SpringSecurity5Application.class) +@SpringBootTest(classes = Spring5ReactiveApplication.class) public class SpringContextTest { @Test diff --git a/spring-5-reactive-security/src/test/java/com/baeldung/reactive/functional/EmployeeSpringFunctionalIntegrationTest.java b/spring-5-reactive-security/src/test/java/com/baeldung/reactive/functional/EmployeeSpringFunctionalIntegrationTest.java deleted file mode 100644 index d8e2f0b23b..0000000000 --- a/spring-5-reactive-security/src/test/java/com/baeldung/reactive/functional/EmployeeSpringFunctionalIntegrationTest.java +++ /dev/null @@ -1,92 +0,0 @@ -package com.baeldung.reactive.functional; - -import com.baeldung.webflux.Employee; -import com.baeldung.webflux.EmployeeRepository; -import org.junit.FixMethodOrder; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.MethodSorters; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.test.context.SpringBootTest; -import org.springframework.boot.test.mock.mockito.MockBean; -import org.springframework.test.context.junit4.SpringRunner; -import org.springframework.test.web.reactive.server.WebTestClient; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; - -import java.util.Arrays; -import java.util.List; - -import static org.mockito.BDDMockito.given; -import static org.mockito.Mockito.verify; - -@RunWith(SpringRunner.class) -@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, classes = EmployeeSpringFunctionalApplication.class) -@FixMethodOrder(MethodSorters.NAME_ASCENDING) -public class EmployeeSpringFunctionalIntegrationTest { - - @Autowired - private EmployeeFunctionalConfig config; - - @MockBean - private EmployeeRepository employeeRepository; - - @Test - public void givenEmployeeId_whenGetEmployeeById_thenCorrectEmployee() { - WebTestClient client = WebTestClient - .bindToRouterFunction(config.getEmployeeByIdRoute()) - .build(); - - Employee employee = new Employee("1", "Employee 1"); - - given(employeeRepository.findEmployeeById("1")).willReturn(Mono.just(employee)); - - client.get() - .uri("/employees/1") - .exchange() - .expectStatus() - .isOk() - .expectBody(Employee.class) - .isEqualTo(employee); - } - - @Test - public void whenGetAllEmployees_thenCorrectEmployees() { - WebTestClient client = WebTestClient - .bindToRouterFunction(config.getAllEmployeesRoute()) - .build(); - - List employees = Arrays.asList( - new Employee("1", "Employee 1"), - new Employee("2", "Employee 2")); - - Flux employeeFlux = Flux.fromIterable(employees); - given(employeeRepository.findAllEmployees()).willReturn(employeeFlux); - - client.get() - .uri("/employees") - .exchange() - .expectStatus() - .isOk() - .expectBodyList(Employee.class) - .isEqualTo(employees); - } - - @Test - public void whenUpdateEmployee_thenEmployeeUpdated() { - WebTestClient client = WebTestClient - .bindToRouterFunction(config.updateEmployeeRoute()) - .build(); - - Employee employee = new Employee("1", "Employee 1 Updated"); - - client.post() - .uri("/employees/update") - .body(Mono.just(employee), Employee.class) - .exchange() - .expectStatus() - .isOk(); - - verify(employeeRepository).updateEmployee(employee); - } -} diff --git a/spring-5-reactive-security/src/test/java/com/baeldung/reactive/webflux/EmployeeControllerIntegrationTest.java b/spring-5-reactive-security/src/test/java/com/baeldung/reactive/webflux/EmployeeControllerIntegrationTest.java deleted file mode 100644 index 12a21ed4ef..0000000000 --- a/spring-5-reactive-security/src/test/java/com/baeldung/reactive/webflux/EmployeeControllerIntegrationTest.java +++ /dev/null @@ -1,76 +0,0 @@ -package com.baeldung.reactive.webflux; - -import static org.mockito.BDDMockito.given; - -import java.util.ArrayList; -import java.util.List; - -import org.junit.FixMethodOrder; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.MethodSorters; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.test.context.SpringBootTest; -import org.springframework.boot.test.mock.mockito.MockBean; -import org.springframework.test.context.junit4.SpringRunner; -import org.springframework.test.web.reactive.server.WebTestClient; - -import com.baeldung.webflux.EmployeeSpringApplication; -import com.baeldung.webflux.Employee; -import com.baeldung.webflux.EmployeeRepository; - -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; - -@RunWith(SpringRunner.class) -@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, classes=EmployeeSpringApplication.class) -@FixMethodOrder(MethodSorters.NAME_ASCENDING) -public class EmployeeControllerIntegrationTest { - - @Autowired - private WebTestClient testClient; - - @MockBean - private EmployeeRepository employeeRepository; - - @Test - public void givenEmployeeId_whenGetEmployeeById_thenCorrectEmployee() { - - Employee employee = new Employee("1", "Employee 1 Name"); - - given(employeeRepository.findEmployeeById("1")).willReturn(Mono.just(employee)); - testClient.get() - .uri("/employees/1") - .exchange() - .expectStatus() - .isOk() - .expectBody(Employee.class) - .isEqualTo(employee); - } - - @Test - public void whenGetAllEmployees_thenCorrectEmployees() { - - List employeeList = new ArrayList<>(); - - Employee employee1 = new Employee("1", "Employee 1 Name"); - Employee employee2 = new Employee("2", "Employee 2 Name"); - Employee employee3 = new Employee("3", "Employee 3 Name"); - - employeeList.add(employee1); - employeeList.add(employee2); - employeeList.add(employee3); - - Flux employeeFlux = Flux.fromIterable(employeeList); - - given(employeeRepository.findAllEmployees()).willReturn(employeeFlux); - testClient.get() - .uri("/employees") - .exchange() - .expectStatus() - .isOk() - .expectBodyList(Employee.class) - .hasSize(3) - .isEqualTo(employeeList); - } -} diff --git a/spring-5-reactive-security/src/test/java/com/baeldung/security/SecurityIntegrationTest.java b/spring-5-reactive-security/src/test/java/com/baeldung/security/SecurityIntegrationTest.java deleted file mode 100644 index 423500e09c..0000000000 --- a/spring-5-reactive-security/src/test/java/com/baeldung/security/SecurityIntegrationTest.java +++ /dev/null @@ -1,41 +0,0 @@ -package com.baeldung.security; - -import org.junit.Before; -import org.junit.Ignore; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.context.ApplicationContext; -import org.springframework.security.test.context.support.WithMockUser; -import org.springframework.test.context.ContextConfiguration; -import org.springframework.test.context.junit4.SpringRunner; -import org.springframework.test.web.reactive.server.WebTestClient; - -import com.baeldung.reactive.security.SpringSecurity5Application; - -@RunWith(SpringRunner.class) -@ContextConfiguration(classes = SpringSecurity5Application.class) -public class SecurityIntegrationTest { - - @Autowired - ApplicationContext context; - - private WebTestClient rest; - - @Before - public void setup() { - this.rest = WebTestClient.bindToApplicationContext(this.context).configureClient().build(); - } - - @Test - public void whenNoCredentials_thenRedirectToLogin() { - this.rest.get().uri("/").exchange().expectStatus().is3xxRedirection(); - } - - @Test - @Ignore - @WithMockUser - public void whenHasCredentials_thenSeesGreeting() { - this.rest.get().uri("/").exchange().expectStatus().isOk().expectBody(String.class).isEqualTo("Hello, user"); - } -} diff --git a/spring-5-reactive/README.md b/spring-5-reactive/README.md index 1945b7ea33..0f4756c53e 100644 --- a/spring-5-reactive/README.md +++ b/spring-5-reactive/README.md @@ -7,11 +7,9 @@ The "REST With Spring" Classes: https://bit.ly/restwithspring ### Relevant Articles -- [Spring 5 WebClient](https://www.baeldung.com/spring-5-webclient) - [Exploring the Spring 5 WebFlux URL Matching](https://www.baeldung.com/spring-5-mvc-url-matching) - [Reactive WebSockets with Spring 5](https://www.baeldung.com/spring-5-reactive-websockets) - [Spring Webflux Filters](https://www.baeldung.com/spring-webflux-filters) - [How to Set a Header on a Response with Spring 5](https://www.baeldung.com/spring-response-header) -- [Handling Errors in Spring WebFlux](https://www.baeldung.com/spring-webflux-errors) - [A Guide to Spring Session Reactive Support: WebSession](https://www.baeldung.com/spring-session-reactive) - More articles: [[next -->]](/spring-5-reactive-2) diff --git a/spring-5-reactive/src/main/java/com/baeldung/reactive/errorhandling/GlobalErrorAttributes.java b/spring-5-reactive/src/main/java/com/baeldung/reactive/errorhandling/GlobalErrorAttributes.java deleted file mode 100644 index 5885ac50d0..0000000000 --- a/spring-5-reactive/src/main/java/com/baeldung/reactive/errorhandling/GlobalErrorAttributes.java +++ /dev/null @@ -1,53 +0,0 @@ - -package com.baeldung.reactive.errorhandling; - -import java.util.Map; - -import org.springframework.boot.web.error.ErrorAttributeOptions; -import org.springframework.boot.web.reactive.error.DefaultErrorAttributes; -import org.springframework.http.HttpStatus; -import org.springframework.stereotype.Component; -import org.springframework.web.reactive.function.server.ServerRequest; - -@Component -public class GlobalErrorAttributes extends DefaultErrorAttributes{ - - private HttpStatus status = HttpStatus.BAD_REQUEST; - private String message = "please provide a name"; - - @Override - public Map getErrorAttributes(ServerRequest request, ErrorAttributeOptions options) { - Map map = super.getErrorAttributes(request, options); - map.put("status", getStatus()); - map.put("message", getMessage()); - return map; - } - - /** - * @return the status - */ - public HttpStatus getStatus() { - return status; - } - - /** - * @param status the status to set - */ - public void setStatus(HttpStatus status) { - this.status = status; - } - - /** - * @return the message - */ - public String getMessage() { - return message; - } - - /** - * @param message the message to set - */ - public void setMessage(String message) { - this.message = message; - } -} diff --git a/spring-5-reactive/src/main/java/com/baeldung/reactive/errorhandling/GlobalErrorWebExceptionHandler.java b/spring-5-reactive/src/main/java/com/baeldung/reactive/errorhandling/GlobalErrorWebExceptionHandler.java deleted file mode 100644 index 4f3f1795da..0000000000 --- a/spring-5-reactive/src/main/java/com/baeldung/reactive/errorhandling/GlobalErrorWebExceptionHandler.java +++ /dev/null @@ -1,50 +0,0 @@ - -package com.baeldung.reactive.errorhandling; - -import java.util.Map; - -import org.springframework.boot.autoconfigure.web.WebProperties; -import org.springframework.boot.autoconfigure.web.reactive.error.AbstractErrorWebExceptionHandler; -import org.springframework.boot.web.error.ErrorAttributeOptions; -import org.springframework.boot.web.reactive.error.ErrorAttributes; -import org.springframework.context.ApplicationContext; -import org.springframework.core.annotation.Order; -import org.springframework.http.HttpStatus; -import org.springframework.http.MediaType; -import org.springframework.http.codec.ServerCodecConfigurer; -import org.springframework.stereotype.Component; -import org.springframework.web.reactive.function.BodyInserters; -import org.springframework.web.reactive.function.server.RequestPredicates; -import org.springframework.web.reactive.function.server.RouterFunction; -import org.springframework.web.reactive.function.server.RouterFunctions; -import org.springframework.web.reactive.function.server.ServerRequest; -import org.springframework.web.reactive.function.server.ServerResponse; - -import reactor.core.publisher.Mono; - -@Component -@Order(-2) -public class GlobalErrorWebExceptionHandler extends AbstractErrorWebExceptionHandler { - - public GlobalErrorWebExceptionHandler(GlobalErrorAttributes g, ApplicationContext applicationContext, - ServerCodecConfigurer serverCodecConfigurer) { - super(g, new WebProperties.Resources(), applicationContext); - super.setMessageWriters(serverCodecConfigurer.getWriters()); - super.setMessageReaders(serverCodecConfigurer.getReaders()); - } - - @Override - protected RouterFunction getRoutingFunction(final ErrorAttributes errorAttributes) { - return RouterFunctions.route(RequestPredicates.all(), this::renderErrorResponse); - } - - private Mono renderErrorResponse(final ServerRequest request) { - - final Map errorPropertiesMap = getErrorAttributes(request, ErrorAttributeOptions.defaults()); - - return ServerResponse.status(HttpStatus.BAD_REQUEST) - .contentType(MediaType.APPLICATION_JSON) - .body(BodyInserters.fromValue(errorPropertiesMap)); - } - -} diff --git a/spring-5-reactive/src/main/java/com/baeldung/reactive/errorhandling/NameRequiredException.java b/spring-5-reactive/src/main/java/com/baeldung/reactive/errorhandling/NameRequiredException.java deleted file mode 100644 index 38d35544a7..0000000000 --- a/spring-5-reactive/src/main/java/com/baeldung/reactive/errorhandling/NameRequiredException.java +++ /dev/null @@ -1,12 +0,0 @@ - -package com.baeldung.reactive.errorhandling; - -import org.springframework.http.HttpStatus; -import org.springframework.web.server.ResponseStatusException; - -public class NameRequiredException extends ResponseStatusException { - - public NameRequiredException(HttpStatus status, String message, Throwable e) { - super(status, message, e); - } -} diff --git a/spring-5-reactive/src/main/java/com/baeldung/reactive/errorhandling/handlers/Handler1.java b/spring-5-reactive/src/main/java/com/baeldung/reactive/errorhandling/handlers/Handler1.java deleted file mode 100644 index c71c8ecac0..0000000000 --- a/spring-5-reactive/src/main/java/com/baeldung/reactive/errorhandling/handlers/Handler1.java +++ /dev/null @@ -1,29 +0,0 @@ - -package com.baeldung.reactive.errorhandling.handlers; - -import org.springframework.http.MediaType; -import org.springframework.stereotype.Component; -import org.springframework.web.reactive.function.server.ServerRequest; -import org.springframework.web.reactive.function.server.ServerResponse; -import reactor.core.publisher.Mono; - -@Component -public class Handler1 { - - public Mono handleRequest1(ServerRequest request) { - return sayHello(request).onErrorReturn("Hello, Stranger") - .flatMap(s -> ServerResponse.ok() - .contentType(MediaType.TEXT_PLAIN) - .bodyValue(s)); - } - - private Mono sayHello(ServerRequest request) { - try { - return Mono.just("Hello, " + request.queryParam("name") - .get()); - } catch (Exception e) { - return Mono.error(e); - } - } - -} diff --git a/spring-5-reactive/src/main/java/com/baeldung/reactive/errorhandling/handlers/Handler2.java b/spring-5-reactive/src/main/java/com/baeldung/reactive/errorhandling/handlers/Handler2.java deleted file mode 100644 index 92e881543e..0000000000 --- a/spring-5-reactive/src/main/java/com/baeldung/reactive/errorhandling/handlers/Handler2.java +++ /dev/null @@ -1,38 +0,0 @@ - -package com.baeldung.reactive.errorhandling.handlers; - -import org.springframework.http.MediaType; -import org.springframework.stereotype.Component; -import org.springframework.web.reactive.function.server.ServerRequest; -import org.springframework.web.reactive.function.server.ServerResponse; -import reactor.core.publisher.Mono; - -@Component -public class Handler2 { - -public Mono handleRequest2(ServerRequest request) { - return - sayHello(request) - .flatMap(s -> ServerResponse.ok() - .contentType(MediaType.TEXT_PLAIN) - .bodyValue(s)) - .onErrorResume(e -> sayHelloFallback() - .flatMap(s -> ServerResponse.ok() - .contentType(MediaType.TEXT_PLAIN) - .bodyValue(s))); -} - - private Mono sayHello(ServerRequest request) { - try { - return Mono.just("Hello, " + request.queryParam("name") - .get()); - } catch (Exception e) { - return Mono.error(e); - } - } - - private Mono sayHelloFallback() { - return Mono.just("Hello, Stranger"); - } - -} diff --git a/spring-5-reactive/src/main/java/com/baeldung/reactive/errorhandling/handlers/Handler3.java b/spring-5-reactive/src/main/java/com/baeldung/reactive/errorhandling/handlers/Handler3.java deleted file mode 100644 index 8c988a6633..0000000000 --- a/spring-5-reactive/src/main/java/com/baeldung/reactive/errorhandling/handlers/Handler3.java +++ /dev/null @@ -1,34 +0,0 @@ - -package com.baeldung.reactive.errorhandling.handlers; - -import org.springframework.http.MediaType; -import org.springframework.stereotype.Component; -import org.springframework.web.reactive.function.server.ServerRequest; -import org.springframework.web.reactive.function.server.ServerResponse; -import reactor.core.publisher.Mono; - -@Component -public class Handler3 { - - public Mono handleRequest3(ServerRequest request) { - return - sayHello(request) - .flatMap(s -> ServerResponse.ok() - .contentType(MediaType.TEXT_PLAIN) - .bodyValue(s)) - .onErrorResume(e -> (Mono.just("Hi, I looked around for your name but found: " + - e.getMessage())).flatMap(s -> ServerResponse.ok() - .contentType(MediaType.TEXT_PLAIN) - .bodyValue(s))); - } - - private Mono sayHello(ServerRequest request) { - try { - return Mono.just("Hello, " + request.queryParam("name") - .get()); - } catch (Exception e) { - return Mono.error(e); - } - } - -} diff --git a/spring-5-reactive/src/main/java/com/baeldung/reactive/errorhandling/handlers/Handler4.java b/spring-5-reactive/src/main/java/com/baeldung/reactive/errorhandling/handlers/Handler4.java deleted file mode 100644 index 3d6ef258d3..0000000000 --- a/spring-5-reactive/src/main/java/com/baeldung/reactive/errorhandling/handlers/Handler4.java +++ /dev/null @@ -1,30 +0,0 @@ - -package com.baeldung.reactive.errorhandling.handlers; - -import com.baeldung.reactive.errorhandling.NameRequiredException; -import org.springframework.http.HttpStatus; -import org.springframework.stereotype.Component; -import org.springframework.web.reactive.function.server.ServerRequest; -import org.springframework.web.reactive.function.server.ServerResponse; -import reactor.core.publisher.Mono; - -@Component -public class Handler4 { - -public Mono handleRequest4(ServerRequest request) { - return ServerResponse.ok() - .body(sayHello(request) - .onErrorResume(e -> - Mono.error(new NameRequiredException( - HttpStatus.BAD_REQUEST, "please provide a name", e))), String.class); -} - - private Mono sayHello(ServerRequest request) { - try { - return Mono.just("Hello, " + request.queryParam("name").get()); - } catch (Exception e) { - return Mono.error(e); - } - } - -} diff --git a/spring-5-reactive/src/main/java/com/baeldung/reactive/errorhandling/handlers/Handler5.java b/spring-5-reactive/src/main/java/com/baeldung/reactive/errorhandling/handlers/Handler5.java deleted file mode 100644 index 41605b355b..0000000000 --- a/spring-5-reactive/src/main/java/com/baeldung/reactive/errorhandling/handlers/Handler5.java +++ /dev/null @@ -1,22 +0,0 @@ - -package com.baeldung.reactive.errorhandling.handlers; - -import org.springframework.stereotype.Component; -import org.springframework.web.reactive.function.server.ServerRequest; -import org.springframework.web.reactive.function.server.ServerResponse; -import reactor.core.publisher.Mono; - -@Component -public class Handler5 { - - public Mono handleRequest5(ServerRequest request) { - return ServerResponse.ok() - .body(sayHello(request), String.class); - - } - - private Mono sayHello(ServerRequest request) { - return Mono.just("Hello, " + request.queryParam("name").get()); - } - -} diff --git a/spring-5-reactive/src/main/java/com/baeldung/reactive/errorhandling/routers/Router1.java b/spring-5-reactive/src/main/java/com/baeldung/reactive/errorhandling/routers/Router1.java deleted file mode 100644 index 91be24571c..0000000000 --- a/spring-5-reactive/src/main/java/com/baeldung/reactive/errorhandling/routers/Router1.java +++ /dev/null @@ -1,22 +0,0 @@ - -package com.baeldung.reactive.errorhandling.routers; - -import com.baeldung.reactive.errorhandling.handlers.Handler1; -import org.springframework.context.annotation.Bean; -import org.springframework.http.MediaType; -import org.springframework.stereotype.Component; -import org.springframework.web.reactive.function.server.RequestPredicates; -import org.springframework.web.reactive.function.server.RouterFunction; -import org.springframework.web.reactive.function.server.RouterFunctions; -import org.springframework.web.reactive.function.server.ServerResponse; - -@Component -public class Router1 { - - @Bean - public RouterFunction routeRequest1(Handler1 handler) { - return RouterFunctions.route(RequestPredicates.GET("/api/endpoint1") - .and(RequestPredicates.accept(MediaType.TEXT_PLAIN)), handler::handleRequest1); - } - -} diff --git a/spring-5-reactive/src/main/java/com/baeldung/reactive/errorhandling/routers/Router2.java b/spring-5-reactive/src/main/java/com/baeldung/reactive/errorhandling/routers/Router2.java deleted file mode 100644 index bc7831f494..0000000000 --- a/spring-5-reactive/src/main/java/com/baeldung/reactive/errorhandling/routers/Router2.java +++ /dev/null @@ -1,22 +0,0 @@ - -package com.baeldung.reactive.errorhandling.routers; - -import com.baeldung.reactive.errorhandling.handlers.Handler2; -import org.springframework.context.annotation.Bean; -import org.springframework.http.MediaType; -import org.springframework.stereotype.Component; -import org.springframework.web.reactive.function.server.RequestPredicates; -import org.springframework.web.reactive.function.server.RouterFunction; -import org.springframework.web.reactive.function.server.RouterFunctions; -import org.springframework.web.reactive.function.server.ServerResponse; - -@Component -public class Router2 { - - @Bean - public RouterFunction routeRequest2(Handler2 handler) { - return RouterFunctions.route(RequestPredicates.GET("/api/endpoint2") - .and(RequestPredicates.accept(MediaType.TEXT_PLAIN)), handler::handleRequest2); - } - -} diff --git a/spring-5-reactive/src/main/java/com/baeldung/reactive/errorhandling/routers/Router3.java b/spring-5-reactive/src/main/java/com/baeldung/reactive/errorhandling/routers/Router3.java deleted file mode 100644 index 461e6fe9e7..0000000000 --- a/spring-5-reactive/src/main/java/com/baeldung/reactive/errorhandling/routers/Router3.java +++ /dev/null @@ -1,22 +0,0 @@ - -package com.baeldung.reactive.errorhandling.routers; - -import com.baeldung.reactive.errorhandling.handlers.Handler3; -import org.springframework.context.annotation.Bean; -import org.springframework.http.MediaType; -import org.springframework.stereotype.Component; -import org.springframework.web.reactive.function.server.RequestPredicates; -import org.springframework.web.reactive.function.server.RouterFunction; -import org.springframework.web.reactive.function.server.RouterFunctions; -import org.springframework.web.reactive.function.server.ServerResponse; - -@Component -public class Router3 { - - @Bean - public RouterFunction routeRequest3(Handler3 handler) { - return RouterFunctions.route(RequestPredicates.GET("/api/endpoint3") - .and(RequestPredicates.accept(MediaType.TEXT_PLAIN)), handler::handleRequest3); - } - -} diff --git a/spring-5-reactive/src/main/java/com/baeldung/reactive/errorhandling/routers/Router4.java b/spring-5-reactive/src/main/java/com/baeldung/reactive/errorhandling/routers/Router4.java deleted file mode 100644 index 9dccc6858f..0000000000 --- a/spring-5-reactive/src/main/java/com/baeldung/reactive/errorhandling/routers/Router4.java +++ /dev/null @@ -1,22 +0,0 @@ - -package com.baeldung.reactive.errorhandling.routers; - -import com.baeldung.reactive.errorhandling.handlers.Handler4; -import org.springframework.context.annotation.Bean; -import org.springframework.http.MediaType; -import org.springframework.stereotype.Component; -import org.springframework.web.reactive.function.server.RequestPredicates; -import org.springframework.web.reactive.function.server.RouterFunction; -import org.springframework.web.reactive.function.server.RouterFunctions; -import org.springframework.web.reactive.function.server.ServerResponse; - -@Component -public class Router4 { - - @Bean - public RouterFunction routeRequest4(Handler4 handler) { - return RouterFunctions.route(RequestPredicates.GET("/api/endpoint4") - .and(RequestPredicates.accept(MediaType.TEXT_PLAIN)), handler::handleRequest4); - } - -} diff --git a/spring-5-reactive/src/main/java/com/baeldung/reactive/errorhandling/routers/Router5.java b/spring-5-reactive/src/main/java/com/baeldung/reactive/errorhandling/routers/Router5.java deleted file mode 100644 index 59fd587fc8..0000000000 --- a/spring-5-reactive/src/main/java/com/baeldung/reactive/errorhandling/routers/Router5.java +++ /dev/null @@ -1,22 +0,0 @@ - -package com.baeldung.reactive.errorhandling.routers; - -import com.baeldung.reactive.errorhandling.handlers.Handler5; -import org.springframework.context.annotation.Bean; -import org.springframework.http.MediaType; -import org.springframework.stereotype.Component; -import org.springframework.web.reactive.function.server.RequestPredicates; -import org.springframework.web.reactive.function.server.RouterFunction; -import org.springframework.web.reactive.function.server.RouterFunctions; -import org.springframework.web.reactive.function.server.ServerResponse; - -@Component -public class Router5 { - - @Bean - public RouterFunction routeRequest5(Handler5 handler) { - return RouterFunctions.route(RequestPredicates.GET("/api/endpoint5") - .and(RequestPredicates.accept(MediaType.TEXT_PLAIN)), handler::handleRequest5); - } - -} diff --git a/spring-5-reactive/src/main/java/com/baeldung/web/reactive/Task.java b/spring-5-reactive/src/main/java/com/baeldung/web/reactive/Task.java deleted file mode 100644 index 725fd931e1..0000000000 --- a/spring-5-reactive/src/main/java/com/baeldung/web/reactive/Task.java +++ /dev/null @@ -1,28 +0,0 @@ -package com.baeldung.web.reactive; - -import com.fasterxml.jackson.annotation.JsonProperty; - -public class Task { - - private final String name; - - private final int id; - - public Task(@JsonProperty("name") String name, @JsonProperty("id") int id) { - this.name = name; - this.id = id; - } - - public String getName() { - return this.name; - } - - public int getId() { - return this.id; - } - - @Override - public String toString() { - return "Task{" + "name='" + name + '\'' + ", id=" + id + '}'; - } -} diff --git a/spring-5-reactive/src/main/java/com/baeldung/web/reactive/client/Foo.java b/spring-5-reactive/src/main/java/com/baeldung/web/reactive/client/Foo.java deleted file mode 100644 index c6e3678832..0000000000 --- a/spring-5-reactive/src/main/java/com/baeldung/web/reactive/client/Foo.java +++ /dev/null @@ -1,24 +0,0 @@ -package com.baeldung.web.reactive.client; - -public class Foo { - - private String name; - - public Foo() { - super(); - } - - public Foo(String name) { - super(); - this.name = name; - } - - public String getName() { - return name; - } - - public void setName(String name) { - this.name = name; - } - -} diff --git a/spring-5-reactive/src/main/java/com/baeldung/web/reactive/client/WebClientApplication.java b/spring-5-reactive/src/main/java/com/baeldung/web/reactive/client/WebClientApplication.java deleted file mode 100644 index aa9b81de4f..0000000000 --- a/spring-5-reactive/src/main/java/com/baeldung/web/reactive/client/WebClientApplication.java +++ /dev/null @@ -1,14 +0,0 @@ -package com.baeldung.web.reactive.client; - -import org.springframework.boot.SpringApplication; -import org.springframework.boot.autoconfigure.SpringBootApplication; -import org.springframework.boot.autoconfigure.security.reactive.ReactiveSecurityAutoConfiguration; - -@SpringBootApplication(exclude = { ReactiveSecurityAutoConfiguration.class }) -public class WebClientApplication { - - public static void main(String[] args) { - SpringApplication.run(WebClientApplication.class, args); - } - -} diff --git a/spring-5-reactive/src/main/java/com/baeldung/web/reactive/client/WebClientController.java b/spring-5-reactive/src/main/java/com/baeldung/web/reactive/client/WebClientController.java deleted file mode 100644 index 1a91001807..0000000000 --- a/spring-5-reactive/src/main/java/com/baeldung/web/reactive/client/WebClientController.java +++ /dev/null @@ -1,42 +0,0 @@ -package com.baeldung.web.reactive.client; - -import java.util.HashMap; -import java.util.Map; - -import org.springframework.http.HttpStatus; -import org.springframework.http.MediaType; -import org.springframework.web.bind.annotation.GetMapping; -import org.springframework.web.bind.annotation.PostMapping; -import org.springframework.web.bind.annotation.RequestBody; -import org.springframework.web.bind.annotation.RequestPart; -import org.springframework.web.bind.annotation.ResponseStatus; -import org.springframework.web.bind.annotation.RestController; - -import reactor.core.publisher.Mono; - -@RestController -public class WebClientController { - - @ResponseStatus(HttpStatus.OK) - @GetMapping("/resource") - public Map getResource() { - Map response = new HashMap<>(); - response.put("field", "value"); - return response; - } - - @PostMapping("/resource") - public Mono postStringResource(@RequestBody Mono bodyString) { - return bodyString.map(body -> "processed-" + body); - } - - @PostMapping("/resource-foo") - public Mono postFooResource(@RequestBody Mono bodyFoo) { - return bodyFoo.map(foo -> "processedFoo-" + foo.getName()); - } - - @PostMapping(value = "/resource-multipart", consumes = MediaType.MULTIPART_FORM_DATA_VALUE) - public String handleFormUpload(@RequestPart("key1") String value1, @RequestPart("key2") String value2) { - return "processed-" + value1 + "-" + value2; - } -} diff --git a/spring-5-reactive/src/test/java/com/baeldung/web/client/SpringContextTest.java b/spring-5-reactive/src/test/java/com/baeldung/web/client/SpringContextTest.java deleted file mode 100644 index 8d2ca41451..0000000000 --- a/spring-5-reactive/src/test/java/com/baeldung/web/client/SpringContextTest.java +++ /dev/null @@ -1,14 +0,0 @@ -package com.baeldung.web.client; - -import org.junit.jupiter.api.Test; -import org.springframework.boot.test.context.SpringBootTest; - -import com.baeldung.web.reactive.client.WebClientApplication; - -@SpringBootTest(classes = WebClientApplication.class) -public class SpringContextTest { - - @Test - public void whenSpringContextIsBootstrapped_thenNoExceptions() { - } -} diff --git a/spring-5-reactive/src/test/java/com/baeldung/web/client/WebClientIntegrationTest.java b/spring-5-reactive/src/test/java/com/baeldung/web/client/WebClientIntegrationTest.java deleted file mode 100644 index 7e1fc86847..0000000000 --- a/spring-5-reactive/src/test/java/com/baeldung/web/client/WebClientIntegrationTest.java +++ /dev/null @@ -1,331 +0,0 @@ -package com.baeldung.web.client; - -import static org.assertj.core.api.Assertions.assertThat; - -import java.net.URI; -import java.nio.charset.StandardCharsets; -import java.time.Duration; -import java.time.ZonedDateTime; -import java.util.Collections; -import java.util.Map; -import java.util.concurrent.TimeUnit; - -import org.junit.jupiter.api.Test; -import org.springframework.boot.test.context.SpringBootTest; -import org.springframework.boot.test.context.SpringBootTest.WebEnvironment; -import org.springframework.boot.web.server.LocalServerPort; -import org.springframework.core.ParameterizedTypeReference; -import org.springframework.core.codec.CodecException; -import org.springframework.http.HttpHeaders; -import org.springframework.http.HttpMethod; -import org.springframework.http.HttpStatus; -import org.springframework.http.MediaType; -import org.springframework.http.ReactiveHttpOutputMessage; -import org.springframework.http.client.reactive.ClientHttpRequest; -import org.springframework.http.client.reactive.ReactorClientHttpConnector; -import org.springframework.util.LinkedMultiValueMap; -import org.springframework.util.MultiValueMap; -import org.springframework.web.reactive.function.BodyInserter; -import org.springframework.web.reactive.function.BodyInserters; -import org.springframework.web.reactive.function.client.WebClient; -import org.springframework.web.reactive.function.client.WebClient.RequestBodySpec; -import org.springframework.web.reactive.function.client.WebClient.RequestBodyUriSpec; -import org.springframework.web.reactive.function.client.WebClient.RequestHeadersSpec; -import org.springframework.web.reactive.function.client.WebClient.RequestHeadersUriSpec; -import org.springframework.web.reactive.function.client.WebClient.ResponseSpec; -import org.springframework.web.reactive.function.client.WebClientRequestException; - -import com.baeldung.web.reactive.client.Foo; -import com.baeldung.web.reactive.client.WebClientApplication; - -import io.netty.channel.ChannelOption; -import io.netty.handler.timeout.ReadTimeoutException; -import io.netty.handler.timeout.ReadTimeoutHandler; -import io.netty.handler.timeout.WriteTimeoutHandler; -import reactor.core.publisher.Mono; -import reactor.netty.http.client.HttpClient; -import reactor.test.StepVerifier; - -@SpringBootTest(classes = WebClientApplication.class, webEnvironment = WebEnvironment.RANDOM_PORT) -public class WebClientIntegrationTest { - - @LocalServerPort - private int port; - - private static final String BODY_VALUE = "bodyValue"; - private static final ParameterizedTypeReference> MAP_RESPONSE_REF = new ParameterizedTypeReference>() { - }; - - @Test - public void givenDifferentWebClientCreationMethods_whenUsed_thenObtainExpectedResponse() { - // WebClient creation - WebClient client1 = WebClient.create(); - WebClient client2 = WebClient.create("http://localhost:" + port); - WebClient client3 = WebClient.builder() - .baseUrl("http://localhost:" + port) - .defaultCookie("cookieKey", "cookieValue") - .defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE) - .defaultUriVariables(Collections.singletonMap("url", "http://localhost:8080")) - .build(); - - // response assertions - StepVerifier.create(retrieveResponse(client1.post() - .uri("http://localhost:" + port + "/resource"))) - .expectNext("processed-bodyValue") - .verifyComplete(); - StepVerifier.create(retrieveResponse(client2)) - .expectNext("processed-bodyValue") - .verifyComplete(); - StepVerifier.create(retrieveResponse(client3)) - .expectNext("processed-bodyValue") - .verifyComplete(); - // assert response without specifying URI - StepVerifier.create(retrieveResponse(client1)) - .expectErrorMatches(ex -> WebClientRequestException.class.isAssignableFrom(ex.getClass()) && ex.getMessage() - .contains("Connection refused")) - .verify(); - } - - @Test - public void givenDifferentMethodSpecifications_whenUsed_thenObtainExpectedResponse() { - // request specification - RequestBodyUriSpec uriSpecPost1 = createDefaultClient().method(HttpMethod.POST); - RequestBodyUriSpec uriSpecPost2 = createDefaultClient().post(); - RequestHeadersUriSpec requestGet = createDefaultClient().get(); - - // response assertions - StepVerifier.create(retrieveResponse(uriSpecPost1)) - .expectNext("processed-bodyValue") - .verifyComplete(); - StepVerifier.create(retrieveResponse(uriSpecPost2)) - .expectNext("processed-bodyValue") - .verifyComplete(); - StepVerifier.create(retrieveGetResponse(requestGet)) - .expectNextMatches(nextMap -> nextMap.get("field") - .equals("value")) - .verifyComplete(); - } - - @Test - public void givenDifferentUriSpecifications_whenUsed_thenObtainExpectedResponse() { - // uri specification - RequestBodySpec bodySpecUsingString = createDefaultPostRequest().uri("/resource"); - RequestBodySpec bodySpecUsingUriBuilder = createDefaultPostRequest().uri(uriBuilder -> uriBuilder.pathSegment("resource") - .build()); - RequestBodySpec bodySpecusingURI = createDefaultPostRequest().uri(URI.create("http://localhost:" + port + "/resource")); - RequestBodySpec bodySpecOverridenBaseUri = createDefaultPostRequest().uri(URI.create("/resource")); - RequestBodySpec bodySpecOverridenBaseUri2 = WebClient.builder() - .baseUrl("http://localhost:" + port) - .build() - .post() - .uri(URI.create("/resource")); - - // response assertions - StepVerifier.create(retrieveResponse(bodySpecUsingString)) - .expectNext("processed-bodyValue") - .verifyComplete(); - StepVerifier.create(retrieveResponse(bodySpecUsingUriBuilder)) - .expectNext("processed-bodyValue") - .verifyComplete(); - StepVerifier.create(retrieveResponse(bodySpecusingURI)) - .expectNext("processed-bodyValue") - .verifyComplete(); - // assert sending request overriding base URI - StepVerifier.create(retrieveResponse(bodySpecOverridenBaseUri)) - .expectErrorMatches(ex -> WebClientRequestException.class.isAssignableFrom(ex.getClass()) && ex.getMessage() - .contains("Connection refused")) - .verify(); - StepVerifier.create(retrieveResponse(bodySpecOverridenBaseUri2)) - .expectErrorMatches(ex -> WebClientRequestException.class.isAssignableFrom(ex.getClass()) && ex.getMessage() - .contains("Connection refused")) - .verify(); - } - - @Test - public void givenDifferentBodySpecifications_whenUsed_thenObtainExpectedResponse() { - // request body specifications - RequestHeadersSpec headersSpecPost1 = createDefaultPostResourceRequest().body(BodyInserters.fromPublisher(Mono.just(BODY_VALUE), String.class)); - RequestHeadersSpec headersSpecPost2 = createDefaultPostResourceRequest().body(BodyInserters.fromValue(BODY_VALUE)); - RequestHeadersSpec headersSpecPost3 = createDefaultPostResourceRequest().bodyValue(BODY_VALUE); - RequestHeadersSpec headersSpecFooPost = createDefaultPostRequest().uri("/resource-foo") - .body(Mono.just(new Foo("fooName")), Foo.class); - BodyInserter inserterPlainObject = BodyInserters.fromValue(new Object()); - RequestHeadersSpec headersSpecPlainObject = createDefaultPostResourceRequest().body(inserterPlainObject); - - // request body specifications - using other inserter method (multipart request) - LinkedMultiValueMap map = new LinkedMultiValueMap<>(); - map.add("key1", "multipartValue1"); - map.add("key2", "multipartValue2"); - BodyInserter, ClientHttpRequest> inserterMultipart = BodyInserters.fromMultipartData(map); - RequestHeadersSpec headersSpecInserterMultipart = createDefaultPostRequest().uri("/resource-multipart") - .body(inserterMultipart); - - // response assertions - StepVerifier.create(retrieveResponse(headersSpecPost1)) - .expectNext("processed-bodyValue") - .verifyComplete(); - StepVerifier.create(retrieveResponse(headersSpecPost2)) - .expectNext("processed-bodyValue") - .verifyComplete(); - StepVerifier.create(retrieveResponse(headersSpecPost3)) - .expectNext("processed-bodyValue") - .verifyComplete(); - StepVerifier.create(retrieveResponse(headersSpecFooPost)) - .expectNext("processedFoo-fooName") - .verifyComplete(); - StepVerifier.create(retrieveResponse(headersSpecInserterMultipart)) - .expectNext("processed-multipartValue1-multipartValue2") - .verifyComplete(); - // assert error plain `new Object()` as request body - StepVerifier.create(retrieveResponse(headersSpecPlainObject)) - .expectError(CodecException.class) - .verify(); - // assert response for request with no body - Mono> responsePostWithNoBody = createDefaultPostResourceRequest().exchangeToMono(responseHandler -> { - assertThat(responseHandler.statusCode()).isEqualTo(HttpStatus.BAD_REQUEST); - return responseHandler.bodyToMono(MAP_RESPONSE_REF); - }); - StepVerifier.create(responsePostWithNoBody) - .expectNextMatches(nextMap -> nextMap.get("error") - .equals("Bad Request")) - .verifyComplete(); - } - - @Test - public void givenPostSpecifications_whenHeadersAdded_thenObtainExpectedResponse() { - // request header specification - RequestHeadersSpec headersSpecInserterStringWithHeaders = createDefaultPostResourceRequestResponse().header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE) - .accept(MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML) - .acceptCharset(StandardCharsets.UTF_8) - .ifNoneMatch("*") - .ifModifiedSince(ZonedDateTime.now()); - - // response assertions - StepVerifier.create(retrieveResponse(headersSpecInserterStringWithHeaders)) - .expectNext("processed-bodyValue") - .verifyComplete(); - } - - @Test - public void givenDifferentResponseSpecifications_whenUsed_thenObtainExpectedResponse() { - ResponseSpec responseSpecPostString = createDefaultPostResourceRequestResponse().retrieve(); - Mono responsePostString = responseSpecPostString.bodyToMono(String.class); - Mono responsePostString2 = createDefaultPostResourceRequestResponse().exchangeToMono(response -> { - if (response.statusCode() - .equals(HttpStatus.OK)) { - return response.bodyToMono(String.class); - } else if (response.statusCode() - .is4xxClientError()) { - return Mono.just("Error response"); - } else { - return response.createException() - .flatMap(Mono::error); - } - }); - Mono responsePostNoBody = createDefaultPostResourceRequest().exchangeToMono(response -> { - if (response.statusCode() - .equals(HttpStatus.OK)) { - return response.bodyToMono(String.class); - } else if (response.statusCode() - .is4xxClientError()) { - return Mono.just("Error response"); - } else { - return response.createException() - .flatMap(Mono::error); - } - }); - Mono> responseGet = createDefaultClient().get() - .uri("/resource") - .retrieve() - .bodyToMono(MAP_RESPONSE_REF); - - // response assertions - StepVerifier.create(responsePostString) - .expectNext("processed-bodyValue") - .verifyComplete(); - StepVerifier.create(responsePostString2) - .expectNext("processed-bodyValue") - .verifyComplete(); - StepVerifier.create(responsePostNoBody) - .expectNext("Error response") - .verifyComplete(); - StepVerifier.create(responseGet) - .expectNextMatches(nextMap -> nextMap.get("field") - .equals("value")) - .verifyComplete(); - } - - @Test - public void givenWebClientWithTimeoutConfigurations_whenRequestUsingWronglyConfiguredPublisher_thenObtainTimeout() { - HttpClient httpClient = HttpClient.create() - .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 1000) - .responseTimeout(Duration.ofMillis(1000)) - .doOnConnected(conn -> conn.addHandlerLast(new ReadTimeoutHandler(1000, TimeUnit.MILLISECONDS)) - .addHandlerLast(new WriteTimeoutHandler(1000, TimeUnit.MILLISECONDS))); - - WebClient timeoutClient = WebClient.builder() - .baseUrl("http://localhost:" + port) - .clientConnector(new ReactorClientHttpConnector(httpClient)) - .build(); - - RequestHeadersSpec neverendingMonoBodyRequest = timeoutClient.post() - .uri("/resource") - .body(Mono.never(), String.class); - - StepVerifier.create(neverendingMonoBodyRequest.retrieve() - .bodyToMono(String.class)) - .expectErrorMatches(ex -> WebClientRequestException.class.isAssignableFrom(ex.getClass()) && ReadTimeoutException.class.isAssignableFrom(ex.getCause() - .getClass())) - .verify(); - } - - // helper methods to create default instances - private WebClient createDefaultClient() { - return WebClient.create("http://localhost:" + port); - } - - private RequestBodyUriSpec createDefaultPostRequest() { - return createDefaultClient().post(); - } - - private RequestBodySpec createDefaultPostResourceRequest() { - return createDefaultPostRequest().uri("/resource"); - } - - private RequestHeadersSpec createDefaultPostResourceRequestResponse() { - return createDefaultPostResourceRequest().bodyValue(BODY_VALUE); - } - - // helper methods to retrieve a response based on different steps of the process (specs) - private Mono retrieveResponse(WebClient client) { - return client.post() - .uri("/resource") - .bodyValue(BODY_VALUE) - .retrieve() - .bodyToMono(String.class); - } - - private Mono retrieveResponse(RequestBodyUriSpec spec) { - return spec.uri("/resource") - .bodyValue(BODY_VALUE) - .retrieve() - .bodyToMono(String.class); - } - - private Mono> retrieveGetResponse(RequestHeadersUriSpec spec) { - return spec.uri("/resource") - .retrieve() - .bodyToMono(MAP_RESPONSE_REF); - } - - private Mono retrieveResponse(RequestBodySpec spec) { - return spec.bodyValue(BODY_VALUE) - .retrieve() - .bodyToMono(String.class); - } - - private Mono retrieveResponse(RequestHeadersSpec spec) { - return spec.retrieve() - .bodyToMono(String.class); - } -} diff --git a/spring-5-reactive/src/test/java/com/baeldung/web/client/WebTestClientIntegrationTest.java b/spring-5-reactive/src/test/java/com/baeldung/web/client/WebTestClientIntegrationTest.java deleted file mode 100644 index 07a4c81c91..0000000000 --- a/spring-5-reactive/src/test/java/com/baeldung/web/client/WebTestClientIntegrationTest.java +++ /dev/null @@ -1,102 +0,0 @@ -package com.baeldung.web.client; - -import org.junit.jupiter.api.Test; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.test.context.SpringBootTest; -import org.springframework.boot.web.server.LocalServerPort; -import org.springframework.context.ApplicationContext; -import org.springframework.security.test.context.support.WithMockUser; -import org.springframework.test.web.reactive.server.WebTestClient; -import org.springframework.web.reactive.function.server.RequestPredicates; -import org.springframework.web.reactive.function.server.RouterFunction; -import org.springframework.web.reactive.function.server.RouterFunctions; -import org.springframework.web.reactive.function.server.ServerResponse; -import org.springframework.web.server.WebHandler; - -import com.baeldung.web.reactive.client.WebClientApplication; -import com.baeldung.web.reactive.client.WebClientController; - -import reactor.core.publisher.Mono; - -@SpringBootTest(classes = WebClientApplication.class, webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT) -public class WebTestClientIntegrationTest { - - @LocalServerPort - private int port; - - @Autowired - private ApplicationContext context; - - @Autowired - private WebClientController controller; - - private final RouterFunction ROUTER_FUNCTION = RouterFunctions.route(RequestPredicates.GET("/resource"), request -> ServerResponse.ok() - .build()); - private final WebHandler WEB_HANDLER = exchange -> Mono.empty(); - - @Test - public void testWebTestClientWithServerWebHandler() { - WebTestClient.bindToWebHandler(WEB_HANDLER) - .build(); - } - - @Test - public void testWebTestClientWithRouterFunction() { - WebTestClient.bindToRouterFunction(ROUTER_FUNCTION) - .build() - .get() - .uri("/resource") - .exchange() - .expectStatus() - .isOk() - .expectBody() - .isEmpty(); - } - - @Test - @WithMockUser - public void testWebTestClientWithServerURL() { - WebTestClient.bindToServer() - .baseUrl("http://localhost:" + port) - .build() - .get() - .uri("/resource") - .exchange() - .expectStatus() - .isOk() - .expectBody() - .jsonPath("field") - .isEqualTo("value"); - ; - } - - @Test - @WithMockUser - public void testWebTestClientWithApplicationContext() { - WebTestClient.bindToApplicationContext(context) - .build() - .get() - .uri("/resource") - .exchange() - .expectStatus() - .isOk() - .expectBody() - .jsonPath("field") - .isEqualTo("value"); - } - - @Test - public void testWebTestClientWithController() { - WebTestClient.bindToController(controller) - .build() - .get() - .uri("/resource") - .exchange() - .expectStatus() - .isOk() - .expectBody() - .jsonPath("field") - .isEqualTo("value"); - } - -} diff --git a/spring-5-webflux/README.md b/spring-5-webflux/README.md index 889f211fc6..f6467a8993 100644 --- a/spring-5-webflux/README.md +++ b/spring-5-webflux/README.md @@ -6,7 +6,6 @@ This module contains articles about Spring 5 WebFlux - [Spring Boot Reactor Netty Configuration](https://www.baeldung.com/spring-boot-reactor-netty) - [How to Return 404 with Spring WebFlux](https://www.baeldung.com/spring-webflux-404) -- [Spring WebClient Requests with Parameters](https://www.baeldung.com/webflux-webclient-parameters) - [RSocket Using Spring Boot](https://www.baeldung.com/spring-boot-rsocket) - [Spring MVC Async vs Spring WebFlux](https://www.baeldung.com/spring-mvc-async-vs-webflux) - [Set a Timeout in Spring 5 Webflux WebClient](https://www.baeldung.com/spring-webflux-timeout) diff --git a/spring-5-webflux/src/main/java/com/baeldung/spring/webclientrequests/SpringWebClientRequestsApp.java b/spring-5-webflux/src/main/java/com/baeldung/spring/webclientrequests/SpringWebClientRequestsApp.java deleted file mode 100644 index 314fe2fdf9..0000000000 --- a/spring-5-webflux/src/main/java/com/baeldung/spring/webclientrequests/SpringWebClientRequestsApp.java +++ /dev/null @@ -1,12 +0,0 @@ -package com.baeldung.spring.webclientrequests; - -import org.springframework.boot.SpringApplication; -import org.springframework.boot.autoconfigure.SpringBootApplication; - -@SpringBootApplication -public class SpringWebClientRequestsApp { - - public static void main(String[] args) { - SpringApplication.run(SpringWebClientRequestsApp.class, args); - } -} diff --git a/spring-5-webflux/src/test/java/com/baeldung/spring/webclientrequests/WebClientRequestsUnitTest.java b/spring-5-webflux/src/test/java/com/baeldung/spring/webclientrequests/WebClientRequestsUnitTest.java deleted file mode 100644 index 353cb24d0a..0000000000 --- a/spring-5-webflux/src/test/java/com/baeldung/spring/webclientrequests/WebClientRequestsUnitTest.java +++ /dev/null @@ -1,176 +0,0 @@ -package com.baeldung.spring.webclientrequests; - -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verifyNoMoreInteractions; -import static org.mockito.Mockito.when; - -import java.time.Duration; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.ArgumentCaptor; -import org.mockito.Captor; -import org.mockito.Mockito; -import org.mockito.MockitoAnnotations; -import org.springframework.boot.test.autoconfigure.web.reactive.WebFluxTest; -import org.springframework.test.context.junit4.SpringRunner; -import org.springframework.web.reactive.function.client.ClientRequest; -import org.springframework.web.reactive.function.client.ClientResponse; -import org.springframework.web.reactive.function.client.ExchangeFunction; -import org.springframework.web.reactive.function.client.WebClient; -import org.springframework.web.util.DefaultUriBuilderFactory; -import reactor.core.publisher.Mono; - -@RunWith(SpringRunner.class) -@WebFluxTest -public class WebClientRequestsUnitTest { - - private static final String BASE_URL = "https://example.com"; - - private WebClient webClient; - - @Captor - private ArgumentCaptor argumentCaptor; - - private ExchangeFunction exchangeFunction; - - @Before - public void init() { - MockitoAnnotations.initMocks(this); - this.exchangeFunction = mock(ExchangeFunction.class); - ClientResponse mockResponse = mock(ClientResponse.class); - when(this.exchangeFunction.exchange(this.argumentCaptor.capture())).thenReturn(Mono.just(mockResponse)); - this.webClient = WebClient - .builder() - .baseUrl(BASE_URL) - .exchangeFunction(exchangeFunction) - .build(); - } - - - @Test - public void whenCallSimpleURI_thenURIMatched() { - this.webClient.get() - .uri("/products") - .exchange() - .block(Duration.ofSeconds(1)); - verifyCalledUrl("/products"); - } - - @Test - public void whenCallSinglePathSegmentUri_thenURIMatched() { - this.webClient.get() - .uri(uriBuilder -> uriBuilder - .path("/products/{id}") - .build(2)) - .exchange() - .block(Duration.ofSeconds(1)); - verifyCalledUrl("/products/2"); - } - - @Test - public void whenCallMultiplePathSegmentsUri_thenURIMatched() { - this.webClient.get() - .uri(uriBuilder -> uriBuilder - .path("/products/{id}/attributes/{attributeId}") - .build(2, 13)) - .exchange() - .block(Duration.ofSeconds(1)); - verifyCalledUrl("/products/2/attributes/13"); - } - - @Test - public void whenCallSingleQueryParams_thenURIMatched() { - this.webClient.get() - .uri(uriBuilder -> uriBuilder - .path("/products/") - .queryParam("name", "AndroidPhone") - .queryParam("color", "black") - .queryParam("deliveryDate", "13/04/2019") - .build()) - .exchange() - .block(Duration.ofSeconds(1)); - verifyCalledUrl("/products/?name=AndroidPhone&color=black&deliveryDate=13/04/2019"); - } - - @Test - public void whenCallSingleQueryParamsPlaceholders_thenURIMatched() { - this.webClient.get() - .uri(uriBuilder -> uriBuilder - .path("/products/") - .queryParam("name", "{title}") - .queryParam("color", "{authorId}") - .queryParam("deliveryDate", "{date}") - .build("AndroidPhone", "black", "13/04/2019")) - .exchange() - .block(Duration.ofSeconds(1)); - verifyCalledUrl("/products/?name=AndroidPhone&color=black&deliveryDate=13%2F04%2F2019"); - } - - @Test - public void whenCallArrayQueryParamsBrackets_thenURIMatched() { - this.webClient.get() - .uri(uriBuilder -> uriBuilder - .path("/products/") - .queryParam("tag[]", "Snapdragon", "NFC") - .build()) - .exchange() - .block(Duration.ofSeconds(1)); - verifyCalledUrl("/products/?tag%5B%5D=Snapdragon&tag%5B%5D=NFC"); - } - - - @Test - public void whenCallArrayQueryParams_thenURIMatched() { - this.webClient.get() - .uri(uriBuilder -> uriBuilder - .path("/products/") - .queryParam("category", "Phones", "Tablets") - .build()) - .exchange() - .block(Duration.ofSeconds(1)); - verifyCalledUrl("/products/?category=Phones&category=Tablets"); - } - - @Test - public void whenCallArrayQueryParamsComma_thenURIMatched() { - this.webClient.get() - .uri(uriBuilder -> uriBuilder - .path("/products/") - .queryParam("category", String.join(",", "Phones", "Tablets")) - .build()) - .exchange() - .block(Duration.ofSeconds(1)); - verifyCalledUrl("/products/?category=Phones,Tablets"); - } - - @Test - public void whenUriComponentEncoding_thenQueryParamsNotEscaped() { - DefaultUriBuilderFactory factory = new DefaultUriBuilderFactory(BASE_URL); - factory.setEncodingMode(DefaultUriBuilderFactory.EncodingMode.URI_COMPONENT); - this.webClient = WebClient - .builder() - .uriBuilderFactory(factory) - .baseUrl(BASE_URL) - .exchangeFunction(exchangeFunction) - .build(); - this.webClient.get() - .uri(uriBuilder -> uriBuilder - .path("/products/") - .queryParam("name", "AndroidPhone") - .queryParam("color", "black") - .queryParam("deliveryDate", "13/04/2019") - .build()) - .exchange() - .block(Duration.ofSeconds(1)); - verifyCalledUrl("/products/?name=AndroidPhone&color=black&deliveryDate=13/04/2019"); - } - - private void verifyCalledUrl(String relativeUrl) { - ClientRequest request = this.argumentCaptor.getValue(); - Assert.assertEquals(String.format("%s%s", BASE_URL, relativeUrl), request.url().toString()); - Mockito.verify(this.exchangeFunction).exchange(request); - verifyNoMoreInteractions(this.exchangeFunction); - } -} diff --git a/spring-reactive/README.md b/spring-reactive/README.md index 6e887fe2f8..9f1852d912 100644 --- a/spring-reactive/README.md +++ b/spring-reactive/README.md @@ -1,17 +1,16 @@ ## Spring Reactive -This module contains articles about Spring Reactor. +This module contains articles describing reactive processing in Spring. ## Relevant articles: -- [Introduction to Project Reactor Bus](https://www.baeldung.com/reactor-bus) - [Intro To Reactor Core](https://www.baeldung.com/reactor-core) - [Debugging Reactive Streams in Java](https://www.baeldung.com/spring-debugging-reactive-streams) - [Guide to Spring 5 WebFlux](https://www.baeldung.com/spring-webflux) - [Introduction to the Functional Web Framework in Spring 5](https://www.baeldung.com/spring-5-functional-web) - [Spring 5 WebClient](https://www.baeldung.com/spring-5-webclient) - [Spring WebClient vs. RestTemplate](https://www.baeldung.com/spring-webclient-resttemplate) -- [https://www.baeldung.com/webflux-webclient-parameters](https://www.baeldung.com/webflux-webclient-parameters) +- [Spring WebClient Requests with Parameters](https://www.baeldung.com/webflux-webclient-parameters) - [Handling Errors in Spring WebFlux](https://www.baeldung.com/spring-webflux-errors) - [Spring Security 5 for Reactive Applications](https://www.baeldung.com/spring-security-5-reactive) -- [https://www.baeldung.com/spring-webflux-concurrency](https://www.baeldung.com/spring-webflux-concurrency) \ No newline at end of file +- [Concurrency in Spring WebFlux](https://www.baeldung.com/spring-webflux-concurrency) \ No newline at end of file diff --git a/spring-5-reactive/src/test/java/com/baeldung/reactive/errorhandling/ErrorHandlingIntegrationTest.java b/spring-reactive/src/test/java/com/baeldung/reactive/errorhandling/ErrorHandlingIntegrationTest.java similarity index 100% rename from spring-5-reactive/src/test/java/com/baeldung/reactive/errorhandling/ErrorHandlingIntegrationTest.java rename to spring-reactive/src/test/java/com/baeldung/reactive/errorhandling/ErrorHandlingIntegrationTest.java index 38443a4eac..1167792542 100644 --- a/spring-5-reactive/src/test/java/com/baeldung/reactive/errorhandling/ErrorHandlingIntegrationTest.java +++ b/spring-reactive/src/test/java/com/baeldung/reactive/errorhandling/ErrorHandlingIntegrationTest.java @@ -1,9 +1,5 @@ package com.baeldung.reactive.errorhandling; -import static org.junit.Assert.assertEquals; - -import java.io.IOException; - import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; @@ -16,6 +12,10 @@ import org.springframework.test.annotation.DirtiesContext; import org.springframework.test.context.junit4.SpringRunner; import org.springframework.test.web.reactive.server.WebTestClient; +import java.io.IOException; + +import static org.junit.Assert.assertEquals; + @RunWith(SpringRunner.class) @SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT) @DirtiesContext diff --git a/spring-webflux-threads/README.md b/spring-webflux-threads/README.md deleted file mode 100644 index 746514feda..0000000000 --- a/spring-webflux-threads/README.md +++ /dev/null @@ -1,3 +0,0 @@ -### Relevant Articles: - -- [Concurrency in Spring WebFlux](https://www.baeldung.com/spring-webflux-concurrency) diff --git a/spring-webflux-threads/pom.xml b/spring-webflux-threads/pom.xml deleted file mode 100644 index fedfaa6967..0000000000 --- a/spring-webflux-threads/pom.xml +++ /dev/null @@ -1,65 +0,0 @@ - - - 4.0.0 - com.baeldung.spring - spring-webflux-threads - 1.0.0-SNAPSHOT - spring-webflux-threads - jar - Spring WebFlux Threads Sample - - - com.baeldung - parent-boot-2 - 0.0.1-SNAPSHOT - ../parent-boot-2 - - - - - org.springframework.boot - spring-boot-starter-webflux - - - - - - - - - io.reactivex.rxjava2 - rxjava - 2.2.19 - - - org.springframework.boot - spring-boot-starter-data-mongodb-reactive - - - io.projectreactor.kafka - reactor-kafka - 1.2.2.RELEASE - - - com.fasterxml.jackson.core - jackson-databind - - - org.springframework.boot - spring-boot-starter-test - test - - - io.projectreactor - reactor-test - test - - - - \ No newline at end of file diff --git a/spring-webflux-threads/src/main/java/com/baeldung/webflux/Application.java b/spring-webflux-threads/src/main/java/com/baeldung/webflux/Application.java deleted file mode 100644 index 06a148a77f..0000000000 --- a/spring-webflux-threads/src/main/java/com/baeldung/webflux/Application.java +++ /dev/null @@ -1,17 +0,0 @@ -package com.baeldung.webflux; - -import org.springframework.boot.SpringApplication; -import org.springframework.boot.autoconfigure.SpringBootApplication; - -/** -* Please note we assume Mongo and Kafka are running in the local machine and on default configuration. -* Additionally, if you want to experiment with Tomcat/Jetty instead of Netty, just uncomment the lines in pom.xml and rebuild. -*/ -@SpringBootApplication -public class Application { - - public static void main(String[] args) { - SpringApplication.run(Application.class, args); - } - -} diff --git a/spring-webflux-threads/src/main/java/com/baeldung/webflux/Controller.java b/spring-webflux-threads/src/main/java/com/baeldung/webflux/Controller.java deleted file mode 100644 index 3c7e4e41ca..0000000000 --- a/spring-webflux-threads/src/main/java/com/baeldung/webflux/Controller.java +++ /dev/null @@ -1,128 +0,0 @@ -package com.baeldung.webflux; - -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; - -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.serialization.IntegerDeserializer; -import org.apache.kafka.common.serialization.IntegerSerializer; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.apache.kafka.common.serialization.StringSerializer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.web.bind.annotation.GetMapping; -import org.springframework.web.bind.annotation.RequestMapping; -import org.springframework.web.bind.annotation.RestController; -import org.springframework.web.reactive.function.client.WebClient; - -import io.reactivex.Observable; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; -import reactor.core.scheduler.Scheduler; -import reactor.core.scheduler.Schedulers; -import reactor.kafka.receiver.KafkaReceiver; -import reactor.kafka.receiver.ReceiverOptions; -import reactor.kafka.receiver.ReceiverRecord; -import reactor.kafka.sender.KafkaSender; -import reactor.kafka.sender.SenderOptions; -import reactor.kafka.sender.SenderRecord; - -@RestController -@RequestMapping("/") -public class Controller { - - @Autowired - private PersonRepository personRepository; - - private Scheduler scheduler = Schedulers.newBoundedElastic(5, 10, "MyThreadGroup"); - - private Logger logger = LoggerFactory.getLogger(Controller.class); - - @GetMapping("/threads/webflux") - public Flux getThreadsWebflux() { - return Flux.fromIterable(getThreads()); - } - - @GetMapping("/threads/webclient") - public Flux getThreadsWebClient() { - WebClient.create("http://localhost:8080/index") - .get() - .retrieve() - .bodyToMono(String.class) - .subscribeOn(scheduler) - .publishOn(scheduler) - .doOnNext(s -> logger.info("Response: {}", s)) - .subscribe(); - return Flux.fromIterable(getThreads()); - } - - @GetMapping("/threads/rxjava") - public Observable getIndexRxJava() { - Observable.fromIterable(Arrays.asList("Hello", "World")) - .map(s -> s.toUpperCase()) - .observeOn(io.reactivex.schedulers.Schedulers.trampoline()) - .doOnNext(s -> logger.info("String: {}", s)) - .subscribe(); - return Observable.fromIterable(getThreads()); - } - - @GetMapping("/threads/mongodb") - public Flux getIndexMongo() { - personRepository.findAll() - .doOnNext(p -> logger.info("Person: {}", p)) - .subscribe(); - return Flux.fromIterable(getThreads()); - } - - @GetMapping("/threads/reactor-kafka") - public Flux getIndexKafka() { - Map producerProps = new HashMap<>(); - producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); - producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class); - producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - SenderOptions senderOptions = SenderOptions.create(producerProps); - KafkaSender sender = KafkaSender.create(senderOptions); - Flux> outboundFlux = Flux.range(1, 10) - .map(i -> SenderRecord.create(new ProducerRecord<>("reactive-test", i, "Message_" + i), i)); - sender.send(outboundFlux) - .subscribe(); - - Map consumerProps = new HashMap<>(); - consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); - consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, "my-consumer"); - consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group"); - consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class); - consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); - consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - ReceiverOptions receiverOptions = ReceiverOptions.create(consumerProps); - receiverOptions.subscription(Collections.singleton("reactive-test")); - KafkaReceiver receiver = KafkaReceiver.create(receiverOptions); - Flux> inboundFlux = receiver.receive(); - inboundFlux.subscribe(r -> { - logger.info("Received message: {}", r.value()); - r.receiverOffset() - .acknowledge(); - }); - return Flux.fromIterable(getThreads()); - } - - @GetMapping("/index") - public Mono getIndex() { - return Mono.just("Hello world!"); - } - - private List getThreads() { - return Thread.getAllStackTraces() - .keySet() - .stream() - .map(t -> String.format("%-20s \t %s \t %d \t %s\n", t.getName(), t.getState(), t.getPriority(), t.isDaemon() ? "Daemon" : "Normal")) - .collect(Collectors.toList()); - } -} diff --git a/spring-webflux-threads/src/main/java/com/baeldung/webflux/Person.java b/spring-webflux-threads/src/main/java/com/baeldung/webflux/Person.java deleted file mode 100644 index 4c6bd5f585..0000000000 --- a/spring-webflux-threads/src/main/java/com/baeldung/webflux/Person.java +++ /dev/null @@ -1,27 +0,0 @@ -package com.baeldung.webflux; - -import org.springframework.data.annotation.Id; -import org.springframework.data.mongodb.core.mapping.Document; - -@Document -public class Person { - @Id - String id; - - public Person(String id) { - this.id = id; - } - - public String getId() { - return id; - } - - public void setId(String id) { - this.id = id; - } - - @Override - public String toString() { - return "Person{" + "id='" + id + '\'' + '}'; - } -} diff --git a/spring-webflux-threads/src/main/java/com/baeldung/webflux/PersonRepository.java b/spring-webflux-threads/src/main/java/com/baeldung/webflux/PersonRepository.java deleted file mode 100644 index 38fbd3d431..0000000000 --- a/spring-webflux-threads/src/main/java/com/baeldung/webflux/PersonRepository.java +++ /dev/null @@ -1,6 +0,0 @@ -package com.baeldung.webflux; - -import org.springframework.data.mongodb.repository.ReactiveMongoRepository; - -public interface PersonRepository extends ReactiveMongoRepository { -} diff --git a/spring-webflux-threads/src/main/resources/logback.xml b/spring-webflux-threads/src/main/resources/logback.xml deleted file mode 100644 index 7d900d8ea8..0000000000 --- a/spring-webflux-threads/src/main/resources/logback.xml +++ /dev/null @@ -1,13 +0,0 @@ - - - - - %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n - - - - - - - - \ No newline at end of file