diff --git a/spring-5-webflux/pom.xml b/spring-5-webflux/pom.xml index ad1a66943c..b37e93ded8 100644 --- a/spring-5-webflux/pom.xml +++ b/spring-5-webflux/pom.xml @@ -65,8 +65,4 @@ - - 2.3.3.RELEASE - - \ No newline at end of file diff --git a/spring-5-webflux/src/main/java/com/baeldung/spring/rsocket/client/ClientConfiguration.java b/spring-5-webflux/src/main/java/com/baeldung/spring/rsocket/client/ClientConfiguration.java index abfe2e7807..2e2c309240 100644 --- a/spring-5-webflux/src/main/java/com/baeldung/spring/rsocket/client/ClientConfiguration.java +++ b/spring-5-webflux/src/main/java/com/baeldung/spring/rsocket/client/ClientConfiguration.java @@ -1,30 +1,29 @@ package com.baeldung.spring.rsocket.client; -import io.rsocket.RSocket; -import io.rsocket.RSocketFactory; -import io.rsocket.frame.decoder.PayloadDecoder; -import io.rsocket.transport.netty.client.TcpClientTransport; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.messaging.rsocket.RSocketRequester; -import org.springframework.messaging.rsocket.RSocketStrategies; import org.springframework.util.MimeTypeUtils; +import reactor.util.retry.Retry; + +import java.time.Duration; @Configuration public class ClientConfiguration { @Bean - public RSocket rSocket() { - return RSocketFactory.connect() - .mimeType(MimeTypeUtils.APPLICATION_JSON_VALUE, MimeTypeUtils.APPLICATION_JSON_VALUE) - .frameDecoder(PayloadDecoder.ZERO_COPY) - .transport(TcpClientTransport.create(7000)) - .start() - .block(); - } + public RSocketRequester getRSocketRequester(){ - @Bean - RSocketRequester rSocketRequester(RSocketStrategies rSocketStrategies) { - return RSocketRequester.wrap(rSocket(), MimeTypeUtils.APPLICATION_JSON, MimeTypeUtils.APPLICATION_JSON, rSocketStrategies); + RSocketRequester.Builder builder = RSocketRequester.builder(); + + return builder + .rsocketConnector( + rSocketConnector -> + rSocketConnector.reconnect( + Retry.fixedDelay(2, Duration.ofSeconds(2)) + ) + ) + .dataMimeType(MimeTypeUtils.APPLICATION_JSON) + .tcp("localhost", 7000); } } diff --git a/spring-5-webflux/src/main/java/com/baeldung/spring/serverconfig/CustomNettyWebServerFactory.java b/spring-5-webflux/src/main/java/com/baeldung/spring/serverconfig/CustomNettyWebServerFactory.java index f9de3b4006..2d11a51160 100644 --- a/spring-5-webflux/src/main/java/com/baeldung/spring/serverconfig/CustomNettyWebServerFactory.java +++ b/spring-5-webflux/src/main/java/com/baeldung/spring/serverconfig/CustomNettyWebServerFactory.java @@ -25,12 +25,9 @@ public class CustomNettyWebServerFactory { @Override public HttpServer apply(HttpServer httpServer) { - EventLoopGroup parentGroup = new NioEventLoopGroup(); - EventLoopGroup childGroup = new NioEventLoopGroup(); - return httpServer - .tcpConfiguration(tcpServer -> tcpServer.bootstrap( - serverBootstrap -> serverBootstrap.group(parentGroup, childGroup).channel(NioServerSocketChannel.class) - )); + EventLoopGroup eventLoopGroup = new NioEventLoopGroup(); + eventLoopGroup.register(new NioServerSocketChannel()); + return httpServer.runOn(eventLoopGroup); } } } diff --git a/spring-5-webflux/src/test/java/com/baeldung/spring/rsocket/server/MarketDataRSocketControllerLiveTest.java b/spring-5-webflux/src/test/java/com/baeldung/spring/rsocket/server/MarketDataRSocketControllerLiveTest.java index 40ddc732ac..7d8ed1f22d 100644 --- a/spring-5-webflux/src/test/java/com/baeldung/spring/rsocket/server/MarketDataRSocketControllerLiveTest.java +++ b/spring-5-webflux/src/test/java/com/baeldung/spring/rsocket/server/MarketDataRSocketControllerLiveTest.java @@ -1,15 +1,8 @@ package com.baeldung.spring.rsocket.server; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.verify; - import com.baeldung.spring.rsocket.model.MarketData; import com.baeldung.spring.rsocket.model.MarketDataRequest; import io.rsocket.RSocket; -import io.rsocket.RSocketFactory; -import io.rsocket.frame.decoder.PayloadDecoder; -import io.rsocket.transport.netty.client.TcpClientTransport; -import java.time.Duration; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; @@ -23,6 +16,12 @@ import org.springframework.messaging.rsocket.RSocketStrategies; import org.springframework.test.context.TestPropertySource; import org.springframework.test.context.junit4.SpringRunner; import org.springframework.util.MimeTypeUtils; +import reactor.util.retry.Retry; + +import java.time.Duration; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.verify; @RunWith(SpringRunner.class) @SpringBootTest @@ -81,12 +80,24 @@ public class MarketDataRSocketControllerLiveTest { @Bean @Lazy public RSocket rSocket() { - return RSocketFactory.connect() - .mimeType(MimeTypeUtils.APPLICATION_JSON_VALUE, MimeTypeUtils.APPLICATION_JSON_VALUE) - .frameDecoder(PayloadDecoder.ZERO_COPY) - .transport(TcpClientTransport.create(7000)) - .start() - .block(); + + RSocketRequester.Builder builder = RSocketRequester.builder(); + + return builder + .rsocketConnector( + rSocketConnector -> + rSocketConnector.reconnect(Retry.fixedDelay(2, Duration.ofSeconds(2)))) + .dataMimeType(MimeTypeUtils.APPLICATION_JSON) + .tcp("localhost", 7000) + .rsocket(); +// .connec/t(TcpClientTransport.create(6565)); + +// return RSocketFactory.connect() +// .mimeType(MimeTypeUtils.APPLICATION_JSON_VALUE, MimeTypeUtils.APPLICATION_JSON_VALUE) +// .frameDecoder(PayloadDecoder.ZERO_COPY) +// .transport(TcpClientTransport.create(7000)) +// .start() +// .block(); } @Bean