Merge pull request #11185 from Maiklins/JAVA-6006-Upgrade_spring-5-webflux_module_-_part_2

JAVA-6006 Upgrade Spring 5 Webflux module
This commit is contained in:
kwoyke 2021-09-15 09:28:46 +02:00 committed by GitHub
commit 8de3eb13c3
4 changed files with 34 additions and 39 deletions

View File

@ -65,8 +65,4 @@
</plugins> </plugins>
</build> </build>
<properties>
<spring-boot.version>2.3.3.RELEASE</spring-boot.version>
</properties>
</project> </project>

View File

@ -1,30 +1,29 @@
package com.baeldung.spring.rsocket.client; package com.baeldung.spring.rsocket.client;
import io.rsocket.RSocket;
import io.rsocket.RSocketFactory;
import io.rsocket.frame.decoder.PayloadDecoder;
import io.rsocket.transport.netty.client.TcpClientTransport;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.rsocket.RSocketRequester; import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.messaging.rsocket.RSocketStrategies;
import org.springframework.util.MimeTypeUtils; import org.springframework.util.MimeTypeUtils;
import reactor.util.retry.Retry;
import java.time.Duration;
@Configuration @Configuration
public class ClientConfiguration { public class ClientConfiguration {
@Bean @Bean
public RSocket rSocket() { public RSocketRequester getRSocketRequester(){
return RSocketFactory.connect()
.mimeType(MimeTypeUtils.APPLICATION_JSON_VALUE, MimeTypeUtils.APPLICATION_JSON_VALUE)
.frameDecoder(PayloadDecoder.ZERO_COPY)
.transport(TcpClientTransport.create(7000))
.start()
.block();
}
@Bean RSocketRequester.Builder builder = RSocketRequester.builder();
RSocketRequester rSocketRequester(RSocketStrategies rSocketStrategies) {
return RSocketRequester.wrap(rSocket(), MimeTypeUtils.APPLICATION_JSON, MimeTypeUtils.APPLICATION_JSON, rSocketStrategies); return builder
.rsocketConnector(
rSocketConnector ->
rSocketConnector.reconnect(
Retry.fixedDelay(2, Duration.ofSeconds(2))
)
)
.dataMimeType(MimeTypeUtils.APPLICATION_JSON)
.tcp("localhost", 7000);
} }
} }

View File

@ -25,12 +25,9 @@ public class CustomNettyWebServerFactory {
@Override @Override
public HttpServer apply(HttpServer httpServer) { public HttpServer apply(HttpServer httpServer) {
EventLoopGroup parentGroup = new NioEventLoopGroup(); EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
EventLoopGroup childGroup = new NioEventLoopGroup(); eventLoopGroup.register(new NioServerSocketChannel());
return httpServer return httpServer.runOn(eventLoopGroup);
.tcpConfiguration(tcpServer -> tcpServer.bootstrap(
serverBootstrap -> serverBootstrap.group(parentGroup, childGroup).channel(NioServerSocketChannel.class)
));
} }
} }
} }

View File

@ -1,15 +1,8 @@
package com.baeldung.spring.rsocket.server; package com.baeldung.spring.rsocket.server;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.verify;
import com.baeldung.spring.rsocket.model.MarketData; import com.baeldung.spring.rsocket.model.MarketData;
import com.baeldung.spring.rsocket.model.MarketDataRequest; import com.baeldung.spring.rsocket.model.MarketDataRequest;
import io.rsocket.RSocket; import io.rsocket.RSocket;
import io.rsocket.RSocketFactory;
import io.rsocket.frame.decoder.PayloadDecoder;
import io.rsocket.transport.netty.client.TcpClientTransport;
import java.time.Duration;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
@ -23,6 +16,12 @@ import org.springframework.messaging.rsocket.RSocketStrategies;
import org.springframework.test.context.TestPropertySource; import org.springframework.test.context.TestPropertySource;
import org.springframework.test.context.junit4.SpringRunner; import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.util.MimeTypeUtils; import org.springframework.util.MimeTypeUtils;
import reactor.util.retry.Retry;
import java.time.Duration;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.verify;
@RunWith(SpringRunner.class) @RunWith(SpringRunner.class)
@SpringBootTest @SpringBootTest
@ -81,12 +80,16 @@ public class MarketDataRSocketControllerLiveTest {
@Bean @Bean
@Lazy @Lazy
public RSocket rSocket() { public RSocket rSocket() {
return RSocketFactory.connect()
.mimeType(MimeTypeUtils.APPLICATION_JSON_VALUE, MimeTypeUtils.APPLICATION_JSON_VALUE) RSocketRequester.Builder builder = RSocketRequester.builder();
.frameDecoder(PayloadDecoder.ZERO_COPY)
.transport(TcpClientTransport.create(7000)) return builder
.start() .rsocketConnector(
.block(); rSocketConnector ->
rSocketConnector.reconnect(Retry.fixedDelay(2, Duration.ofSeconds(2))))
.dataMimeType(MimeTypeUtils.APPLICATION_JSON)
.tcp("localhost", 7000)
.rsocket();
} }
@Bean @Bean