BAEL-2907: Added code samples for rsocket

This commit is contained in:
isaolmez 2019-05-18 20:09:07 +03:00
parent 85dfbca818
commit 98ab12669a
14 changed files with 478 additions and 13 deletions

View File

@ -28,12 +28,22 @@
</dependencies>
</dependencyManagement>
<properties>
<java.version>1.8</java.version>
<spring-boot.version>2.2.0.M3</spring-boot.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-rsocket</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
@ -46,8 +56,8 @@
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
@ -60,4 +70,35 @@
</plugin>
</plugins>
</build>
<repositories>
<repository>
<id>spring-snapshots</id>
<name>Spring Snapshots</name>
<url>https://repo.spring.io/snapshot</url>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
<repository>
<id>spring-milestones</id>
<name>Spring Milestones</name>
<url>https://repo.spring.io/milestone</url>
</repository>
</repositories>
<pluginRepositories>
<pluginRepository>
<id>spring-snapshots</id>
<name>Spring Snapshots</name>
<url>https://repo.spring.io/snapshot</url>
<snapshots>
<enabled>true</enabled>
</snapshots>
</pluginRepository>
<pluginRepository>
<id>spring-milestones</id>
<name>Spring Milestones</name>
<url>https://repo.spring.io/milestone</url>
</pluginRepository>
</pluginRepositories>
</project>

View File

@ -0,0 +1,16 @@
package com.baeldung.spring.rsocket.client;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
@SpringBootApplication
public class ClientApplication {
public static void main(String[] args) {
new SpringApplicationBuilder()
.main(ClientApplication.class)
.sources(ClientApplication.class)
.profiles("client")
.run(args);
}
}

View File

@ -0,0 +1,30 @@
package com.baeldung.spring.rsocket.client;
import io.rsocket.RSocket;
import io.rsocket.RSocketFactory;
import io.rsocket.frame.decoder.PayloadDecoder;
import io.rsocket.transport.netty.client.TcpClientTransport;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.messaging.rsocket.RSocketStrategies;
import org.springframework.util.MimeTypeUtils;
@Configuration
public class ClientConfiguration {
@Bean
public RSocket rSocket() {
return RSocketFactory.connect()
.mimeType(MimeTypeUtils.APPLICATION_JSON_VALUE, MimeTypeUtils.APPLICATION_JSON_VALUE)
.frameDecoder(PayloadDecoder.ZERO_COPY)
.transport(TcpClientTransport.create(7000))
.start()
.block();
}
@Bean
RSocketRequester rSocketRequester(RSocketStrategies rSocketStrategies) {
return RSocketRequester.wrap(rSocket(), MimeTypeUtils.APPLICATION_JSON, rSocketStrategies);
}
}

View File

@ -0,0 +1,47 @@
package com.baeldung.spring.rsocket.client;
import com.baeldung.spring.rsocket.model.MarketData;
import com.baeldung.spring.rsocket.model.MarketDataRequest;
import java.util.Random;
import org.reactivestreams.Publisher;
import org.springframework.http.MediaType;
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class MarketDataRestController {
private final Random random = new Random();
private final RSocketRequester rSocketRequester;
public MarketDataRestController(RSocketRequester rSocketRequester) {
this.rSocketRequester = rSocketRequester;
}
@GetMapping(value = "/current/{stock}")
public Publisher<MarketData> current(@PathVariable("stock") String stock) {
return rSocketRequester.route("currentMarketData")
.data(new MarketDataRequest(stock))
.retrieveMono(MarketData.class);
}
@GetMapping(value = "/feed/{stock}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Publisher<MarketData> feed(@PathVariable("stock") String stock) {
return rSocketRequester.route("feedMarketData")
.data(new MarketDataRequest(stock))
.retrieveFlux(MarketData.class);
}
@GetMapping(value = "/collect")
public Publisher<Void> collect() {
return rSocketRequester.route("collectMarketData")
.data(getMarketData())
.send();
}
private MarketData getMarketData() {
return new MarketData("X", random.nextInt(10));
}
}

View File

@ -0,0 +1,20 @@
package com.baeldung.spring.rsocket.model;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@AllArgsConstructor
@NoArgsConstructor
public class MarketData {
private String stock;
private int currentPrice;
public static MarketData fromException(Exception e) {
MarketData marketData = new MarketData();
marketData.setStock(e.getMessage());
return marketData;
}
}

View File

@ -0,0 +1,13 @@
package com.baeldung.spring.rsocket.model;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@AllArgsConstructor
@NoArgsConstructor
public class MarketDataRequest {
private String stock;
}

View File

@ -0,0 +1,40 @@
package com.baeldung.spring.rsocket.server;
import com.baeldung.spring.rsocket.model.MarketData;
import com.baeldung.spring.rsocket.model.MarketDataRequest;
import org.springframework.messaging.handler.annotation.MessageExceptionHandler;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.stereotype.Controller;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@Controller
public class MarketDataRSocketController {
private final MarketDataRepository marketDataRepository;
public MarketDataRSocketController(MarketDataRepository marketDataRepository) {
this.marketDataRepository = marketDataRepository;
}
@MessageMapping("currentMarketData")
public Mono<MarketData> currentMarketData(MarketDataRequest marketDataRequest) {
return marketDataRepository.getOne(marketDataRequest.getStock());
}
@MessageMapping("feedMarketData")
public Flux<MarketData> feedMarketData(MarketDataRequest marketDataRequest) {
return marketDataRepository.getAll(marketDataRequest.getStock());
}
@MessageMapping("collectMarketData")
public Mono<Void> collectMarketData(MarketData marketData) {
marketDataRepository.add(marketData);
return Mono.empty();
}
@MessageExceptionHandler
public Mono<MarketData> handleException(Exception e) {
return Mono.just(MarketData.fromException(e));
}
}

View File

@ -0,0 +1,37 @@
package com.baeldung.spring.rsocket.server;
import com.baeldung.spring.rsocket.model.MarketData;
import java.time.Duration;
import java.util.Random;
import java.util.stream.Stream;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@Slf4j
@Component
public class MarketDataRepository {
private static final int BOUND = 100;
private Random random = new Random();
public Flux<MarketData> getAll(String stock) {
return Flux.fromStream(Stream.generate(() -> getMarketDataResponse(stock)))
.log()
.delayElements(Duration.ofSeconds(1));
}
public Mono<MarketData> getOne(String stock) {
return Mono.just(getMarketDataResponse(stock));
}
public void add(MarketData marketData) {
log.info("New market data: {}", marketData);
}
private MarketData getMarketDataResponse(String stock) {
return new MarketData(stock, random.nextInt(BOUND));
}
}

View File

@ -0,0 +1,16 @@
package com.baeldung.spring.rsocket.server;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
@SpringBootApplication
public class ServerApplication {
public static void main(String[] args) {
new SpringApplicationBuilder()
.main(ServerApplication.class)
.sources(ServerApplication.class)
.profiles("server")
.run(args);
}
}

View File

@ -0,0 +1 @@
server.port=8080

View File

@ -0,0 +1,2 @@
spring.rsocket.server.port=7000
server.port=8081

View File

@ -0,0 +1,92 @@
package com.baeldung.spring.rsocket.client;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.when;
import static org.springframework.http.MediaType.TEXT_EVENT_STREAM;
import com.baeldung.spring.rsocket.model.MarketData;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.autoconfigure.web.reactive.WebFluxTest;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.messaging.rsocket.RSocketRequester.RequestSpec;
import org.springframework.messaging.rsocket.RSocketRequester.ResponseSpec;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.test.web.reactive.server.FluxExchangeResult;
import org.springframework.test.web.reactive.server.WebTestClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
@RunWith(SpringRunner.class)
@WebFluxTest(value = MarketDataRestController.class)
public class MarketDataRestControllerIntegrationTest {
@Autowired
private WebTestClient testClient;
@MockBean
private RSocketRequester rSocketRequester;
@Mock
private RequestSpec requestSpec;
@Mock
private ResponseSpec responseSpec;
@Test
public void whenInitiatesRequest_ThenGetsResponse() throws Exception {
when(rSocketRequester.route("currentMarketData")).thenReturn(requestSpec);
when(requestSpec.data(any())).thenReturn(responseSpec);
MarketData marketData = new MarketData("X", 1);
when(responseSpec.retrieveMono(MarketData.class)).thenReturn(Mono.just(marketData));
testClient.get()
.uri("/current/{stock}", "X")
.exchange()
.expectStatus()
.isOk()
.expectBody(MarketData.class)
.isEqualTo(marketData);
}
@Test
public void whenInitiatesFireAndForget_ThenGetsNoResponse() throws Exception {
when(rSocketRequester.route("collectMarketData")).thenReturn(requestSpec);
when(requestSpec.data(any())).thenReturn(responseSpec);
when(responseSpec.send()).thenReturn(Mono.empty());
testClient.get()
.uri("/collect")
.exchange()
.expectStatus()
.isOk()
.expectBody(Void.class);
}
@Test
public void whenInitiatesRequest_ThenGetsStream() throws Exception {
when(rSocketRequester.route("feedMarketData")).thenReturn(requestSpec);
when(requestSpec.data(any())).thenReturn(responseSpec);
MarketData firstMarketData = new MarketData("X", 1);
MarketData secondMarketData = new MarketData("X", 2);
when(responseSpec.retrieveFlux(MarketData.class)).thenReturn(Flux.just(firstMarketData, secondMarketData));
FluxExchangeResult<MarketData> result = testClient.get()
.uri("/feed/{stock}", "X")
.accept(TEXT_EVENT_STREAM)
.exchange()
.expectStatus()
.isOk()
.returnResult(MarketData.class);
Flux<MarketData> marketDataFlux = result.getResponseBody();
StepVerifier.create(marketDataFlux)
.expectNext(firstMarketData)
.expectNext(secondMarketData)
.thenCancel()
.verify();
}
}

View File

@ -0,0 +1,98 @@
package com.baeldung.spring.rsocket.server;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.verify;
import com.baeldung.spring.rsocket.model.MarketData;
import com.baeldung.spring.rsocket.model.MarketDataRequest;
import io.rsocket.RSocket;
import io.rsocket.RSocketFactory;
import io.rsocket.frame.decoder.PayloadDecoder;
import io.rsocket.transport.netty.client.TcpClientTransport;
import java.time.Duration;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.boot.test.mock.mockito.SpyBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Lazy;
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.messaging.rsocket.RSocketStrategies;
import org.springframework.test.context.TestPropertySource;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.util.MimeTypeUtils;
@RunWith(SpringRunner.class)
@SpringBootTest
@TestPropertySource(properties = {"spring.rsocket.server.port=7000"})
public class MarketDataRSocketControllerLiveTest {
@Autowired
private RSocketRequester rSocketRequester;
@SpyBean
private MarketDataRSocketController rSocketController;
@Test
public void whenGetsFireAndForget_ThenReturnsNoResponse() throws InterruptedException {
final MarketData marketData = new MarketData("X", 1);
rSocketRequester.route("collectMarketData")
.data(marketData)
.send()
.block(Duration.ofSeconds(10));
sleepForProcessing();
verify(rSocketController).collectMarketData(any());
}
@Test
public void whenGetsRequest_ThenReturnsResponse() throws InterruptedException {
final MarketDataRequest marketDataRequest = new MarketDataRequest("X");
rSocketRequester.route("currentMarketData")
.data(marketDataRequest)
.send()
.block(Duration.ofSeconds(10));
sleepForProcessing();
verify(rSocketController).currentMarketData(any());
}
@Test
public void whenGetsRequest_ThenReturnsStream() throws InterruptedException {
final MarketDataRequest marketDataRequest = new MarketDataRequest("X");
rSocketRequester.route("feedMarketData")
.data(marketDataRequest)
.send()
.block(Duration.ofSeconds(10));
sleepForProcessing();
verify(rSocketController).feedMarketData(any());
}
private void sleepForProcessing() throws InterruptedException {
Thread.sleep(1000);
}
@TestConfiguration
public static class ClientConfiguration {
@Bean
@Lazy
public RSocket rSocket() {
return RSocketFactory.connect()
.mimeType(MimeTypeUtils.APPLICATION_JSON_VALUE, MimeTypeUtils.APPLICATION_JSON_VALUE)
.frameDecoder(PayloadDecoder.ZERO_COPY)
.transport(TcpClientTransport.create(7000))
.start()
.block();
}
@Bean
@Lazy
RSocketRequester rSocketRequester(RSocketStrategies rSocketStrategies) {
return RSocketRequester.wrap(rSocket(), MimeTypeUtils.APPLICATION_JSON, rSocketStrategies);
}
}
}

View File

@ -1,5 +1,10 @@
package com.baeldung.spring.webclientrequests;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import java.time.Duration;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@ -17,8 +22,6 @@ import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.util.DefaultUriBuilderFactory;
import reactor.core.publisher.Mono;
import static org.mockito.Mockito.*;
@RunWith(SpringRunner.class)
@WebFluxTest
public class WebClientRequestsUnitTest {
@ -50,7 +53,8 @@ public class WebClientRequestsUnitTest {
public void whenCallSimpleURI_thenURIMatched() {
this.webClient.get()
.uri("/products")
.retrieve();
.exchange()
.block(Duration.ofSeconds(1));
verifyCalledUrl("/products");
}
@ -60,7 +64,8 @@ public class WebClientRequestsUnitTest {
.uri(uriBuilder -> uriBuilder
.path("/products/{id}")
.build(2))
.retrieve();
.exchange()
.block(Duration.ofSeconds(1));
verifyCalledUrl("/products/2");
}
@ -70,7 +75,8 @@ public class WebClientRequestsUnitTest {
.uri(uriBuilder -> uriBuilder
.path("/products/{id}/attributes/{attributeId}")
.build(2, 13))
.retrieve();
.exchange()
.block(Duration.ofSeconds(1));
verifyCalledUrl("/products/2/attributes/13");
}
@ -83,7 +89,8 @@ public class WebClientRequestsUnitTest {
.queryParam("color", "black")
.queryParam("deliveryDate", "13/04/2019")
.build())
.retrieve();
.exchange()
.block(Duration.ofSeconds(1));
verifyCalledUrl("/products/?name=AndroidPhone&color=black&deliveryDate=13/04/2019");
}
@ -96,7 +103,8 @@ public class WebClientRequestsUnitTest {
.queryParam("color", "{authorId}")
.queryParam("deliveryDate", "{date}")
.build("AndroidPhone", "black", "13/04/2019"))
.retrieve();
.exchange()
.block(Duration.ofSeconds(1));
verifyCalledUrl("/products/?name=AndroidPhone&color=black&deliveryDate=13%2F04%2F2019");
}
@ -107,7 +115,8 @@ public class WebClientRequestsUnitTest {
.path("/products/")
.queryParam("tag[]", "Snapdragon", "NFC")
.build())
.retrieve();
.exchange()
.block(Duration.ofSeconds(1));
verifyCalledUrl("/products/?tag%5B%5D=Snapdragon&tag%5B%5D=NFC");
}
@ -119,7 +128,8 @@ public class WebClientRequestsUnitTest {
.path("/products/")
.queryParam("category", "Phones", "Tablets")
.build())
.retrieve();
.exchange()
.block(Duration.ofSeconds(1));
verifyCalledUrl("/products/?category=Phones&category=Tablets");
}
@ -130,7 +140,8 @@ public class WebClientRequestsUnitTest {
.path("/products/")
.queryParam("category", String.join(",", "Phones", "Tablets"))
.build())
.retrieve();
.exchange()
.block(Duration.ofSeconds(1));
verifyCalledUrl("/products/?category=Phones,Tablets");
}
@ -151,7 +162,8 @@ public class WebClientRequestsUnitTest {
.queryParam("color", "black")
.queryParam("deliveryDate", "13/04/2019")
.build())
.retrieve();
.exchange()
.block(Duration.ofSeconds(1));
verifyCalledUrl("/products/?name=AndroidPhone&color=black&deliveryDate=13/04/2019");
}