diff --git a/persistence-modules/pom.xml b/persistence-modules/pom.xml
index e5214752cf..77f3fdb625 100644
--- a/persistence-modules/pom.xml
+++ b/persistence-modules/pom.xml
@@ -77,6 +77,7 @@
spring-data-jdbc
spring-data-keyvalue
spring-data-mongodb
+ spring-data-mongodb-reactive
spring-data-neo4j
spring-data-redis
spring-data-solr
diff --git a/persistence-modules/spring-data-mongodb-reactive/README.md b/persistence-modules/spring-data-mongodb-reactive/README.md
new file mode 100644
index 0000000000..0931161700
--- /dev/null
+++ b/persistence-modules/spring-data-mongodb-reactive/README.md
@@ -0,0 +1,12 @@
+## Spring Data Reactive Project
+
+This module contains articles about reactive Spring 5 Data
+
+### The Course
+The "REST With Spring" Classes: http://bit.ly/restwithspring
+
+### Relevant Articles
+- [Spring Data Reactive Repositories with MongoDB](https://www.baeldung.com/spring-data-mongodb-reactive)
+- [Spring Data MongoDB Tailable Cursors](https://www.baeldung.com/spring-data-mongodb-tailable-cursors)
+- [A Quick Look at R2DBC with Spring Data](https://www.baeldung.com/spring-data-r2dbc)
+- [Spring Data Reactive Repositories with Couchbase](https://www.baeldung.com/spring-data-reactive-couchbase)
diff --git a/persistence-modules/spring-data-mongodb-reactive/pom.xml b/persistence-modules/spring-data-mongodb-reactive/pom.xml
new file mode 100644
index 0000000000..9fb22b6033
--- /dev/null
+++ b/persistence-modules/spring-data-mongodb-reactive/pom.xml
@@ -0,0 +1,157 @@
+
+
+ 4.0.0
+ spring-data-mongodb-reactive
+ spring-data-mongodb-reactive
+ jar
+
+
+ com.baeldung
+ parent-boot-2
+ 0.0.1-SNAPSHOT
+ ../../parent-boot-2
+
+
+
+
+ io.projectreactor
+ reactor-core
+ ${reactor-core.version}
+
+
+ org.springframework.boot
+ spring-boot-starter-data-couchbase-reactive
+
+
+ org.springframework.boot
+ spring-boot-starter-data-mongodb-reactive
+
+
+ org.springframework.boot
+ spring-boot-starter-web
+
+
+ org.projectlombok
+ lombok
+
+
+ io.projectreactor
+ reactor-test
+ test
+
+
+ io.reactivex.rxjava2
+ rxjava
+
+
+ org.springframework
+ spring-test
+
+
+ org.springframework.boot
+ spring-boot-starter-test
+ test
+
+
+ de.flapdoodle.embed
+ de.flapdoodle.embed.mongo
+ test
+
+
+ org.springframework.boot
+ spring-boot-starter-webflux
+
+
+ org.springframework
+ spring-tx
+ ${spring-tx.version}
+
+
+ org.springframework.data
+ spring-data-r2dbc
+ ${spring-data-r2dbc.version}
+
+
+ io.r2dbc
+ r2dbc-h2
+ ${r2dbc-h2.version}
+
+
+ com.h2database
+ h2
+ ${h2.version}
+
+
+ org.apache.httpcomponents
+ httpclient
+ ${httpclient.version}
+
+
+ com.couchbase.mock
+ CouchbaseMock
+ ${couchbaseMock.version}
+ test
+
+
+
+
+
+
+ org.springframework.boot
+ spring-boot-maven-plugin
+
+
+ org.apache.maven.plugins
+ maven-compiler-plugin
+
+
+ ${java.version}
+
+
+
+
+ default-compile
+ none
+
+
+
+ default-testCompile
+ none
+
+
+ java-compile
+ compile
+
+ compile
+
+
+
+ java-test-compile
+ test-compile
+
+ testCompile
+
+
+
+
+
+
+
+
+ 5.2.2.RELEASE
+ 1.0.0.RELEASE
+ 0.8.1.RELEASE
+ 4.5.2
+ 1.4.200
+ 1.5.23
+ 3.3.1.RELEASE
+
+ 2.2.6.RELEASE
+
+
+
\ No newline at end of file
diff --git a/persistence-modules/spring-data-mongodb-reactive/src/main/java/com/baeldung/couchbase/ReactiveCouchbaseApplication.java b/persistence-modules/spring-data-mongodb-reactive/src/main/java/com/baeldung/couchbase/ReactiveCouchbaseApplication.java
new file mode 100644
index 0000000000..4e5bf9d5dc
--- /dev/null
+++ b/persistence-modules/spring-data-mongodb-reactive/src/main/java/com/baeldung/couchbase/ReactiveCouchbaseApplication.java
@@ -0,0 +1,13 @@
+package com.baeldung.couchbase;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.autoconfigure.mongo.MongoAutoConfiguration;
+
+@SpringBootApplication(exclude = MongoAutoConfiguration.class)
+public class ReactiveCouchbaseApplication {
+
+ public static void main(String[] args) {
+ SpringApplication.run(ReactiveCouchbaseApplication.class, args);
+ }
+}
diff --git a/persistence-modules/spring-data-mongodb-reactive/src/main/java/com/baeldung/couchbase/configuration/CouchbaseProperties.java b/persistence-modules/spring-data-mongodb-reactive/src/main/java/com/baeldung/couchbase/configuration/CouchbaseProperties.java
new file mode 100644
index 0000000000..81f19eebd6
--- /dev/null
+++ b/persistence-modules/spring-data-mongodb-reactive/src/main/java/com/baeldung/couchbase/configuration/CouchbaseProperties.java
@@ -0,0 +1,43 @@
+package com.baeldung.couchbase.configuration;
+
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.PropertySource;
+
+import java.util.Collections;
+import java.util.List;
+
+@Configuration
+@PropertySource("classpath:couchbase.properties")
+public class CouchbaseProperties {
+
+ private final List bootstrapHosts;
+ private final String bucketName;
+ private final String bucketPassword;
+ private final int port;
+
+ public CouchbaseProperties(@Value("${spring.couchbase.bootstrap-hosts}") final List bootstrapHosts, @Value("${spring.couchbase.bucket.name}") final String bucketName, @Value("${spring.couchbase.bucket.password}") final String bucketPassword,
+ @Value("${spring.couchbase.port}") final int port) {
+ this.bootstrapHosts = Collections.unmodifiableList(bootstrapHosts);
+ this.bucketName = bucketName;
+ this.bucketPassword = bucketPassword;
+ this.port = port;
+ }
+
+ public List getBootstrapHosts() {
+ return bootstrapHosts;
+ }
+
+ public String getBucketName() {
+ return bucketName;
+ }
+
+ public String getBucketPassword() {
+ return bucketPassword;
+ }
+
+ public int getPort() {
+ return port;
+ }
+}
diff --git a/persistence-modules/spring-data-mongodb-reactive/src/main/java/com/baeldung/couchbase/configuration/N1QLReactiveCouchbaseConfiguration.java b/persistence-modules/spring-data-mongodb-reactive/src/main/java/com/baeldung/couchbase/configuration/N1QLReactiveCouchbaseConfiguration.java
new file mode 100644
index 0000000000..059bd36cae
--- /dev/null
+++ b/persistence-modules/spring-data-mongodb-reactive/src/main/java/com/baeldung/couchbase/configuration/N1QLReactiveCouchbaseConfiguration.java
@@ -0,0 +1,15 @@
+package com.baeldung.couchbase.configuration;
+
+import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.Primary;
+import org.springframework.data.couchbase.repository.config.EnableReactiveCouchbaseRepositories;
+
+@Configuration
+@EnableReactiveCouchbaseRepositories("com.baeldung.couchbase.domain.repository.n1ql")
+@Primary
+public class N1QLReactiveCouchbaseConfiguration extends ReactiveCouchbaseConfiguration {
+
+ public N1QLReactiveCouchbaseConfiguration(CouchbaseProperties couchbaseProperties) {
+ super(couchbaseProperties);
+ }
+}
diff --git a/persistence-modules/spring-data-mongodb-reactive/src/main/java/com/baeldung/couchbase/configuration/ReactiveCouchbaseConfiguration.java b/persistence-modules/spring-data-mongodb-reactive/src/main/java/com/baeldung/couchbase/configuration/ReactiveCouchbaseConfiguration.java
new file mode 100644
index 0000000000..a51b19ee22
--- /dev/null
+++ b/persistence-modules/spring-data-mongodb-reactive/src/main/java/com/baeldung/couchbase/configuration/ReactiveCouchbaseConfiguration.java
@@ -0,0 +1,48 @@
+package com.baeldung.couchbase.configuration;
+
+import com.couchbase.client.java.env.CouchbaseEnvironment;
+import com.couchbase.client.java.env.DefaultCouchbaseEnvironment;
+
+import org.springframework.context.annotation.Bean;
+import org.springframework.data.couchbase.config.AbstractReactiveCouchbaseConfiguration;
+import org.springframework.data.couchbase.config.BeanNames;
+import org.springframework.data.couchbase.repository.support.IndexManager;
+
+import java.util.List;
+
+public abstract class ReactiveCouchbaseConfiguration extends AbstractReactiveCouchbaseConfiguration {
+
+ private CouchbaseProperties couchbaseProperties;
+
+ public ReactiveCouchbaseConfiguration(final CouchbaseProperties couchbaseProperties) {
+ this.couchbaseProperties = couchbaseProperties;
+ }
+
+ @Override
+ protected List getBootstrapHosts() {
+ return couchbaseProperties.getBootstrapHosts();
+ }
+
+ @Override
+ protected String getBucketName() {
+ return couchbaseProperties.getBucketName();
+ }
+
+ @Override
+ protected String getBucketPassword() {
+ return couchbaseProperties.getBucketPassword();
+ }
+
+ @Override
+ public CouchbaseEnvironment couchbaseEnvironment() {
+ return DefaultCouchbaseEnvironment
+ .builder()
+ .bootstrapHttpDirectPort(couchbaseProperties.getPort())
+ .build();
+ }
+
+ @Bean(name = BeanNames.COUCHBASE_INDEX_MANAGER)
+ public IndexManager couchbaseIndexManager() {
+ return new IndexManager(true, true, false);
+ }
+}
diff --git a/persistence-modules/spring-data-mongodb-reactive/src/main/java/com/baeldung/couchbase/configuration/ViewReactiveCouchbaseConfiguration.java b/persistence-modules/spring-data-mongodb-reactive/src/main/java/com/baeldung/couchbase/configuration/ViewReactiveCouchbaseConfiguration.java
new file mode 100644
index 0000000000..9b4d9b0319
--- /dev/null
+++ b/persistence-modules/spring-data-mongodb-reactive/src/main/java/com/baeldung/couchbase/configuration/ViewReactiveCouchbaseConfiguration.java
@@ -0,0 +1,13 @@
+package com.baeldung.couchbase.configuration;
+
+import org.springframework.context.annotation.Configuration;
+import org.springframework.data.couchbase.repository.config.EnableReactiveCouchbaseRepositories;
+
+@Configuration
+@EnableReactiveCouchbaseRepositories("com.baeldung.couchbase.domain.repository.view")
+public class ViewReactiveCouchbaseConfiguration extends ReactiveCouchbaseConfiguration {
+
+ public ViewReactiveCouchbaseConfiguration(CouchbaseProperties couchbaseProperties) {
+ super(couchbaseProperties);
+ }
+}
diff --git a/persistence-modules/spring-data-mongodb-reactive/src/main/java/com/baeldung/couchbase/domain/Person.java b/persistence-modules/spring-data-mongodb-reactive/src/main/java/com/baeldung/couchbase/domain/Person.java
new file mode 100644
index 0000000000..285de34df8
--- /dev/null
+++ b/persistence-modules/spring-data-mongodb-reactive/src/main/java/com/baeldung/couchbase/domain/Person.java
@@ -0,0 +1,43 @@
+package com.baeldung.couchbase.domain;
+
+import org.springframework.data.annotation.Id;
+import org.springframework.data.couchbase.core.mapping.Document;
+
+import java.util.Objects;
+import java.util.UUID;
+
+@Document
+public class Person {
+
+ @Id private UUID id;
+ private String firstName;
+
+ public Person(final UUID id, final String firstName) {
+ this.id = id;
+ this.firstName = firstName;
+ }
+
+ private Person() {
+ }
+
+ public UUID getId() {
+ return id;
+ }
+
+ public String getFirstName() {
+ return firstName;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ Person person = (Person) o;
+ return Objects.equals(id, person.id) && Objects.equals(firstName, person.firstName);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(id, firstName);
+ }
+}
diff --git a/persistence-modules/spring-data-mongodb-reactive/src/main/java/com/baeldung/couchbase/domain/repository/n1ql/N1QLPersonRepository.java b/persistence-modules/spring-data-mongodb-reactive/src/main/java/com/baeldung/couchbase/domain/repository/n1ql/N1QLPersonRepository.java
new file mode 100644
index 0000000000..6f73a77ceb
--- /dev/null
+++ b/persistence-modules/spring-data-mongodb-reactive/src/main/java/com/baeldung/couchbase/domain/repository/n1ql/N1QLPersonRepository.java
@@ -0,0 +1,16 @@
+package com.baeldung.couchbase.domain.repository.n1ql;
+
+import com.baeldung.couchbase.domain.Person;
+import org.springframework.data.couchbase.core.query.N1qlPrimaryIndexed;
+import org.springframework.data.repository.reactive.ReactiveCrudRepository;
+import org.springframework.stereotype.Repository;
+import reactor.core.publisher.Flux;
+
+import java.util.UUID;
+
+@Repository
+@N1qlPrimaryIndexed
+public interface N1QLPersonRepository extends ReactiveCrudRepository {
+
+ Flux findAllByFirstName(final String firstName);
+}
diff --git a/persistence-modules/spring-data-mongodb-reactive/src/main/java/com/baeldung/couchbase/domain/repository/n1ql/N1QLSortingPersonRepository.java b/persistence-modules/spring-data-mongodb-reactive/src/main/java/com/baeldung/couchbase/domain/repository/n1ql/N1QLSortingPersonRepository.java
new file mode 100644
index 0000000000..57dd149425
--- /dev/null
+++ b/persistence-modules/spring-data-mongodb-reactive/src/main/java/com/baeldung/couchbase/domain/repository/n1ql/N1QLSortingPersonRepository.java
@@ -0,0 +1,13 @@
+package com.baeldung.couchbase.domain.repository.n1ql;
+
+import com.baeldung.couchbase.domain.Person;
+import org.springframework.data.couchbase.core.query.N1qlPrimaryIndexed;
+import org.springframework.data.repository.reactive.ReactiveSortingRepository;
+import org.springframework.stereotype.Repository;
+
+import java.util.UUID;
+
+@Repository
+@N1qlPrimaryIndexed
+public interface N1QLSortingPersonRepository extends ReactiveSortingRepository {
+}
diff --git a/persistence-modules/spring-data-mongodb-reactive/src/main/java/com/baeldung/couchbase/domain/repository/view/ViewPersonRepository.java b/persistence-modules/spring-data-mongodb-reactive/src/main/java/com/baeldung/couchbase/domain/repository/view/ViewPersonRepository.java
new file mode 100644
index 0000000000..06c47c2393
--- /dev/null
+++ b/persistence-modules/spring-data-mongodb-reactive/src/main/java/com/baeldung/couchbase/domain/repository/view/ViewPersonRepository.java
@@ -0,0 +1,20 @@
+package com.baeldung.couchbase.domain.repository.view;
+
+import com.baeldung.couchbase.domain.Person;
+import org.springframework.data.couchbase.core.query.View;
+import org.springframework.data.couchbase.core.query.ViewIndexed;
+import org.springframework.data.repository.reactive.ReactiveCrudRepository;
+import org.springframework.stereotype.Repository;
+import reactor.core.publisher.Flux;
+
+import java.util.UUID;
+
+@Repository
+@ViewIndexed(designDoc = ViewPersonRepository.DESIGN_DOCUMENT)
+public interface ViewPersonRepository extends ReactiveCrudRepository {
+
+ String DESIGN_DOCUMENT = "person";
+
+ @View(designDocument = ViewPersonRepository.DESIGN_DOCUMENT)
+ Flux findByFirstName(String firstName);
+}
diff --git a/persistence-modules/spring-data-mongodb-reactive/src/main/java/com/baeldung/r2dbc/R2dbcApplication.java b/persistence-modules/spring-data-mongodb-reactive/src/main/java/com/baeldung/r2dbc/R2dbcApplication.java
new file mode 100644
index 0000000000..557b6ff42a
--- /dev/null
+++ b/persistence-modules/spring-data-mongodb-reactive/src/main/java/com/baeldung/r2dbc/R2dbcApplication.java
@@ -0,0 +1,15 @@
+package com.baeldung.r2dbc;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.context.annotation.ComponentScan;
+
+@SpringBootApplication
+@ComponentScan(basePackages = "com.baeldung.r2dbc")
+public class R2dbcApplication {
+
+ public static void main(String[] args) {
+ SpringApplication.run(R2dbcApplication.class, args);
+ }
+
+}
\ No newline at end of file
diff --git a/persistence-modules/spring-data-mongodb-reactive/src/main/java/com/baeldung/r2dbc/configuration/R2DBCConfiguration.java b/persistence-modules/spring-data-mongodb-reactive/src/main/java/com/baeldung/r2dbc/configuration/R2DBCConfiguration.java
new file mode 100644
index 0000000000..54f06d9c6c
--- /dev/null
+++ b/persistence-modules/spring-data-mongodb-reactive/src/main/java/com/baeldung/r2dbc/configuration/R2DBCConfiguration.java
@@ -0,0 +1,21 @@
+package com.baeldung.r2dbc.configuration;
+
+import io.r2dbc.h2.H2ConnectionConfiguration;
+import io.r2dbc.h2.H2ConnectionFactory;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.data.r2dbc.config.AbstractR2dbcConfiguration;
+import org.springframework.data.r2dbc.repository.config.EnableR2dbcRepositories;
+
+@Configuration
+@EnableR2dbcRepositories(basePackages = "com.baeldung.r2dbc.repository")
+public class R2DBCConfiguration extends AbstractR2dbcConfiguration {
+ @Bean
+ public H2ConnectionFactory connectionFactory() {
+ return new H2ConnectionFactory(
+ H2ConnectionConfiguration.builder()
+ .url("mem:testdb;DB_CLOSE_DELAY=-1;TRACE_LEVEL_FILE=4")
+ .username("sa")
+ .build());
+ }
+}
\ No newline at end of file
diff --git a/persistence-modules/spring-data-mongodb-reactive/src/main/java/com/baeldung/r2dbc/model/Player.java b/persistence-modules/spring-data-mongodb-reactive/src/main/java/com/baeldung/r2dbc/model/Player.java
new file mode 100644
index 0000000000..1e28cb3d07
--- /dev/null
+++ b/persistence-modules/spring-data-mongodb-reactive/src/main/java/com/baeldung/r2dbc/model/Player.java
@@ -0,0 +1,18 @@
+package com.baeldung.r2dbc.model;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.springframework.data.annotation.Id;
+import org.springframework.data.relational.core.mapping.Table;
+
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+@Table
+public class Player {
+ @Id
+ Integer id;
+ String name;
+ Integer age;
+}
\ No newline at end of file
diff --git a/persistence-modules/spring-data-mongodb-reactive/src/main/java/com/baeldung/r2dbc/repository/PlayerRepository.java b/persistence-modules/spring-data-mongodb-reactive/src/main/java/com/baeldung/r2dbc/repository/PlayerRepository.java
new file mode 100644
index 0000000000..20f7642a7c
--- /dev/null
+++ b/persistence-modules/spring-data-mongodb-reactive/src/main/java/com/baeldung/r2dbc/repository/PlayerRepository.java
@@ -0,0 +1,17 @@
+package com.baeldung.r2dbc.repository;
+
+import org.springframework.data.r2dbc.repository.Query;
+import org.springframework.data.repository.reactive.ReactiveCrudRepository;
+
+import com.baeldung.r2dbc.model.Player;
+
+import reactor.core.publisher.Flux;
+
+public interface PlayerRepository extends ReactiveCrudRepository {
+
+ @Query("select id, name, age from player where name = $1")
+ Flux findAllByName(String name);
+
+ @Query("select * from player where age = $1")
+ Flux findByAge(int age);
+}
\ No newline at end of file
diff --git a/persistence-modules/spring-data-mongodb-reactive/src/main/java/com/baeldung/reactive/Spring5ReactiveApplication.java b/persistence-modules/spring-data-mongodb-reactive/src/main/java/com/baeldung/reactive/Spring5ReactiveApplication.java
new file mode 100644
index 0000000000..e96767145e
--- /dev/null
+++ b/persistence-modules/spring-data-mongodb-reactive/src/main/java/com/baeldung/reactive/Spring5ReactiveApplication.java
@@ -0,0 +1,25 @@
+package com.baeldung.reactive;
+
+import com.mongodb.reactivestreams.client.MongoClient;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.context.annotation.Bean;
+import org.springframework.data.mongodb.core.ReactiveMongoTemplate;
+
+@SpringBootApplication
+public class Spring5ReactiveApplication{
+
+ public static void main(String[] args) {
+ SpringApplication.run(Spring5ReactiveApplication.class, args);
+ }
+
+ @Autowired
+ MongoClient mongoClient;
+
+ @Bean
+ public ReactiveMongoTemplate reactiveMongoTemplate() {
+ return new ReactiveMongoTemplate(mongoClient, "test");
+ }
+
+}
diff --git a/persistence-modules/spring-data-mongodb-reactive/src/main/java/com/baeldung/reactive/model/Account.java b/persistence-modules/spring-data-mongodb-reactive/src/main/java/com/baeldung/reactive/model/Account.java
new file mode 100644
index 0000000000..57abd80009
--- /dev/null
+++ b/persistence-modules/spring-data-mongodb-reactive/src/main/java/com/baeldung/reactive/model/Account.java
@@ -0,0 +1,21 @@
+package com.baeldung.reactive.model;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import lombok.ToString;
+import org.springframework.data.annotation.Id;
+import org.springframework.data.mongodb.core.mapping.Document;
+
+@Document
+@Data
+@ToString
+@AllArgsConstructor
+@NoArgsConstructor
+public class Account {
+
+ @Id
+ private String id;
+ private String owner;
+ private Double value;
+}
diff --git a/persistence-modules/spring-data-mongodb-reactive/src/main/java/com/baeldung/reactive/repository/AccountCrudRepository.java b/persistence-modules/spring-data-mongodb-reactive/src/main/java/com/baeldung/reactive/repository/AccountCrudRepository.java
new file mode 100644
index 0000000000..8798c13772
--- /dev/null
+++ b/persistence-modules/spring-data-mongodb-reactive/src/main/java/com/baeldung/reactive/repository/AccountCrudRepository.java
@@ -0,0 +1,15 @@
+package com.baeldung.reactive.repository;
+
+import com.baeldung.reactive.model.Account;
+import org.springframework.data.repository.reactive.ReactiveCrudRepository;
+import org.springframework.stereotype.Repository;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+@Repository
+public interface AccountCrudRepository extends ReactiveCrudRepository {
+
+ public Flux findAllByValue(Double value);
+
+ public Mono findFirstByOwner(Mono owner);
+}
diff --git a/persistence-modules/spring-data-mongodb-reactive/src/main/java/com/baeldung/reactive/repository/AccountMongoRepository.java b/persistence-modules/spring-data-mongodb-reactive/src/main/java/com/baeldung/reactive/repository/AccountMongoRepository.java
new file mode 100644
index 0000000000..5c09e4a264
--- /dev/null
+++ b/persistence-modules/spring-data-mongodb-reactive/src/main/java/com/baeldung/reactive/repository/AccountMongoRepository.java
@@ -0,0 +1,7 @@
+package com.baeldung.reactive.repository;
+
+import com.baeldung.reactive.model.Account;
+import org.springframework.data.mongodb.repository.ReactiveMongoRepository;
+
+public interface AccountMongoRepository extends ReactiveMongoRepository {
+}
diff --git a/persistence-modules/spring-data-mongodb-reactive/src/main/java/com/baeldung/reactive/repository/AccountRxJavaRepository.java b/persistence-modules/spring-data-mongodb-reactive/src/main/java/com/baeldung/reactive/repository/AccountRxJavaRepository.java
new file mode 100644
index 0000000000..6afe92a21b
--- /dev/null
+++ b/persistence-modules/spring-data-mongodb-reactive/src/main/java/com/baeldung/reactive/repository/AccountRxJavaRepository.java
@@ -0,0 +1,15 @@
+package com.baeldung.reactive.repository;
+
+import com.baeldung.reactive.model.Account;
+import io.reactivex.Observable;
+import io.reactivex.Single;
+import org.springframework.data.repository.reactive.RxJava2CrudRepository;
+import org.springframework.stereotype.Repository;
+
+@Repository
+public interface AccountRxJavaRepository extends RxJava2CrudRepository{
+
+ public Observable findAllByValue(Double value);
+
+ public Single findFirstByOwner(Single owner);
+}
diff --git a/persistence-modules/spring-data-mongodb-reactive/src/main/java/com/baeldung/reactive/template/AccountTemplateOperations.java b/persistence-modules/spring-data-mongodb-reactive/src/main/java/com/baeldung/reactive/template/AccountTemplateOperations.java
new file mode 100644
index 0000000000..9d32f34e3b
--- /dev/null
+++ b/persistence-modules/spring-data-mongodb-reactive/src/main/java/com/baeldung/reactive/template/AccountTemplateOperations.java
@@ -0,0 +1,33 @@
+package com.baeldung.reactive.template;
+
+import com.baeldung.reactive.model.Account;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.data.mongodb.core.ReactiveMongoTemplate;
+import org.springframework.data.mongodb.core.ReactiveRemoveOperation;
+import org.springframework.stereotype.Service;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+@Service
+public class AccountTemplateOperations {
+
+ @Autowired
+ ReactiveMongoTemplate template;
+
+ public Mono findById(String id) {
+ return template.findById(id, Account.class);
+ }
+
+ public Flux findAll() {
+ return template.findAll(Account.class);
+ }
+
+ public Mono save(Mono account) {
+ return template.save(account);
+ }
+
+ public ReactiveRemoveOperation.ReactiveRemove deleteAll() {
+ return template.remove(Account.class);
+ }
+
+}
diff --git a/persistence-modules/spring-data-mongodb-reactive/src/main/java/com/baeldung/tailablecursor/LogsCounterApplication.java b/persistence-modules/spring-data-mongodb-reactive/src/main/java/com/baeldung/tailablecursor/LogsCounterApplication.java
new file mode 100644
index 0000000000..8b2511a8f3
--- /dev/null
+++ b/persistence-modules/spring-data-mongodb-reactive/src/main/java/com/baeldung/tailablecursor/LogsCounterApplication.java
@@ -0,0 +1,11 @@
+package com.baeldung.tailablecursor;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+
+@SpringBootApplication
+public class LogsCounterApplication {
+ public static void main(String[] args) {
+ SpringApplication.run(LogsCounterApplication.class, args);
+ }
+}
diff --git a/persistence-modules/spring-data-mongodb-reactive/src/main/java/com/baeldung/tailablecursor/domain/Log.java b/persistence-modules/spring-data-mongodb-reactive/src/main/java/com/baeldung/tailablecursor/domain/Log.java
new file mode 100644
index 0000000000..717a367751
--- /dev/null
+++ b/persistence-modules/spring-data-mongodb-reactive/src/main/java/com/baeldung/tailablecursor/domain/Log.java
@@ -0,0 +1,21 @@
+package com.baeldung.tailablecursor.domain;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.springframework.data.annotation.Id;
+import org.springframework.data.mongodb.core.mapping.Document;
+
+@Data
+@Document
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class Log {
+ @Id
+ private String id;
+ private String service;
+ private LogLevel level;
+ private String message;
+}
diff --git a/persistence-modules/spring-data-mongodb-reactive/src/main/java/com/baeldung/tailablecursor/domain/LogLevel.java b/persistence-modules/spring-data-mongodb-reactive/src/main/java/com/baeldung/tailablecursor/domain/LogLevel.java
new file mode 100644
index 0000000000..6826fbffd3
--- /dev/null
+++ b/persistence-modules/spring-data-mongodb-reactive/src/main/java/com/baeldung/tailablecursor/domain/LogLevel.java
@@ -0,0 +1,5 @@
+package com.baeldung.tailablecursor.domain;
+
+public enum LogLevel {
+ ERROR, WARN, INFO
+}
diff --git a/persistence-modules/spring-data-mongodb-reactive/src/main/java/com/baeldung/tailablecursor/repository/LogsRepository.java b/persistence-modules/spring-data-mongodb-reactive/src/main/java/com/baeldung/tailablecursor/repository/LogsRepository.java
new file mode 100644
index 0000000000..dce11c548c
--- /dev/null
+++ b/persistence-modules/spring-data-mongodb-reactive/src/main/java/com/baeldung/tailablecursor/repository/LogsRepository.java
@@ -0,0 +1,12 @@
+package com.baeldung.tailablecursor.repository;
+
+import com.baeldung.tailablecursor.domain.Log;
+import com.baeldung.tailablecursor.domain.LogLevel;
+import org.springframework.data.mongodb.repository.Tailable;
+import org.springframework.data.repository.reactive.ReactiveCrudRepository;
+import reactor.core.publisher.Flux;
+
+public interface LogsRepository extends ReactiveCrudRepository {
+ @Tailable
+ Flux findByLevel(LogLevel level);
+}
diff --git a/persistence-modules/spring-data-mongodb-reactive/src/main/java/com/baeldung/tailablecursor/service/ErrorLogsCounter.java b/persistence-modules/spring-data-mongodb-reactive/src/main/java/com/baeldung/tailablecursor/service/ErrorLogsCounter.java
new file mode 100644
index 0000000000..c243e64f97
--- /dev/null
+++ b/persistence-modules/spring-data-mongodb-reactive/src/main/java/com/baeldung/tailablecursor/service/ErrorLogsCounter.java
@@ -0,0 +1,62 @@
+package com.baeldung.tailablecursor.service;
+
+import com.baeldung.tailablecursor.domain.Log;
+import com.baeldung.tailablecursor.domain.LogLevel;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.data.mongodb.core.MongoTemplate;
+import org.springframework.data.mongodb.core.mapping.Document;
+import org.springframework.data.mongodb.core.messaging.DefaultMessageListenerContainer;
+import org.springframework.data.mongodb.core.messaging.MessageListener;
+import org.springframework.data.mongodb.core.messaging.MessageListenerContainer;
+import org.springframework.data.mongodb.core.messaging.TailableCursorRequest;
+
+import javax.annotation.PreDestroy;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.springframework.data.mongodb.core.query.Criteria.where;
+import static org.springframework.data.mongodb.core.query.Query.query;
+
+@Slf4j
+public class ErrorLogsCounter implements LogsCounter {
+
+ private static final String LEVEL_FIELD_NAME = "level";
+
+ private final String collectionName;
+ private final MessageListenerContainer container;
+
+ private final AtomicInteger counter = new AtomicInteger();
+
+ public ErrorLogsCounter(MongoTemplate mongoTemplate,
+ String collectionName) {
+ this.collectionName = collectionName;
+ this.container = new DefaultMessageListenerContainer(mongoTemplate);
+
+ container.start();
+ TailableCursorRequest request = getTailableCursorRequest();
+ container.register(request, Log.class);
+ }
+
+ @SuppressWarnings("unchecked")
+ private TailableCursorRequest getTailableCursorRequest() {
+ MessageListener listener = message -> {
+ log.info("ERROR log received: {}", message.getBody());
+ counter.incrementAndGet();
+ };
+
+ return TailableCursorRequest.builder()
+ .collection(collectionName)
+ .filter(query(where(LEVEL_FIELD_NAME).is(LogLevel.ERROR)))
+ .publishTo(listener)
+ .build();
+ }
+
+ @Override
+ public int count() {
+ return counter.get();
+ }
+
+ @PreDestroy
+ public void close() {
+ container.stop();
+ }
+}
diff --git a/persistence-modules/spring-data-mongodb-reactive/src/main/java/com/baeldung/tailablecursor/service/InfoLogsCounter.java b/persistence-modules/spring-data-mongodb-reactive/src/main/java/com/baeldung/tailablecursor/service/InfoLogsCounter.java
new file mode 100644
index 0000000000..29301bffec
--- /dev/null
+++ b/persistence-modules/spring-data-mongodb-reactive/src/main/java/com/baeldung/tailablecursor/service/InfoLogsCounter.java
@@ -0,0 +1,36 @@
+package com.baeldung.tailablecursor.service;
+
+import com.baeldung.tailablecursor.domain.Log;
+import com.baeldung.tailablecursor.domain.LogLevel;
+import com.baeldung.tailablecursor.repository.LogsRepository;
+import lombok.extern.slf4j.Slf4j;
+import reactor.core.Disposable;
+import reactor.core.publisher.Flux;
+
+import javax.annotation.PreDestroy;
+import java.util.concurrent.atomic.AtomicInteger;
+
+@Slf4j
+public class InfoLogsCounter implements LogsCounter {
+
+ private final AtomicInteger counter = new AtomicInteger();
+ private final Disposable subscription;
+
+ public InfoLogsCounter(LogsRepository repository) {
+ Flux stream = repository.findByLevel(LogLevel.INFO);
+ this.subscription = stream.subscribe(logEntity -> {
+ log.info("INFO log received: " + logEntity);
+ counter.incrementAndGet();
+ });
+ }
+
+ @Override
+ public int count() {
+ return this.counter.get();
+ }
+
+ @PreDestroy
+ public void close() {
+ this.subscription.dispose();
+ }
+}
diff --git a/persistence-modules/spring-data-mongodb-reactive/src/main/java/com/baeldung/tailablecursor/service/LogsCounter.java b/persistence-modules/spring-data-mongodb-reactive/src/main/java/com/baeldung/tailablecursor/service/LogsCounter.java
new file mode 100644
index 0000000000..e14a3eadd7
--- /dev/null
+++ b/persistence-modules/spring-data-mongodb-reactive/src/main/java/com/baeldung/tailablecursor/service/LogsCounter.java
@@ -0,0 +1,5 @@
+package com.baeldung.tailablecursor.service;
+
+public interface LogsCounter {
+ int count();
+}
diff --git a/persistence-modules/spring-data-mongodb-reactive/src/main/java/com/baeldung/tailablecursor/service/WarnLogsCounter.java b/persistence-modules/spring-data-mongodb-reactive/src/main/java/com/baeldung/tailablecursor/service/WarnLogsCounter.java
new file mode 100644
index 0000000000..2dff8e8e40
--- /dev/null
+++ b/persistence-modules/spring-data-mongodb-reactive/src/main/java/com/baeldung/tailablecursor/service/WarnLogsCounter.java
@@ -0,0 +1,41 @@
+package com.baeldung.tailablecursor.service;
+
+import com.baeldung.tailablecursor.domain.Log;
+import com.baeldung.tailablecursor.domain.LogLevel;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.data.mongodb.core.ReactiveMongoOperations;
+import reactor.core.Disposable;
+import reactor.core.publisher.Flux;
+
+import javax.annotation.PreDestroy;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.springframework.data.mongodb.core.query.Criteria.where;
+import static org.springframework.data.mongodb.core.query.Query.query;
+
+@Slf4j
+public class WarnLogsCounter implements LogsCounter {
+
+ private static final String LEVEL_FIELD_NAME = "level";
+
+ private final AtomicInteger counter = new AtomicInteger();
+ private final Disposable subscription;
+
+ public WarnLogsCounter(ReactiveMongoOperations template) {
+ Flux stream = template.tail(query(where(LEVEL_FIELD_NAME).is(LogLevel.WARN)), Log.class);
+ subscription = stream.subscribe(logEntity -> {
+ log.warn("WARN log received: " + logEntity);
+ counter.incrementAndGet();
+ });
+ }
+
+ @Override
+ public int count() {
+ return counter.get();
+ }
+
+ @PreDestroy
+ public void close() {
+ subscription.dispose();
+ }
+}
diff --git a/persistence-modules/spring-data-mongodb-reactive/src/main/resources/couchbase.properties b/persistence-modules/spring-data-mongodb-reactive/src/main/resources/couchbase.properties
new file mode 100644
index 0000000000..53fad807fe
--- /dev/null
+++ b/persistence-modules/spring-data-mongodb-reactive/src/main/resources/couchbase.properties
@@ -0,0 +1,4 @@
+spring.couchbase.bucket.name=default
+spring.couchbase.bootstrap-hosts=localhost
+spring.couchbase.port=8091
+spring.couchbase.bucket.password=123456
\ No newline at end of file
diff --git a/persistence-modules/spring-data-mongodb-reactive/src/main/resources/logback.xml b/persistence-modules/spring-data-mongodb-reactive/src/main/resources/logback.xml
new file mode 100644
index 0000000000..7d900d8ea8
--- /dev/null
+++ b/persistence-modules/spring-data-mongodb-reactive/src/main/resources/logback.xml
@@ -0,0 +1,13 @@
+
+
+
+
+ %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/persistence-modules/spring-data-mongodb-reactive/src/test/java/com/baeldung/SpringContextTest.java b/persistence-modules/spring-data-mongodb-reactive/src/test/java/com/baeldung/SpringContextTest.java
new file mode 100644
index 0000000000..bedb30fcaa
--- /dev/null
+++ b/persistence-modules/spring-data-mongodb-reactive/src/test/java/com/baeldung/SpringContextTest.java
@@ -0,0 +1,17 @@
+package com.baeldung;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.test.context.junit4.SpringRunner;
+
+import com.baeldung.reactive.Spring5ReactiveApplication;
+
+@RunWith(SpringRunner.class)
+@SpringBootTest(classes = Spring5ReactiveApplication.class)
+public class SpringContextTest {
+
+ @Test
+ public void whenSpringContextIsBootstrapped_thenNoExceptions() {
+ }
+}
diff --git a/persistence-modules/spring-data-mongodb-reactive/src/test/java/com/baeldung/couchbase/domain/repository/CouchbaseMockConfiguration.java b/persistence-modules/spring-data-mongodb-reactive/src/test/java/com/baeldung/couchbase/domain/repository/CouchbaseMockConfiguration.java
new file mode 100644
index 0000000000..2a09fce4b0
--- /dev/null
+++ b/persistence-modules/spring-data-mongodb-reactive/src/test/java/com/baeldung/couchbase/domain/repository/CouchbaseMockConfiguration.java
@@ -0,0 +1,54 @@
+package com.baeldung.couchbase.domain.repository;
+
+import com.baeldung.couchbase.configuration.CouchbaseProperties;
+import com.couchbase.mock.Bucket;
+import com.couchbase.mock.BucketConfiguration;
+import com.couchbase.mock.CouchbaseMock;
+import org.springframework.boot.test.context.TestConfiguration;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Collections;
+
+@TestConfiguration
+public class CouchbaseMockConfiguration {
+
+ private CouchbaseMock couchbaseMock;
+
+ public CouchbaseMockConfiguration(final CouchbaseProperties couchbaseProperties) {
+ final BucketConfiguration bucketConfiguration = new BucketConfiguration();
+ bucketConfiguration.numNodes = 1;
+ bucketConfiguration.numReplicas = 1;
+ bucketConfiguration.numVBuckets = 1024;
+ bucketConfiguration.name = couchbaseProperties.getBucketName();
+ bucketConfiguration.type = Bucket.BucketType.COUCHBASE;
+ bucketConfiguration.password = couchbaseProperties.getBucketPassword();
+
+ try {
+ couchbaseMock = new CouchbaseMock(couchbaseProperties.getPort(), Collections.singletonList(bucketConfiguration));
+ } catch (final IOException ex) {
+ throw new UncheckedIOException(ex);
+ }
+ }
+
+ @PostConstruct
+ public void postConstruct() {
+ try {
+ couchbaseMock.start();
+ } catch (final IOException ex) {
+ throw new UncheckedIOException(ex);
+ }
+ try {
+ couchbaseMock.waitForStartup();
+ } catch (final InterruptedException ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+
+ @PreDestroy
+ public void preDestroy() {
+ couchbaseMock.stop();
+ }
+}
diff --git a/persistence-modules/spring-data-mongodb-reactive/src/test/java/com/baeldung/couchbase/domain/repository/n1ql/N1QLPersonRepositoryLiveTest.java b/persistence-modules/spring-data-mongodb-reactive/src/test/java/com/baeldung/couchbase/domain/repository/n1ql/N1QLPersonRepositoryLiveTest.java
new file mode 100644
index 0000000000..c8dbbf429e
--- /dev/null
+++ b/persistence-modules/spring-data-mongodb-reactive/src/test/java/com/baeldung/couchbase/domain/repository/n1ql/N1QLPersonRepositoryLiveTest.java
@@ -0,0 +1,55 @@
+package com.baeldung.couchbase.domain.repository.n1ql;
+
+import com.baeldung.couchbase.domain.Person;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.test.context.junit4.SpringRunner;
+import reactor.core.publisher.Flux;
+import reactor.test.StepVerifier;
+
+import java.util.UUID;
+
+@RunWith(SpringRunner.class)
+@SpringBootTest(properties = {"spring.autoconfigure.exclude=org.springframework.boot.autoconfigure.mongo.embedded.EmbeddedMongoAutoConfiguration"})
+public class N1QLPersonRepositoryLiveTest {
+
+ @Autowired private N1QLPersonRepository personRepository;
+
+ @Test
+ public void shouldFindAll_byLastName() {
+ //Given
+ final String firstName = "John";
+ final Person matchingPerson = new Person(UUID.randomUUID(), firstName);
+ final Person nonMatchingPerson = new Person(UUID.randomUUID(), "NotJohn");
+ wrap(() -> {
+ personRepository
+ .save(matchingPerson)
+ .subscribe();
+ personRepository
+ .save(nonMatchingPerson)
+ .subscribe();
+ //When
+ final Flux allByFirstName = personRepository.findAllByFirstName(firstName);
+ //Then
+ StepVerifier
+ .create(allByFirstName)
+ .expectNext(matchingPerson)
+ .verifyComplete();
+
+ }, matchingPerson, nonMatchingPerson);
+ }
+
+ private void wrap(final Runnable runnable, final Person... people) {
+ try {
+ runnable.run();
+ } finally {
+ for (final Person person : people) {
+ personRepository
+ .delete(person)
+ .subscribe();
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/persistence-modules/spring-data-mongodb-reactive/src/test/java/com/baeldung/couchbase/domain/repository/n1ql/N1QLSortingPersonRepositoryLiveTest.java b/persistence-modules/spring-data-mongodb-reactive/src/test/java/com/baeldung/couchbase/domain/repository/n1ql/N1QLSortingPersonRepositoryLiveTest.java
new file mode 100644
index 0000000000..8c6ce137f1
--- /dev/null
+++ b/persistence-modules/spring-data-mongodb-reactive/src/test/java/com/baeldung/couchbase/domain/repository/n1ql/N1QLSortingPersonRepositoryLiveTest.java
@@ -0,0 +1,60 @@
+package com.baeldung.couchbase.domain.repository.n1ql;
+
+import com.baeldung.couchbase.domain.Person;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.data.domain.Sort;
+import org.springframework.test.context.junit4.SpringRunner;
+import reactor.core.publisher.Flux;
+import reactor.test.StepVerifier;
+
+import java.util.UUID;
+
+@RunWith(SpringRunner.class)
+@SpringBootTest(properties = {"spring.autoconfigure.exclude=org.springframework.boot.autoconfigure.mongo.embedded.EmbeddedMongoAutoConfiguration"})
+public class N1QLSortingPersonRepositoryLiveTest {
+
+ @Autowired private N1QLSortingPersonRepository personRepository;
+
+ @Test
+ public void shouldFindAll_sortedByFirstName() {
+ //Given
+ final Person firstPerson = new Person(UUID.randomUUID(), "John");
+ final Person secondPerson = new Person(UUID.randomUUID(), "Mikki");
+ wrap(() -> {
+ personRepository
+ .save(firstPerson)
+ .subscribe();
+ personRepository
+ .save(secondPerson)
+ .subscribe();
+ //When
+ final Flux allByFirstName = personRepository.findAll(Sort.by(Sort.Direction.DESC, "firstName"));
+ //Then
+ StepVerifier
+ .create(allByFirstName)
+ .expectNextMatches(person -> person
+ .getFirstName()
+ .equals(secondPerson.getFirstName()))
+ .expectNextMatches(person -> person
+ .getFirstName()
+ .equals(firstPerson.getFirstName()))
+ .verifyComplete();
+ }, firstPerson, secondPerson);
+ }
+
+ //workaround for deleteAll()
+ private void wrap(final Runnable runnable, final Person... people) {
+ try {
+ runnable.run();
+ } finally {
+ for (final Person person : people) {
+ personRepository
+ .delete(person)
+ .subscribe();
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/persistence-modules/spring-data-mongodb-reactive/src/test/java/com/baeldung/couchbase/domain/repository/view/ViewPersonRepositoryIntegrationTest.java b/persistence-modules/spring-data-mongodb-reactive/src/test/java/com/baeldung/couchbase/domain/repository/view/ViewPersonRepositoryIntegrationTest.java
new file mode 100644
index 0000000000..15688e1b80
--- /dev/null
+++ b/persistence-modules/spring-data-mongodb-reactive/src/test/java/com/baeldung/couchbase/domain/repository/view/ViewPersonRepositoryIntegrationTest.java
@@ -0,0 +1,81 @@
+package com.baeldung.couchbase.domain.repository.view;
+
+import com.baeldung.couchbase.configuration.CouchbaseProperties;
+import com.baeldung.couchbase.configuration.ViewReactiveCouchbaseConfiguration;
+import com.baeldung.couchbase.domain.Person;
+import com.baeldung.couchbase.domain.repository.CouchbaseMockConfiguration;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.test.context.ContextConfiguration;
+import org.springframework.test.context.junit4.SpringRunner;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.test.StepVerifier;
+
+import java.util.UUID;
+
+@RunWith(SpringRunner.class)
+@SpringBootTest(properties = { "spring.couchbase.port=10010", "spring.autoconfigure.exclude=org.springframework.boot.autoconfigure.mongo.embedded.EmbeddedMongoAutoConfiguration,org.springframework.boot.autoconfigure.mongo.MongoAutoConfiguration" },
+ classes = { CouchbaseMockConfiguration.class, ViewReactiveCouchbaseConfiguration.class, CouchbaseProperties.class })
+public class ViewPersonRepositoryIntegrationTest {
+
+ @Autowired private ViewPersonRepository personRepository;
+
+ @Test
+ public void shouldSavePerson_findById_thenDeleteIt() {
+ //Given
+ final UUID id = UUID.randomUUID();
+ final Person person = new Person(id, "John");
+ wrap(() -> {
+ personRepository
+ .save(person)
+ .subscribe();
+ //When
+ final Mono byId = personRepository.findById(id);
+ //Then
+ StepVerifier
+ .create(byId)
+ .expectNextMatches(result -> result
+ .getId()
+ .equals(id))
+ .expectComplete()
+ .verify();
+ }, person);
+ }
+
+ @Test
+ public void shouldFindAll_thenDeleteIt() {
+ //Given
+ final Person person = new Person(UUID.randomUUID(), "John");
+ final Person secondPerson = new Person(UUID.randomUUID(), "Mikki");
+ wrap(() -> {
+ personRepository
+ .save(person)
+ .subscribe();
+ personRepository
+ .save(secondPerson)
+ .subscribe();
+ //When
+ final Flux all = personRepository.findAll();
+ //Then
+ StepVerifier
+ .create(all)
+ .expectNextCount(2)
+ .verifyComplete();
+ }, person, secondPerson);
+ }
+
+ private void wrap(final Runnable runnable, final Person... people) {
+ try {
+ runnable.run();
+ } finally {
+ for (final Person person : people) {
+ personRepository
+ .delete(person)
+ .subscribe();
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/persistence-modules/spring-data-mongodb-reactive/src/test/java/com/baeldung/r2dbc/R2dbcApplicationIntegrationTest.java b/persistence-modules/spring-data-mongodb-reactive/src/test/java/com/baeldung/r2dbc/R2dbcApplicationIntegrationTest.java
new file mode 100644
index 0000000000..1af570587e
--- /dev/null
+++ b/persistence-modules/spring-data-mongodb-reactive/src/test/java/com/baeldung/r2dbc/R2dbcApplicationIntegrationTest.java
@@ -0,0 +1,124 @@
+package com.baeldung.r2dbc;
+
+
+import com.baeldung.r2dbc.model.Player;
+import com.baeldung.r2dbc.repository.PlayerRepository;
+import io.r2dbc.h2.H2ConnectionFactory;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.data.r2dbc.core.DatabaseClient;
+import org.springframework.test.context.junit4.SpringRunner;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Hooks;
+import reactor.core.publisher.Mono;
+import reactor.test.StepVerifier;
+
+import java.util.Arrays;
+import java.util.List;
+
+@RunWith(SpringRunner.class)
+@SpringBootTest
+public class R2dbcApplicationIntegrationTest {
+
+
+ @Autowired
+ PlayerRepository playerRepository;
+
+ @Autowired
+ DatabaseClient client;
+
+ @Autowired
+ H2ConnectionFactory factory;
+
+
+ @Before
+ public void setup() {
+
+ Hooks.onOperatorDebug();
+
+ List statements = Arrays.asList(//
+ "DROP TABLE IF EXISTS player;",
+ "CREATE table player (id INT AUTO_INCREMENT NOT NULL, name VARCHAR2, age INT NOT NULL);");
+
+ statements.forEach(it -> client.execute(it) //
+ .fetch() //
+ .rowsUpdated() //
+ .as(StepVerifier::create) //
+ .expectNextCount(1) //
+ .verifyComplete());
+
+ }
+
+ @Test
+ public void whenDeleteAll_then0IsExpected() {
+
+
+ playerRepository.deleteAll()
+ .as(StepVerifier::create)
+ .expectNextCount(0)
+ .verifyComplete();
+ }
+
+ @Test
+ public void whenInsert6_then6AreExpected() {
+
+ insertPlayers();
+
+ playerRepository.findAll()
+ .as(StepVerifier::create)
+ .expectNextCount(6)
+ .verifyComplete();
+ }
+
+ @Test
+ public void whenSearchForCR7_then1IsExpected() {
+
+ insertPlayers();
+
+ playerRepository.findAllByName("CR7")
+ .as(StepVerifier::create)
+ .expectNextCount(1)
+ .verifyComplete();
+ }
+
+ @Test
+ public void whenSearchFor32YearsOld_then2AreExpected() {
+ insertPlayers();
+
+ playerRepository.findByAge(32)
+ .as(StepVerifier::create)
+ .expectNextCount(2)
+ .verifyComplete();
+ }
+
+ @Test
+ public void whenBatchHas2Operations_then2AreExpected() {
+ Mono.from(factory.create())
+ .flatMapMany(connection -> Flux.from(connection
+ .createBatch()
+ .add("select * from player")
+ .add("select * from player")
+ .execute()))
+ .as(StepVerifier::create)
+ .expectNextCount(2)
+ .verifyComplete();
+
+ }
+
+ private void insertPlayers() {
+ List players = Arrays.asList(
+ new Player(null, "Kaka", 37),
+ new Player(null, "Messi", 32),
+ new Player(null, "Mbappé", 20),
+ new Player(null, "CR7", 34),
+ new Player(null, "Lewandowski", 30),
+ new Player(null, "Cavani", 32)
+ );
+
+ playerRepository.saveAll(players).subscribe();
+ }
+}
+
diff --git a/persistence-modules/spring-data-mongodb-reactive/src/test/java/com/baeldung/reactive/repository/AccountCrudRepositoryManualTest.java b/persistence-modules/spring-data-mongodb-reactive/src/test/java/com/baeldung/reactive/repository/AccountCrudRepositoryManualTest.java
new file mode 100644
index 0000000000..d4b1d0eeda
--- /dev/null
+++ b/persistence-modules/spring-data-mongodb-reactive/src/test/java/com/baeldung/reactive/repository/AccountCrudRepositoryManualTest.java
@@ -0,0 +1,70 @@
+package com.baeldung.reactive.repository;
+
+
+import com.baeldung.reactive.Spring5ReactiveApplication;
+import com.baeldung.reactive.model.Account;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.test.context.junit4.SpringRunner;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.test.StepVerifier;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+@RunWith(SpringRunner.class)
+@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, classes = Spring5ReactiveApplication.class)
+public class AccountCrudRepositoryManualTest {
+
+ @Autowired
+ AccountCrudRepository repository;
+
+ @Test
+ public void givenValue_whenFindAllByValue_thenFindAccount() {
+ repository.save(new Account(null, "Bill", 12.3)).block();
+ Flux accountFlux = repository.findAllByValue(12.3);
+
+ StepVerifier.create(accountFlux)
+ .assertNext(account -> {
+ assertEquals("Bill", account.getOwner());
+ assertEquals(Double.valueOf(12.3) , account.getValue());
+ assertNotNull(account.getId());
+ })
+ .expectComplete()
+ .verify();
+ }
+
+ @Test
+ public void givenOwner_whenFindFirstByOwner_thenFindAccount() {
+ repository.save(new Account(null, "Bill", 12.3)).block();
+ Mono accountMono = repository.findFirstByOwner(Mono.just("Bill"));
+
+ StepVerifier.create(accountMono)
+ .assertNext(account -> {
+ assertEquals("Bill", account.getOwner());
+ assertEquals(Double.valueOf(12.3) , account.getValue());
+ assertNotNull(account.getId());
+ })
+ .expectComplete()
+ .verify();
+
+
+
+ }
+
+ @Test
+ public void givenAccount_whenSave_thenSaveAccount() {
+ Mono accountMono = repository.save(new Account(null, "Bill", 12.3));
+
+ StepVerifier
+ .create(accountMono)
+ .assertNext(account -> assertNotNull(account.getId()))
+ .expectComplete()
+ .verify();
+ }
+
+
+}
\ No newline at end of file
diff --git a/persistence-modules/spring-data-mongodb-reactive/src/test/java/com/baeldung/reactive/repository/AccountMongoRepositoryManualTest.java b/persistence-modules/spring-data-mongodb-reactive/src/test/java/com/baeldung/reactive/repository/AccountMongoRepositoryManualTest.java
new file mode 100644
index 0000000000..2ca075aa5e
--- /dev/null
+++ b/persistence-modules/spring-data-mongodb-reactive/src/test/java/com/baeldung/reactive/repository/AccountMongoRepositoryManualTest.java
@@ -0,0 +1,67 @@
+package com.baeldung.reactive.repository;
+
+import com.baeldung.reactive.Spring5ReactiveApplication;
+import com.baeldung.reactive.model.Account;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.data.domain.Example;
+import org.springframework.data.domain.ExampleMatcher;
+import org.springframework.test.context.junit4.SpringRunner;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.test.StepVerifier;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.springframework.data.domain.ExampleMatcher.GenericPropertyMatchers.startsWith;
+
+@RunWith(SpringRunner.class)
+@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, classes = Spring5ReactiveApplication.class)
+public class AccountMongoRepositoryManualTest {
+
+ @Autowired
+ AccountMongoRepository repository;
+
+ @Test
+ public void givenExample_whenFindAllWithExample_thenFindAllMacthings() {
+ repository.save(new Account(null, "john", 12.3)).block();
+ ExampleMatcher matcher = ExampleMatcher.matching().withMatcher("owner", startsWith());
+ Example example = Example.of(new Account(null, "jo", null), matcher);
+ Flux accountFlux = repository.findAll(example);
+
+ StepVerifier
+ .create(accountFlux)
+ .assertNext(account -> assertEquals("john", account.getOwner()))
+ .expectComplete()
+ .verify();
+ }
+
+ @Test
+ public void givenAccount_whenSave_thenSave() {
+ Mono accountMono = repository.save(new Account(null, "john", 12.3));
+
+ StepVerifier
+ .create(accountMono)
+ .assertNext(account -> assertNotNull(account.getId()))
+ .expectComplete()
+ .verify();
+ }
+
+ @Test
+ public void givenId_whenFindById_thenFindAccount() {
+ Account inserted = repository.save(new Account(null, "john", 12.3)).block();
+ Mono accountMono = repository.findById(inserted.getId());
+
+ StepVerifier
+ .create(accountMono)
+ .assertNext(account -> {
+ assertEquals("john", account.getOwner());
+ assertEquals(Double.valueOf(12.3), account.getValue());
+ assertNotNull(account.getId());
+ })
+ .expectComplete()
+ .verify();
+ }
+}
\ No newline at end of file
diff --git a/persistence-modules/spring-data-mongodb-reactive/src/test/java/com/baeldung/reactive/repository/AccountRxJavaRepositoryManualTest.java b/persistence-modules/spring-data-mongodb-reactive/src/test/java/com/baeldung/reactive/repository/AccountRxJavaRepositoryManualTest.java
new file mode 100644
index 0000000000..d91acd24e2
--- /dev/null
+++ b/persistence-modules/spring-data-mongodb-reactive/src/test/java/com/baeldung/reactive/repository/AccountRxJavaRepositoryManualTest.java
@@ -0,0 +1,58 @@
+package com.baeldung.reactive.repository;
+
+import com.baeldung.reactive.Spring5ReactiveApplication;
+import com.baeldung.reactive.model.Account;
+import io.reactivex.Observable;
+import io.reactivex.Single;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.test.context.junit4.SpringRunner;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+@RunWith(SpringRunner.class)
+@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, classes = Spring5ReactiveApplication.class)
+public class AccountRxJavaRepositoryManualTest {
+
+ @Autowired
+ AccountRxJavaRepository repository;
+
+ @Test
+ public void givenValue_whenFindAllByValue_thenFindAccounts() throws InterruptedException {
+ repository.save(new Account(null, "bruno", 12.3)).blockingGet();
+ Observable accountObservable = repository.findAllByValue(12.3);
+
+ accountObservable
+ .test()
+ .await()
+ .assertComplete()
+ .assertValueAt(0, account -> {
+ assertEquals("bruno", account.getOwner());
+ assertEquals(Double.valueOf(12.3), account.getValue());
+ return true;
+ });
+
+ }
+
+ @Test
+ public void givenOwner_whenFindFirstByOwner_thenFindAccount() throws InterruptedException {
+ repository.save(new Account(null, "bruno", 12.3)).blockingGet();
+ Single accountSingle = repository.findFirstByOwner(Single.just("bruno"));
+
+ accountSingle
+ .test()
+ .await()
+ .assertComplete()
+ .assertValueAt(0, account -> {
+ assertEquals("bruno", account.getOwner());
+ assertEquals(Double.valueOf(12.3), account.getValue());
+ assertNotNull(account.getId());
+ return true;
+ });
+
+ }
+
+}
\ No newline at end of file
diff --git a/persistence-modules/spring-data-mongodb-reactive/src/test/java/com/baeldung/reactive/template/AccountTemplateOperationsManualTest.java b/persistence-modules/spring-data-mongodb-reactive/src/test/java/com/baeldung/reactive/template/AccountTemplateOperationsManualTest.java
new file mode 100644
index 0000000000..5fa0e39317
--- /dev/null
+++ b/persistence-modules/spring-data-mongodb-reactive/src/test/java/com/baeldung/reactive/template/AccountTemplateOperationsManualTest.java
@@ -0,0 +1,48 @@
+package com.baeldung.reactive.template;
+
+import com.baeldung.reactive.Spring5ReactiveApplication;
+import com.baeldung.reactive.model.Account;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.test.context.junit4.SpringRunner;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+@RunWith(SpringRunner.class)
+@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, classes = Spring5ReactiveApplication.class)
+public class AccountTemplateOperationsManualTest {
+
+ @Autowired
+ AccountTemplateOperations accountTemplate;
+
+ @Test
+ public void givenAccount_whenSave_thenSave() {
+ Account account = accountTemplate.save(Mono.just(new Account(null, "Raul", 12.3))).block();
+ assertNotNull( account.getId() );
+ }
+
+ @Test
+ public void givenId_whenFindById_thenFindAccount() {
+ Mono accountMono = accountTemplate.save(Mono.just(new Account(null, "Raul", 12.3)));
+ Mono accountMonoResult = accountTemplate.findById(accountMono.block().getId());
+ assertNotNull(accountMonoResult.block().getId());
+ assertEquals(accountMonoResult.block().getOwner(), "Raul");
+ }
+
+ @Test
+ public void whenFindAll_thenFindAllAccounts() {
+ Account account1 = accountTemplate.save(Mono.just(new Account(null, "Raul", 12.3))).block();
+ Account account2 = accountTemplate.save(Mono.just(new Account(null, "Raul Torres", 13.3))).block();
+ Flux accountFlux = accountTemplate.findAll();
+ List accounts = accountFlux.collectList().block();
+ assertTrue(accounts.stream().anyMatch(x -> account1.getId().equals(x.getId()) ));
+ assertTrue(accounts.stream().anyMatch(x -> account2.getId().equals(x.getId()) ));
+ }
+
+}
\ No newline at end of file
diff --git a/persistence-modules/spring-data-mongodb-reactive/src/test/java/com/baeldung/tailablecursor/service/ErrorLogsCounterManualTest.java b/persistence-modules/spring-data-mongodb-reactive/src/test/java/com/baeldung/tailablecursor/service/ErrorLogsCounterManualTest.java
new file mode 100644
index 0000000000..5e20d3ec79
--- /dev/null
+++ b/persistence-modules/spring-data-mongodb-reactive/src/test/java/com/baeldung/tailablecursor/service/ErrorLogsCounterManualTest.java
@@ -0,0 +1,112 @@
+package com.baeldung.tailablecursor.service;
+
+import com.baeldung.tailablecursor.domain.Log;
+import com.baeldung.tailablecursor.domain.LogLevel;
+import com.mongodb.MongoClient;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoDatabase;
+import com.mongodb.client.model.CreateCollectionOptions;
+import de.flapdoodle.embed.mongo.MongodExecutable;
+import de.flapdoodle.embed.mongo.MongodProcess;
+import de.flapdoodle.embed.mongo.MongodStarter;
+import de.flapdoodle.embed.mongo.config.MongodConfigBuilder;
+import de.flapdoodle.embed.mongo.config.Net;
+import de.flapdoodle.embed.mongo.distribution.Version;
+import de.flapdoodle.embed.process.runtime.Network;
+import org.bson.Document;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.springframework.data.mongodb.core.MongoTemplate;
+import org.springframework.util.SocketUtils;
+
+import java.io.IOException;
+import java.util.stream.IntStream;
+
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+public class ErrorLogsCounterManualTest {
+
+ private static final String SERVER = "localhost";
+ private static final int PORT = SocketUtils.findAvailableTcpPort(10000);
+ private static final String DB_NAME = "test";
+ private static final String COLLECTION_NAME = Log.class.getName().toLowerCase();
+
+ private static final MongodStarter starter = MongodStarter.getDefaultInstance();
+ private static final int MAX_DOCUMENTS_IN_COLLECTION = 3;
+
+ private ErrorLogsCounter errorLogsCounter;
+ private MongodExecutable mongodExecutable;
+ private MongodProcess mongoDaemon;
+ private MongoDatabase db;
+
+ @Before
+ public void setup() throws Exception {
+ MongoTemplate mongoTemplate = initMongoTemplate();
+
+ MongoCollection collection = createCappedCollection();
+
+ persistDocument(collection, -1, LogLevel.ERROR, "my-service", "Initial log");
+
+ errorLogsCounter = new ErrorLogsCounter(mongoTemplate, COLLECTION_NAME);
+ Thread.sleep(1000L); // wait for initialization
+ }
+
+ private MongoTemplate initMongoTemplate() throws IOException {
+ mongodExecutable = starter.prepare(new MongodConfigBuilder()
+ .version(Version.Main.PRODUCTION)
+ .net(new Net(SERVER, PORT, Network.localhostIsIPv6()))
+ .build());
+ mongoDaemon = mongodExecutable.start();
+
+ MongoClient mongoClient = new MongoClient(SERVER, PORT);
+ db = mongoClient.getDatabase(DB_NAME);
+
+ return new MongoTemplate(mongoClient, DB_NAME);
+ }
+
+ private MongoCollection createCappedCollection() {
+ db.createCollection(COLLECTION_NAME, new CreateCollectionOptions()
+ .capped(true)
+ .sizeInBytes(100000)
+ .maxDocuments(MAX_DOCUMENTS_IN_COLLECTION));
+ return db.getCollection(COLLECTION_NAME);
+ }
+
+ private void persistDocument(MongoCollection collection,
+ int i, LogLevel level, String service, String message) {
+ Document logMessage = new Document();
+ logMessage.append("_id", i);
+ logMessage.append("level", level.toString());
+ logMessage.append("service", service);
+ logMessage.append("message", message);
+ collection.insertOne(logMessage);
+ }
+
+ @After
+ public void tearDown() {
+ errorLogsCounter.close();
+ mongoDaemon.stop();
+ mongodExecutable.stop();
+ }
+
+ @Test
+ public void whenErrorLogsArePersisted_thenTheyAreReceivedByLogsCounter() throws Exception {
+ MongoCollection collection = db.getCollection(COLLECTION_NAME);
+
+ IntStream.range(1, 10)
+ .forEach(i -> persistDocument(collection,
+ i,
+ i > 5 ? LogLevel.ERROR : LogLevel.INFO,
+ "service" + i,
+ "Message from service " + i)
+ );
+
+ Thread.sleep(1000L); // wait to receive all messages from the reactive mongodb driver
+
+ assertThat(collection.countDocuments(), is((long) MAX_DOCUMENTS_IN_COLLECTION));
+ assertThat(errorLogsCounter.count(), is(5));
+ }
+
+}
diff --git a/persistence-modules/spring-data-mongodb-reactive/src/test/java/com/baeldung/tailablecursor/service/InfoLogsCounterManualTest.java b/persistence-modules/spring-data-mongodb-reactive/src/test/java/com/baeldung/tailablecursor/service/InfoLogsCounterManualTest.java
new file mode 100644
index 0000000000..cd8bd68257
--- /dev/null
+++ b/persistence-modules/spring-data-mongodb-reactive/src/test/java/com/baeldung/tailablecursor/service/InfoLogsCounterManualTest.java
@@ -0,0 +1,75 @@
+package com.baeldung.tailablecursor.service;
+
+import com.baeldung.tailablecursor.LogsCounterApplication;
+import com.baeldung.tailablecursor.domain.Log;
+import com.baeldung.tailablecursor.domain.LogLevel;
+import com.baeldung.tailablecursor.repository.LogsRepository;
+import lombok.extern.slf4j.Slf4j;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.data.mongodb.core.CollectionOptions;
+import org.springframework.data.mongodb.core.ReactiveMongoTemplate;
+import org.springframework.test.context.junit4.SpringRunner;
+import reactor.core.publisher.Flux;
+
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+
+@RunWith(SpringRunner.class)
+@SpringBootTest(classes = LogsCounterApplication.class)
+@Slf4j
+public class InfoLogsCounterManualTest {
+ @Autowired
+ private LogsRepository repository;
+
+ @Autowired
+ private ReactiveMongoTemplate template;
+
+ @Before
+ public void setUp() {
+ createCappedCollectionUsingReactiveMongoTemplate(template);
+
+ persistDocument(Log.builder()
+ .level(LogLevel.INFO)
+ .service("Service 2")
+ .message("Initial INFO message")
+ .build());
+ }
+
+ private void createCappedCollectionUsingReactiveMongoTemplate(ReactiveMongoTemplate reactiveMongoTemplate) {
+ reactiveMongoTemplate.dropCollection(Log.class).block();
+ reactiveMongoTemplate.createCollection(Log.class, CollectionOptions.empty()
+ .maxDocuments(5)
+ .size(1024 * 1024L)
+ .capped()).block();
+ }
+
+ private void persistDocument(Log log) {
+ repository.save(log).block();
+ }
+
+ @Test
+ public void wheInfoLogsArePersisted_thenTheyAreReceivedByLogsCounter() throws Exception {
+ InfoLogsCounter infoLogsCounter = new InfoLogsCounter(repository);
+
+ Thread.sleep(1000L); // wait for initialization
+
+ Flux.range(0,10)
+ .map(i -> Log.builder()
+ .level(i > 5 ? LogLevel.WARN : LogLevel.INFO)
+ .service("some-service")
+ .message("some log message")
+ .build())
+ .map(entity -> repository.save(entity).subscribe())
+ .blockLast();
+
+ Thread.sleep(1000L); // wait to receive all messages from the reactive mongodb driver
+
+ assertThat(infoLogsCounter.count(), is(7));
+ infoLogsCounter.close();
+ }
+}
\ No newline at end of file
diff --git a/persistence-modules/spring-data-mongodb-reactive/src/test/java/com/baeldung/tailablecursor/service/WarnLogsCounterManualTest.java b/persistence-modules/spring-data-mongodb-reactive/src/test/java/com/baeldung/tailablecursor/service/WarnLogsCounterManualTest.java
new file mode 100644
index 0000000000..79d94b6784
--- /dev/null
+++ b/persistence-modules/spring-data-mongodb-reactive/src/test/java/com/baeldung/tailablecursor/service/WarnLogsCounterManualTest.java
@@ -0,0 +1,75 @@
+package com.baeldung.tailablecursor.service;
+
+import com.baeldung.tailablecursor.LogsCounterApplication;
+import com.baeldung.tailablecursor.domain.Log;
+import com.baeldung.tailablecursor.domain.LogLevel;
+import com.baeldung.tailablecursor.repository.LogsRepository;
+import lombok.extern.slf4j.Slf4j;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.data.mongodb.core.CollectionOptions;
+import org.springframework.data.mongodb.core.ReactiveMongoTemplate;
+import org.springframework.test.context.junit4.SpringRunner;
+import reactor.core.publisher.Flux;
+
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+
+@RunWith(SpringRunner.class)
+@SpringBootTest(classes = LogsCounterApplication.class)
+@Slf4j
+public class WarnLogsCounterManualTest {
+ @Autowired
+ private LogsRepository repository;
+
+ @Autowired
+ private ReactiveMongoTemplate template;
+
+ @Before
+ public void setUp() {
+ createCappedCollectionUsingReactiveMongoTemplate(template);
+
+ persistDocument(Log.builder()
+ .level(LogLevel.WARN)
+ .service("Service 1")
+ .message("Initial Warn message")
+ .build());
+ }
+
+ private void createCappedCollectionUsingReactiveMongoTemplate(ReactiveMongoTemplate reactiveMongoTemplate) {
+ reactiveMongoTemplate.dropCollection(Log.class).block();
+ reactiveMongoTemplate.createCollection(Log.class, CollectionOptions.empty()
+ .maxDocuments(5)
+ .size(1024 * 1024L)
+ .capped()).block();
+ }
+
+ private void persistDocument(Log log) {
+ repository.save(log).block();
+ }
+
+ @Test
+ public void whenWarnLogsArePersisted_thenTheyAreReceivedByLogsCounter() throws Exception {
+ WarnLogsCounter warnLogsCounter = new WarnLogsCounter(template);
+
+ Thread.sleep(1000L); // wait for initialization
+
+ Flux.range(0,10)
+ .map(i -> Log.builder()
+ .level(i > 5 ? LogLevel.WARN : LogLevel.INFO)
+ .service("some-service")
+ .message("some log message")
+ .build())
+ .map(entity -> repository.save(entity).subscribe())
+ .blockLast();
+
+ Thread.sleep(1000L); // wait to receive all messages from the reactive mongodb driver
+
+ assertThat(warnLogsCounter.count(), is(5));
+ warnLogsCounter.close();
+ }
+}
\ No newline at end of file
diff --git a/spring-5-data-reactive/pom.xml b/spring-5-data-reactive/pom.xml
index 5f12636280..94a3c47809 100644
--- a/spring-5-data-reactive/pom.xml
+++ b/spring-5-data-reactive/pom.xml
@@ -83,11 +83,6 @@
h2
${h2.version}
-
- org.springframework.boot
- spring-boot-starter-test
- test
-
org.apache.httpcomponents
httpclient
@@ -152,6 +147,10 @@
1.4.200
1.5.23
3.3.1.RELEASE
+
2.2.6.RELEASE