BAEL-6082 RSocket Interface in Spring 6 (#14873)

* Rsocket in spring framework6.

* reformat code.

* Reformate code with removing blank rows.

* some changes in maven dependency and codes

* moving spring boot dependency from parent to new projects.changing spring core version

---------

Co-authored-by: rezaganjis <Ganji@tosan.com>
This commit is contained in:
Reza Ganji 2023-09-30 03:53:21 +03:30 committed by GitHub
parent 9e5c656d84
commit cafabdfe55
9 changed files with 259 additions and 2 deletions

View File

@ -1,7 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>parent-spring-6</artifactId>
<version>0.0.1-SNAPSHOT</version>

View File

@ -892,6 +892,7 @@
<module>spring-5</module>
<module>spring-5-webflux</module>
<module>spring-5-webflux-2</module>
<module>spring-6-rsocket</module>
<module>spring-activiti</module>
<module>spring-actuator</module>
<module>spring-core-2</module>

View File

@ -0,0 +1,7 @@
## RSocket
This module contains articles about RSocket in Spring Framework 6.
### Relevant articles
- [Introduction to RSocket](#)

74
spring-6-rsocket/pom.xml Normal file
View File

@ -0,0 +1,74 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.bealdung</groupId>
<artifactId>rsocket</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>rsocket</name>
<parent>
<groupId>com.baeldung</groupId>
<artifactId>parent-spring-6</artifactId>
<version>0.0.1-SNAPSHOT</version>
<relativePath>../parent-spring-6</relativePath>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-rsocket</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit</groupId>
<artifactId>junit-bom</artifactId>
<version>${junit-jupiter.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>${logback.version}</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
<version>${logback.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>jcl-over-slf4j</artifactId>
<version>${slf4j.version}</version>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>${spring-boot.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<properties>
<spring-boot.version>3.1.3</spring-boot.version>
<logback.version>1.4.11</logback.version>
<slf4j.version>2.0.9</slf4j.version>
</properties>
</project>

View File

@ -0,0 +1,21 @@
package com.bealdung.rsocket.requester;
import org.springframework.messaging.rsocket.service.RSocketExchange;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public interface MessageClient {
@RSocketExchange("MyDestination")
Mono<String> sendMessage(Mono<String> input);
@RSocketExchange("Counter")
Flux<String> Counter();
@RSocketExchange("Warning")
Mono<Void> Warning(Mono<String> warning);
@RSocketExchange("channel")
Flux<String> channel(Flux<String> input);
}

View File

@ -0,0 +1,45 @@
package com.bealdung.rsocket.responder;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.stereotype.Controller;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@Controller
public class MessageController {
@MessageMapping("MyDestination")
public Mono<String> message(Mono<String> input) {
return input.doOnNext(msg -> System.out.println("Request is:" + msg + ",Request!"))
.map(msg -> msg + ",Response!");
}
@MessageMapping("Counter")
public Flux<String> Counter() {
return Flux.range(1, 10)
.map(i -> "Count is: " + i);
}
@MessageMapping("Warning")
public Mono<Void> Warning(Mono<String> error) {
error.doOnNext(e -> System.out.println("warning is :" + e))
.subscribe();
return Mono.empty();
}
@MessageMapping("channel")
public Flux<String> channel(Flux<String> input) {
return input.doOnNext(i -> {
System.out.println("Received message is : " + i);
})
.map(m -> m.toUpperCase())
.doOnNext(r -> {
System.out.println("RESPONSE IS :" + r);
});
}
}

View File

@ -0,0 +1,73 @@
package com.bealdung.rsocket.responder;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.messaging.rsocket.service.RSocketServiceProxyFactory;
import com.bealdung.rsocket.requester.MessageClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@SpringBootApplication
public class RSocketApplication {
public static void main(String[] args) {
SpringApplication.run(RSocketApplication.class, args);
}
@Bean
public RSocketServiceProxyFactory getRSocketServiceProxyFactory(RSocketRequester.Builder requestBuilder) {
RSocketRequester requester = requestBuilder.tcp("localhost", 7000);
return RSocketServiceProxyFactory.builder(requester)
.build();
}
@Bean
public MessageClient getClient(RSocketServiceProxyFactory factory) {
return factory.createClient(MessageClient.class);
}
@Bean
public ApplicationRunner runRequestResponseModel(MessageClient client) {
return args -> {
client.sendMessage(Mono.just("Request-Response test "))
.doOnNext(message -> {
System.out.println("Response is :" + message);
})
.subscribe();
};
}
@Bean
public ApplicationRunner runStreamModel(MessageClient client) {
return args -> {
client.Counter()
.doOnNext(t -> {
System.out.println("message is :" + t);
})
.subscribe();
};
}
@Bean
public ApplicationRunner runFireAndForget(MessageClient client) {
return args -> {
client.Warning(Mono.just("Important Warning"))
.subscribe();
};
}
@Bean
public ApplicationRunner runChannel(MessageClient client) {
return args -> {
client.channel(Flux.just("a", "b", "c", "d", "e"))
.doOnNext(i -> {
System.out.println(i);
})
.subscribe();
};
}
}

View File

@ -0,0 +1 @@
spring.rsocket.server.port=7000

View File

@ -0,0 +1,35 @@
package com.bealdung.rsocket;
import static org.junit.jupiter.api.Assertions.assertEquals;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.messaging.rsocket.service.RSocketServiceProxyFactory;
import com.bealdung.rsocket.requester.MessageClient;
import com.bealdung.rsocket.responder.RSocketApplication;
import reactor.core.publisher.Mono;
@SpringBootTest(classes = RSocketApplication.class)
public class RSocketRequestResponseIntegrationTest {
MessageClient client;
public RSocketRequestResponseIntegrationTest() {
RSocketRequester.Builder requesterBuilder = RSocketRequester.builder();
RSocketRequester requester = requesterBuilder.tcp("localhost", 7000);
RSocketServiceProxyFactory factory = RSocketServiceProxyFactory.builder(requester)
.build();
client = factory.createClient(MessageClient.class);
}
@Test
public void whenSendingStream_thenReceiveTheSameStream() {
String message = "test message";
assertEquals(message, client.sendMessage(Mono.just(message))
.block());
}
}