Merge remote-tracking branch 'upstream/master'
This commit is contained in:
commit
36bab29858
|
@ -43,7 +43,8 @@ public class ControlSubThread implements Runnable {
|
|||
try {
|
||||
Thread.sleep(interval);
|
||||
} catch (InterruptedException e) {
|
||||
// no-op, just loop again
|
||||
Thread.currentThread().interrupt();
|
||||
System.out.println("Thread was interrupted, Failed to complete operation");
|
||||
}
|
||||
// do something
|
||||
}
|
||||
|
|
|
@ -44,6 +44,11 @@
|
|||
<artifactId>kotlin-stdlib</artifactId>
|
||||
<version>${kotlin-stdlib.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.jetbrains.kotlin</groupId>
|
||||
<artifactId>kotlin-stdlib-jre8</artifactId>
|
||||
<version>${kotlin-stdlib.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.jetbrains.kotlin</groupId>
|
||||
<artifactId>kotlin-test-junit</artifactId>
|
||||
|
|
|
@ -0,0 +1,67 @@
|
|||
package com.baeldung.kotlin
|
||||
|
||||
import org.junit.Test
|
||||
import java.beans.ExceptionListener
|
||||
import java.beans.XMLEncoder
|
||||
import java.io.*
|
||||
import java.lang.Exception
|
||||
import kotlin.test.assertEquals
|
||||
import kotlin.test.assertTrue
|
||||
import kotlin.test.fail
|
||||
|
||||
class UseTest {
|
||||
|
||||
@Test
|
||||
fun givenCloseable_whenUseIsCalled_thenItIsClosed() {
|
||||
val stringWriter = StringWriter()
|
||||
val writer = BufferedWriter(stringWriter) //Using a BufferedWriter because after close() it throws.
|
||||
writer.use {
|
||||
assertEquals(writer, it)
|
||||
|
||||
it.write("something")
|
||||
}
|
||||
try {
|
||||
writer.write("something else")
|
||||
|
||||
fail("write() should have thrown an exception because the writer is closed.")
|
||||
} catch (e: IOException) {
|
||||
//Ok
|
||||
}
|
||||
|
||||
assertEquals("something", stringWriter.toString())
|
||||
}
|
||||
|
||||
@Test
|
||||
fun givenAutoCloseable_whenUseIsCalled_thenItIsClosed() {
|
||||
val baos = ByteArrayOutputStream()
|
||||
val encoder = XMLEncoder(PrintStream(baos)) //XMLEncoder is AutoCloseable but not Closeable.
|
||||
//Here, we use a PrintStream because after close() it throws.
|
||||
encoder.exceptionListener = ThrowingExceptionListener()
|
||||
encoder.use {
|
||||
assertEquals(encoder, it)
|
||||
|
||||
it.writeObject("something")
|
||||
}
|
||||
try {
|
||||
encoder.writeObject("something else")
|
||||
encoder.flush()
|
||||
|
||||
fail("write() should have thrown an exception because the encoder is closed.")
|
||||
} catch (e: IOException) {
|
||||
//Ok
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun whenSimpleFormIsUsed_thenItWorks() {
|
||||
StringWriter().use { it.write("something") }
|
||||
}
|
||||
}
|
||||
|
||||
class ThrowingExceptionListener : ExceptionListener {
|
||||
override fun exceptionThrown(e: Exception?) {
|
||||
if(e != null) {
|
||||
throw e
|
||||
}
|
||||
}
|
||||
}
|
|
@ -55,7 +55,7 @@ public class InfluxDBConnectionLiveTest {
|
|||
|
||||
InfluxDB connection = connectDatabase();
|
||||
|
||||
// Create "baeldung and check for it
|
||||
// Create "baeldung" and check for it
|
||||
connection.createDatabase("baeldung");
|
||||
assertTrue(connection.databaseExists("baeldung"));
|
||||
|
||||
|
|
|
@ -1,3 +1,5 @@
|
|||
### Relevant Articles:
|
||||
- [Intro to Jedis – the Java Redis Client Library](http://www.baeldung.com/jedis-java-redis-client-library)
|
||||
- [A Guide to Redis with Redisson](http://www.baeldung.com/redis-redisson)
|
||||
- [Intro to Lettuce – the Java Redis Client Library](http://www.baeldung.com/lettuce-java-redis-client-library)
|
||||
|
||||
|
|
|
@ -36,6 +36,13 @@
|
|||
<artifactId>redisson</artifactId>
|
||||
<version>3.3.0</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>io.lettuce</groupId>
|
||||
<artifactId>lettuce-core</artifactId>
|
||||
<version>5.0.1.RELEASE</version>
|
||||
</dependency>
|
||||
|
||||
</dependencies>
|
||||
|
||||
<properties>
|
||||
|
|
|
@ -0,0 +1,312 @@
|
|||
package com.baeldung;
|
||||
|
||||
import io.lettuce.core.LettuceFutures;
|
||||
import io.lettuce.core.RedisClient;
|
||||
import io.lettuce.core.RedisFuture;
|
||||
import io.lettuce.core.TransactionResult;
|
||||
import io.lettuce.core.api.StatefulRedisConnection;
|
||||
import io.lettuce.core.api.async.RedisAsyncCommands;
|
||||
import io.lettuce.core.api.sync.RedisCommands;
|
||||
import io.lettuce.core.pubsub.RedisPubSubListener;
|
||||
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
|
||||
import io.lettuce.core.pubsub.api.async.RedisPubSubAsyncCommands;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Assert;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
public class LettuceIntegrationLiveTest {
|
||||
|
||||
private static Logger log = LoggerFactory.getLogger(LettuceIntegrationLiveTest.class);
|
||||
|
||||
private static StatefulRedisConnection<String, String> redisConnection;
|
||||
|
||||
private static RedisClient redisClient;
|
||||
|
||||
@BeforeClass
|
||||
public static void setUp() {
|
||||
// Docker defaults to mapping redis port to 32768
|
||||
redisClient = RedisClient.create("redis://localhost:32768/");
|
||||
redisConnection = redisClient.connect();
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void destroy() {
|
||||
redisConnection.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenAString_thenSaveItAsRedisStringsSync() {
|
||||
|
||||
RedisCommands<String, String> syncCommands = redisConnection.sync();
|
||||
|
||||
String key = "key";
|
||||
String value = "value";
|
||||
|
||||
syncCommands.set(key, value);
|
||||
String response = syncCommands.get(key);
|
||||
|
||||
Assert.assertEquals(value, response);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenValues_thenSaveAsRedisHashSync() {
|
||||
|
||||
RedisCommands<String, String> syncCommands = redisConnection.sync();
|
||||
|
||||
String recordName = "record1";
|
||||
String name = "FirstName";
|
||||
String value = "John";
|
||||
String surname = "LastName";
|
||||
String value1 = "Smith";
|
||||
|
||||
syncCommands.hset(recordName, name, value);
|
||||
syncCommands.hset(recordName, surname, value1);
|
||||
Map<String, String> record = syncCommands.hgetall(recordName);
|
||||
|
||||
Assert.assertEquals(record.get(name), value);
|
||||
Assert.assertEquals(record.get(surname), value1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenAString_thenSaveItAsRedisStringsAsync() throws Exception {
|
||||
|
||||
RedisAsyncCommands<String, String> asyncCommands = redisConnection.async();
|
||||
|
||||
String key = "key";
|
||||
String value = "value";
|
||||
|
||||
asyncCommands.set(key, value);
|
||||
RedisFuture<String> redisFuture = asyncCommands.get(key);
|
||||
|
||||
String response = redisFuture.get();
|
||||
|
||||
Assert.assertEquals(value, response);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenValues_thenSaveAsRedisHashAsync() throws Exception {
|
||||
|
||||
RedisAsyncCommands<String, String> asyncCommands = redisConnection.async();
|
||||
|
||||
String recordName = "record1";
|
||||
String name = "FirstName";
|
||||
String value = "John";
|
||||
String surname = "LastName";
|
||||
String value1 = "Smith";
|
||||
|
||||
asyncCommands.hset(recordName, name, value);
|
||||
asyncCommands.hset(recordName, surname, value1);
|
||||
RedisFuture<Map<String, String>> redisFuture = asyncCommands.hgetall(recordName);
|
||||
|
||||
Map<String, String> record = redisFuture.get();
|
||||
|
||||
Assert.assertEquals(record.get(name), value);
|
||||
Assert.assertEquals(record.get(surname), value1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenValues_thenSaveAsRedisListAsync() throws Exception {
|
||||
|
||||
RedisAsyncCommands<String, String> asyncCommands = redisConnection.async();
|
||||
|
||||
String listName = "tasks";
|
||||
String firstTask = "firstTask";
|
||||
String secondTask = "secondTask";
|
||||
|
||||
asyncCommands.del(listName);
|
||||
|
||||
asyncCommands.lpush(listName, firstTask);
|
||||
asyncCommands.lpush(listName, secondTask);
|
||||
RedisFuture<String> redisFuture = asyncCommands.rpop(listName);
|
||||
|
||||
String nextTask = redisFuture.get();
|
||||
|
||||
Assert.assertEquals(firstTask, nextTask);
|
||||
|
||||
asyncCommands.del(listName);
|
||||
|
||||
asyncCommands.lpush(listName, firstTask);
|
||||
asyncCommands.lpush(listName, secondTask);
|
||||
|
||||
redisFuture = asyncCommands.lpop(listName);
|
||||
|
||||
nextTask = redisFuture.get();
|
||||
|
||||
Assert.assertEquals(secondTask, nextTask);
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenSetElements_thenSaveThemInRedisSetAsync() throws Exception {
|
||||
|
||||
RedisAsyncCommands<String, String> asyncCommands = redisConnection.async();
|
||||
|
||||
String countries = "countries";
|
||||
|
||||
String countryOne = "Spain";
|
||||
String countryTwo = "Ireland";
|
||||
String countryThree = "Ireland";
|
||||
|
||||
asyncCommands.sadd(countries, countryOne);
|
||||
|
||||
RedisFuture<Set<String>> countriesSetFuture = asyncCommands.smembers(countries);
|
||||
Assert.assertEquals(2, countriesSetFuture.get().size());
|
||||
|
||||
asyncCommands.sadd(countries, countryTwo);
|
||||
countriesSetFuture = asyncCommands.smembers(countries);
|
||||
Assert.assertEquals(2, countriesSetFuture.get().size());
|
||||
|
||||
asyncCommands.sadd(countries, countryThree);
|
||||
countriesSetFuture = asyncCommands.smembers(countries);
|
||||
Assert.assertEquals(2, countriesSetFuture.get().size());
|
||||
|
||||
RedisFuture<Boolean> exists = asyncCommands.sismember(countries, countryThree);
|
||||
assertTrue(exists.get());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenARanking_thenSaveItInRedisSortedSetAsync() throws Exception {
|
||||
|
||||
RedisAsyncCommands<String, String> asyncCommands = redisConnection.async();
|
||||
|
||||
String key = "sortedset";
|
||||
|
||||
asyncCommands.zadd(key, 1, "one");
|
||||
asyncCommands.zadd(key, 4, "zero");
|
||||
asyncCommands.zadd(key, 2, "two");
|
||||
|
||||
RedisFuture<List<String>> values = asyncCommands.zrevrange(key, 0, 3);
|
||||
Assert.assertEquals("zero", values.get().get(0));
|
||||
|
||||
values = asyncCommands.zrange(key, 0, 3);
|
||||
Assert.assertEquals("one", values.get().get(0));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenMultipleOperationsThatNeedToBeExecutedAtomically_thenWrapThemInATransaction() throws Exception {
|
||||
|
||||
RedisAsyncCommands<String, String> asyncCommands = redisConnection.async();
|
||||
|
||||
// Start a transaction
|
||||
asyncCommands.multi();
|
||||
|
||||
// Add three sets to it, and save the future responses
|
||||
RedisFuture<String> result1 = asyncCommands.set("key1", "value1");
|
||||
RedisFuture<String> result2 = asyncCommands.set("key2", "value2");
|
||||
RedisFuture<String> result3 = asyncCommands.set("key3", "value3");
|
||||
|
||||
// Execute it
|
||||
RedisFuture<TransactionResult> execResult = asyncCommands.exec();
|
||||
|
||||
TransactionResult transactionResult = execResult.get();
|
||||
|
||||
// Get the three results in the transaction return
|
||||
String firstResult = transactionResult.get(0);
|
||||
String secondResult = transactionResult.get(0);
|
||||
String thirdResult = transactionResult.get(0);
|
||||
|
||||
// Our results are in both!
|
||||
assertTrue(firstResult.equals("OK"));
|
||||
assertTrue(secondResult.equals("OK"));
|
||||
assertTrue(thirdResult.equals("OK"));
|
||||
|
||||
assertTrue(result1.get().equals("OK"));
|
||||
assertTrue(result2.get().equals("OK"));
|
||||
assertTrue(result3.get().equals("OK"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenMultipleIndependentOperations_whenNetworkOptimizationIsImportant_thenFlushManually() throws Exception {
|
||||
|
||||
int iterations = 50;
|
||||
|
||||
RedisAsyncCommands<String, String> asyncCommands = redisConnection.async();
|
||||
|
||||
asyncCommands.setAutoFlushCommands(false);
|
||||
|
||||
List<RedisFuture<?>> futures = new ArrayList<>();
|
||||
for (int i = 0; i < iterations; i++) {
|
||||
futures.add(asyncCommands.set("key" + i, "value" + i));
|
||||
}
|
||||
|
||||
asyncCommands.flushCommands();
|
||||
|
||||
// Wait until all futures complete
|
||||
boolean result = LettuceFutures.awaitAll(5, TimeUnit.SECONDS, futures.toArray(new RedisFuture[futures.size()]));
|
||||
|
||||
asyncCommands.setAutoFlushCommands(true);
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenPubSubChannel_whenMessage_thenMessageReceived() throws Exception {
|
||||
|
||||
Listener listener = new Listener();
|
||||
StatefulRedisPubSubConnection<String, String> connection = redisClient.connectPubSub();
|
||||
StatefulRedisPubSubConnection<String, String> pubconnection = redisClient.connectPubSub();
|
||||
connection.addListener(listener);
|
||||
|
||||
RedisPubSubAsyncCommands<String, String> async = connection.async();
|
||||
async.subscribe("channel");
|
||||
|
||||
RedisPubSubAsyncCommands<String, String> pubasync = pubconnection.async();
|
||||
RedisFuture<Long> result = pubasync.publish("channel", "hithere");
|
||||
|
||||
// Need a long wait for publish to complete, depending on system.
|
||||
result.get(15, TimeUnit.SECONDS);
|
||||
assertTrue(listener.getMessage().equals("hithere"));
|
||||
|
||||
}
|
||||
|
||||
private static class Listener implements RedisPubSubListener<String, String> {
|
||||
|
||||
private String message;
|
||||
|
||||
String getMessage() {
|
||||
return message;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void message(String channel, String message) {
|
||||
log.debug("Got {} on {}", message, channel);
|
||||
this.message = message;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void message(String pattern, String channel, String message) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void subscribed(String channel, long count) {
|
||||
log.debug("Subscribed to {}", channel);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void psubscribed(String pattern, long count) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void unsubscribed(String channel, long count) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void punsubscribed(String pattern, long count) {
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,3 @@
|
|||
## Relevant articles:
|
||||
|
||||
- [Spring Security 5 -OAuth2 Login](http://www.baeldung.com/spring-security-5-oauth2-login)
|
|
@ -1,10 +1,9 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<web-app xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://java.sun.com/xml/ns/javaee" xmlns:web="http://java.sun.com/xml/ns/javaee/web-app_3_0.xsd"
|
||||
xsi:schemaLocation="
|
||||
http://java.sun.com/xml/ns/javaee
|
||||
http://java.sun.com/xml/ns/javaee/web-app_3_0.xsd" id="WebApp_ID" version="3.0"
|
||||
<web-app xmlns="http://xmlns.jcp.org/xml/ns/javaee"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/web-app_3_1.xsd"
|
||||
version="3.1"
|
||||
>
|
||||
|
||||
<display-name>Spring MVC XML Application</display-name>
|
||||
|
||||
<!-- Spring root -->
|
||||
|
|
|
@ -0,0 +1,92 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<artifactId>spring-reactive-websocket</artifactId>
|
||||
<version>0.0.1-SNAPSHOT</version>
|
||||
<packaging>jar</packaging>
|
||||
|
||||
<name>spring-reactive-websocket</name>
|
||||
<description>Reactive WebSockets with Spring 5</description>
|
||||
|
||||
<parent>
|
||||
<groupId>com.baeldung</groupId>
|
||||
<artifactId>parent-modules</artifactId>
|
||||
<version>1.0.0-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<properties>
|
||||
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
|
||||
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
|
||||
<java.version>1.8</java.version>
|
||||
</properties>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-integration</artifactId>
|
||||
<version>2.0.0.M7</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-webflux</artifactId>
|
||||
<version>2.0.0.M7</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.projectlombok</groupId>
|
||||
<artifactId>lombok</artifactId>
|
||||
<scope>compile</scope>
|
||||
<version>RELEASE</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-maven-plugin</artifactId>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
|
||||
<repositories>
|
||||
<repository>
|
||||
<id>spring-snapshots</id>
|
||||
<name>Spring Snapshots</name>
|
||||
<url>https://repo.spring.io/snapshot</url>
|
||||
<snapshots>
|
||||
<enabled>true</enabled>
|
||||
</snapshots>
|
||||
</repository>
|
||||
<repository>
|
||||
<id>spring-milestones</id>
|
||||
<name>Spring Milestones</name>
|
||||
<url>https://repo.spring.io/milestone</url>
|
||||
<snapshots>
|
||||
<enabled>false</enabled>
|
||||
</snapshots>
|
||||
</repository>
|
||||
</repositories>
|
||||
|
||||
<pluginRepositories>
|
||||
<pluginRepository>
|
||||
<id>spring-snapshots</id>
|
||||
<name>Spring Snapshots</name>
|
||||
<url>https://repo.spring.io/snapshot</url>
|
||||
<snapshots>
|
||||
<enabled>true</enabled>
|
||||
</snapshots>
|
||||
</pluginRepository>
|
||||
<pluginRepository>
|
||||
<id>spring-milestones</id>
|
||||
<name>Spring Milestones</name>
|
||||
<url>https://repo.spring.io/milestone</url>
|
||||
<snapshots>
|
||||
<enabled>false</enabled>
|
||||
</snapshots>
|
||||
</pluginRepository>
|
||||
</pluginRepositories>
|
||||
|
||||
|
||||
</project>
|
|
@ -0,0 +1,11 @@
|
|||
package com.baeldung;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Data;
|
||||
|
||||
@Data
|
||||
@AllArgsConstructor
|
||||
public class Event {
|
||||
private String eventId;
|
||||
private String eventDt;
|
||||
}
|
|
@ -0,0 +1,38 @@
|
|||
package com.baeldung;
|
||||
|
||||
import java.net.URI;
|
||||
import java.time.Duration;
|
||||
|
||||
import org.springframework.boot.CommandLineRunner;
|
||||
import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.web.reactive.socket.WebSocketMessage;
|
||||
import org.springframework.web.reactive.socket.client.ReactorNettyWebSocketClient;
|
||||
import org.springframework.web.reactive.socket.client.WebSocketClient;
|
||||
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
@SpringBootApplication
|
||||
public class ReactiveWebSocketApplication {
|
||||
public static void main(String[] args) {
|
||||
SpringApplication.run(ReactiveWebSocketApplication.class, args);
|
||||
}
|
||||
|
||||
/**
|
||||
* Spring Reactive WebSocket Client
|
||||
* **/
|
||||
@Bean
|
||||
CommandLineRunner runner() {
|
||||
return run -> {
|
||||
WebSocketClient client = new ReactorNettyWebSocketClient();
|
||||
client.execute(URI.create("ws://localhost:8080/event-emitter"), session -> session.send(Mono.just(session.textMessage("event-me-from-spring-reactive-client")))
|
||||
.thenMany(session.receive()
|
||||
.map(WebSocketMessage::getPayloadAsText)
|
||||
.log())
|
||||
.then())
|
||||
.block();
|
||||
// .block(Duration.ofSeconds(10L));//force timeout after given duration
|
||||
};
|
||||
}
|
||||
}
|
|
@ -0,0 +1,34 @@
|
|||
package com.baeldung;
|
||||
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.web.reactive.HandlerMapping;
|
||||
import org.springframework.web.reactive.handler.SimpleUrlHandlerMapping;
|
||||
import org.springframework.web.reactive.socket.WebSocketHandler;
|
||||
import org.springframework.web.reactive.socket.server.support.WebSocketHandlerAdapter;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
@Configuration
|
||||
public class ReactiveWebSocketConfiguration {
|
||||
|
||||
@Autowired
|
||||
private WebSocketHandler webSocketHandler;
|
||||
|
||||
@Bean
|
||||
public HandlerMapping webSocketHandlerMapping() {
|
||||
Map<String, WebSocketHandler> map = new HashMap<>();
|
||||
map.put("/event-emitter", webSocketHandler);
|
||||
|
||||
SimpleUrlHandlerMapping handlerMapping = new SimpleUrlHandlerMapping();
|
||||
handlerMapping.setOrder(1);
|
||||
handlerMapping.setUrlMap(map);
|
||||
return handlerMapping;
|
||||
}
|
||||
|
||||
@Bean
|
||||
public WebSocketHandlerAdapter handlerAdapter() {
|
||||
return new WebSocketHandlerAdapter();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,71 @@
|
|||
package com.baeldung;
|
||||
|
||||
import org.springframework.web.reactive.socket.WebSocketSession;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.web.reactive.socket.WebSocketHandler;
|
||||
import org.springframework.web.reactive.socket.WebSocketMessage;
|
||||
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
import javax.annotation.PostConstruct;
|
||||
import java.time.Duration;
|
||||
import java.time.LocalDateTime;
|
||||
import java.util.UUID;
|
||||
|
||||
@Component
|
||||
public class ReactiveWebSocketHandler implements WebSocketHandler {
|
||||
|
||||
private Flux<Event> eventFlux;
|
||||
private Flux<Event> intervalFlux;
|
||||
|
||||
/**
|
||||
* Here we prepare a Flux that will emit a message every second
|
||||
*/
|
||||
@PostConstruct
|
||||
private void init() throws InterruptedException {
|
||||
|
||||
eventFlux = Flux.generate(e -> {
|
||||
Event event = new Event(UUID.randomUUID()
|
||||
.toString(),
|
||||
LocalDateTime.now()
|
||||
.toString());
|
||||
e.next(event);
|
||||
});
|
||||
|
||||
intervalFlux = Flux.interval(Duration.ofMillis(1000L))
|
||||
.zipWith(eventFlux, (time, event) -> event);
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* On each new client session, send the message flux to the client.
|
||||
* Spring subscribes to the flux and send every new flux event to the WebSocketSession object
|
||||
* @param session
|
||||
* @return Mono<Void>
|
||||
*/
|
||||
@Override
|
||||
public Mono<Void> handle(WebSocketSession webSocketSession) {
|
||||
ObjectMapper json = new ObjectMapper();
|
||||
return webSocketSession.send(intervalFlux.map(event -> {
|
||||
try {
|
||||
String jsonEvent = json.writeValueAsString(event);
|
||||
System.out.println(jsonEvent);
|
||||
return jsonEvent;
|
||||
} catch (JsonProcessingException e) {
|
||||
e.printStackTrace();
|
||||
return "";
|
||||
}
|
||||
})
|
||||
.map(webSocketSession::textMessage))
|
||||
|
||||
.and(webSocketSession.receive()
|
||||
.map(WebSocketMessage::getPayloadAsText)
|
||||
.log());
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,34 @@
|
|||
<!DOCTYPE html>
|
||||
<html lang="en">
|
||||
<head>
|
||||
<meta charset="UTF-8">
|
||||
<title>Baeldung: Spring 5 Reactive Client WebSocket (Browser)</title>
|
||||
</head>
|
||||
<body>
|
||||
|
||||
<div class="events"></div>
|
||||
<script>
|
||||
var clientWebSocket = new WebSocket("ws://localhost:8080/event-emitter");
|
||||
clientWebSocket.onopen = function() {
|
||||
console.log("clientWebSocket.onopen", clientWebSocket);
|
||||
console.log("clientWebSocket.readyState", "websocketstatus");
|
||||
clientWebSocket.send("event-me-from-browser");
|
||||
}
|
||||
clientWebSocket.onclose = function(error) {
|
||||
console.log("clientWebSocket.onclose", clientWebSocket, error);
|
||||
events("Closing connection");
|
||||
}
|
||||
clientWebSocket.onerror = function(error) {
|
||||
console.log("clientWebSocket.onerror", clientWebSocket, error);
|
||||
events("An error occured");
|
||||
}
|
||||
clientWebSocket.onmessage = function(error) {
|
||||
console.log("clientWebSocket.onmessage", clientWebSocket, error);
|
||||
events(error.data);
|
||||
}
|
||||
function events(responseEvent) {
|
||||
document.querySelector(".events").innerHTML += responseEvent + "<br>";
|
||||
}
|
||||
</script>
|
||||
</body>
|
||||
</html>
|
Loading…
Reference in New Issue