diff --git a/spring-5-data-reactive/pom.xml b/spring-5-data-reactive/pom.xml
index aa73cf11ae..8c16851de0 100644
--- a/spring-5-data-reactive/pom.xml
+++ b/spring-5-data-reactive/pom.xml
@@ -63,6 +63,11 @@
spring-boot-starter-test
test
+
+ de.flapdoodle.embed
+ de.flapdoodle.embed.mongo
+ test
+
diff --git a/spring-5-data-reactive/src/main/java/com/baeldung/tailablecursor/LogsCounterApplication.java b/spring-5-data-reactive/src/main/java/com/baeldung/tailablecursor/LogsCounterApplication.java
new file mode 100644
index 0000000000..8b2511a8f3
--- /dev/null
+++ b/spring-5-data-reactive/src/main/java/com/baeldung/tailablecursor/LogsCounterApplication.java
@@ -0,0 +1,11 @@
+package com.baeldung.tailablecursor;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+
+@SpringBootApplication
+public class LogsCounterApplication {
+ public static void main(String[] args) {
+ SpringApplication.run(LogsCounterApplication.class, args);
+ }
+}
diff --git a/spring-5-data-reactive/src/main/java/com/baeldung/tailablecursor/domain/Log.java b/spring-5-data-reactive/src/main/java/com/baeldung/tailablecursor/domain/Log.java
new file mode 100644
index 0000000000..717a367751
--- /dev/null
+++ b/spring-5-data-reactive/src/main/java/com/baeldung/tailablecursor/domain/Log.java
@@ -0,0 +1,21 @@
+package com.baeldung.tailablecursor.domain;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.springframework.data.annotation.Id;
+import org.springframework.data.mongodb.core.mapping.Document;
+
+@Data
+@Document
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class Log {
+ @Id
+ private String id;
+ private String service;
+ private LogLevel level;
+ private String message;
+}
diff --git a/spring-5-data-reactive/src/main/java/com/baeldung/tailablecursor/domain/LogLevel.java b/spring-5-data-reactive/src/main/java/com/baeldung/tailablecursor/domain/LogLevel.java
new file mode 100644
index 0000000000..6826fbffd3
--- /dev/null
+++ b/spring-5-data-reactive/src/main/java/com/baeldung/tailablecursor/domain/LogLevel.java
@@ -0,0 +1,5 @@
+package com.baeldung.tailablecursor.domain;
+
+public enum LogLevel {
+ ERROR, WARN, INFO
+}
diff --git a/spring-5-data-reactive/src/main/java/com/baeldung/tailablecursor/repository/LogsRepository.java b/spring-5-data-reactive/src/main/java/com/baeldung/tailablecursor/repository/LogsRepository.java
new file mode 100644
index 0000000000..dce11c548c
--- /dev/null
+++ b/spring-5-data-reactive/src/main/java/com/baeldung/tailablecursor/repository/LogsRepository.java
@@ -0,0 +1,12 @@
+package com.baeldung.tailablecursor.repository;
+
+import com.baeldung.tailablecursor.domain.Log;
+import com.baeldung.tailablecursor.domain.LogLevel;
+import org.springframework.data.mongodb.repository.Tailable;
+import org.springframework.data.repository.reactive.ReactiveCrudRepository;
+import reactor.core.publisher.Flux;
+
+public interface LogsRepository extends ReactiveCrudRepository {
+ @Tailable
+ Flux findByLevel(LogLevel level);
+}
diff --git a/spring-5-data-reactive/src/main/java/com/baeldung/tailablecursor/service/ErrorLogsCounter.java b/spring-5-data-reactive/src/main/java/com/baeldung/tailablecursor/service/ErrorLogsCounter.java
new file mode 100644
index 0000000000..c243e64f97
--- /dev/null
+++ b/spring-5-data-reactive/src/main/java/com/baeldung/tailablecursor/service/ErrorLogsCounter.java
@@ -0,0 +1,62 @@
+package com.baeldung.tailablecursor.service;
+
+import com.baeldung.tailablecursor.domain.Log;
+import com.baeldung.tailablecursor.domain.LogLevel;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.data.mongodb.core.MongoTemplate;
+import org.springframework.data.mongodb.core.mapping.Document;
+import org.springframework.data.mongodb.core.messaging.DefaultMessageListenerContainer;
+import org.springframework.data.mongodb.core.messaging.MessageListener;
+import org.springframework.data.mongodb.core.messaging.MessageListenerContainer;
+import org.springframework.data.mongodb.core.messaging.TailableCursorRequest;
+
+import javax.annotation.PreDestroy;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.springframework.data.mongodb.core.query.Criteria.where;
+import static org.springframework.data.mongodb.core.query.Query.query;
+
+@Slf4j
+public class ErrorLogsCounter implements LogsCounter {
+
+ private static final String LEVEL_FIELD_NAME = "level";
+
+ private final String collectionName;
+ private final MessageListenerContainer container;
+
+ private final AtomicInteger counter = new AtomicInteger();
+
+ public ErrorLogsCounter(MongoTemplate mongoTemplate,
+ String collectionName) {
+ this.collectionName = collectionName;
+ this.container = new DefaultMessageListenerContainer(mongoTemplate);
+
+ container.start();
+ TailableCursorRequest request = getTailableCursorRequest();
+ container.register(request, Log.class);
+ }
+
+ @SuppressWarnings("unchecked")
+ private TailableCursorRequest getTailableCursorRequest() {
+ MessageListener listener = message -> {
+ log.info("ERROR log received: {}", message.getBody());
+ counter.incrementAndGet();
+ };
+
+ return TailableCursorRequest.builder()
+ .collection(collectionName)
+ .filter(query(where(LEVEL_FIELD_NAME).is(LogLevel.ERROR)))
+ .publishTo(listener)
+ .build();
+ }
+
+ @Override
+ public int count() {
+ return counter.get();
+ }
+
+ @PreDestroy
+ public void close() {
+ container.stop();
+ }
+}
diff --git a/spring-5-data-reactive/src/main/java/com/baeldung/tailablecursor/service/InfoLogsCounter.java b/spring-5-data-reactive/src/main/java/com/baeldung/tailablecursor/service/InfoLogsCounter.java
new file mode 100644
index 0000000000..b30eba0b25
--- /dev/null
+++ b/spring-5-data-reactive/src/main/java/com/baeldung/tailablecursor/service/InfoLogsCounter.java
@@ -0,0 +1,36 @@
+package com.baeldung.tailablecursor.service;
+
+import com.baeldung.tailablecursor.domain.Log;
+import com.baeldung.tailablecursor.domain.LogLevel;
+import com.baeldung.tailablecursor.repository.LogsRepository;
+import lombok.extern.slf4j.Slf4j;
+import reactor.core.Disposable;
+import reactor.core.publisher.Flux;
+
+import javax.annotation.PreDestroy;
+import java.util.concurrent.atomic.AtomicInteger;
+
+@Slf4j
+public class InfoLogsCounter implements LogsCounter {
+
+ private final AtomicInteger counter = new AtomicInteger();
+ private final Disposable subscription;
+
+ public InfoLogsCounter(LogsRepository repository) {
+ Flux stream = repository.findByLevel(LogLevel.INFO);
+ this.subscription = stream.subscribe(l -> {
+ log.info("INFO log received: " + l);
+ counter.incrementAndGet();
+ });
+ }
+
+ @Override
+ public int count() {
+ return this.counter.get();
+ }
+
+ @PreDestroy
+ public void close() {
+ this.subscription.dispose();
+ }
+}
diff --git a/spring-5-data-reactive/src/main/java/com/baeldung/tailablecursor/service/LogsCounter.java b/spring-5-data-reactive/src/main/java/com/baeldung/tailablecursor/service/LogsCounter.java
new file mode 100644
index 0000000000..e14a3eadd7
--- /dev/null
+++ b/spring-5-data-reactive/src/main/java/com/baeldung/tailablecursor/service/LogsCounter.java
@@ -0,0 +1,5 @@
+package com.baeldung.tailablecursor.service;
+
+public interface LogsCounter {
+ int count();
+}
diff --git a/spring-5-data-reactive/src/main/java/com/baeldung/tailablecursor/service/WarnLogsCounter.java b/spring-5-data-reactive/src/main/java/com/baeldung/tailablecursor/service/WarnLogsCounter.java
new file mode 100644
index 0000000000..b21f61fa88
--- /dev/null
+++ b/spring-5-data-reactive/src/main/java/com/baeldung/tailablecursor/service/WarnLogsCounter.java
@@ -0,0 +1,41 @@
+package com.baeldung.tailablecursor.service;
+
+import com.baeldung.tailablecursor.domain.Log;
+import com.baeldung.tailablecursor.domain.LogLevel;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.data.mongodb.core.ReactiveMongoTemplate;
+import reactor.core.Disposable;
+import reactor.core.publisher.Flux;
+
+import javax.annotation.PreDestroy;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.springframework.data.mongodb.core.query.Criteria.where;
+import static org.springframework.data.mongodb.core.query.Query.query;
+
+@Slf4j
+public class WarnLogsCounter implements LogsCounter {
+
+ private static final String LEVEL_FIELD_NAME = "level";
+
+ private final AtomicInteger counter = new AtomicInteger();
+ private final Disposable subscription;
+
+ public WarnLogsCounter(ReactiveMongoTemplate template) {
+ Flux stream = template.tail(query(where(LEVEL_FIELD_NAME).is(LogLevel.WARN)), Log.class);
+ subscription = stream.subscribe(l -> {
+ log.warn("WARN log received: " + l);
+ counter.incrementAndGet();
+ });
+ }
+
+ @Override
+ public int count() {
+ return counter.get();
+ }
+
+ @PreDestroy
+ public void close() {
+ subscription.dispose();
+ }
+}
diff --git a/spring-5-data-reactive/src/test/java/com/baeldung/tailablecursor/service/ErrorLogsCounterManualTest.java b/spring-5-data-reactive/src/test/java/com/baeldung/tailablecursor/service/ErrorLogsCounterManualTest.java
new file mode 100644
index 0000000000..5e20d3ec79
--- /dev/null
+++ b/spring-5-data-reactive/src/test/java/com/baeldung/tailablecursor/service/ErrorLogsCounterManualTest.java
@@ -0,0 +1,112 @@
+package com.baeldung.tailablecursor.service;
+
+import com.baeldung.tailablecursor.domain.Log;
+import com.baeldung.tailablecursor.domain.LogLevel;
+import com.mongodb.MongoClient;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoDatabase;
+import com.mongodb.client.model.CreateCollectionOptions;
+import de.flapdoodle.embed.mongo.MongodExecutable;
+import de.flapdoodle.embed.mongo.MongodProcess;
+import de.flapdoodle.embed.mongo.MongodStarter;
+import de.flapdoodle.embed.mongo.config.MongodConfigBuilder;
+import de.flapdoodle.embed.mongo.config.Net;
+import de.flapdoodle.embed.mongo.distribution.Version;
+import de.flapdoodle.embed.process.runtime.Network;
+import org.bson.Document;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.springframework.data.mongodb.core.MongoTemplate;
+import org.springframework.util.SocketUtils;
+
+import java.io.IOException;
+import java.util.stream.IntStream;
+
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+public class ErrorLogsCounterManualTest {
+
+ private static final String SERVER = "localhost";
+ private static final int PORT = SocketUtils.findAvailableTcpPort(10000);
+ private static final String DB_NAME = "test";
+ private static final String COLLECTION_NAME = Log.class.getName().toLowerCase();
+
+ private static final MongodStarter starter = MongodStarter.getDefaultInstance();
+ private static final int MAX_DOCUMENTS_IN_COLLECTION = 3;
+
+ private ErrorLogsCounter errorLogsCounter;
+ private MongodExecutable mongodExecutable;
+ private MongodProcess mongoDaemon;
+ private MongoDatabase db;
+
+ @Before
+ public void setup() throws Exception {
+ MongoTemplate mongoTemplate = initMongoTemplate();
+
+ MongoCollection collection = createCappedCollection();
+
+ persistDocument(collection, -1, LogLevel.ERROR, "my-service", "Initial log");
+
+ errorLogsCounter = new ErrorLogsCounter(mongoTemplate, COLLECTION_NAME);
+ Thread.sleep(1000L); // wait for initialization
+ }
+
+ private MongoTemplate initMongoTemplate() throws IOException {
+ mongodExecutable = starter.prepare(new MongodConfigBuilder()
+ .version(Version.Main.PRODUCTION)
+ .net(new Net(SERVER, PORT, Network.localhostIsIPv6()))
+ .build());
+ mongoDaemon = mongodExecutable.start();
+
+ MongoClient mongoClient = new MongoClient(SERVER, PORT);
+ db = mongoClient.getDatabase(DB_NAME);
+
+ return new MongoTemplate(mongoClient, DB_NAME);
+ }
+
+ private MongoCollection createCappedCollection() {
+ db.createCollection(COLLECTION_NAME, new CreateCollectionOptions()
+ .capped(true)
+ .sizeInBytes(100000)
+ .maxDocuments(MAX_DOCUMENTS_IN_COLLECTION));
+ return db.getCollection(COLLECTION_NAME);
+ }
+
+ private void persistDocument(MongoCollection collection,
+ int i, LogLevel level, String service, String message) {
+ Document logMessage = new Document();
+ logMessage.append("_id", i);
+ logMessage.append("level", level.toString());
+ logMessage.append("service", service);
+ logMessage.append("message", message);
+ collection.insertOne(logMessage);
+ }
+
+ @After
+ public void tearDown() {
+ errorLogsCounter.close();
+ mongoDaemon.stop();
+ mongodExecutable.stop();
+ }
+
+ @Test
+ public void whenErrorLogsArePersisted_thenTheyAreReceivedByLogsCounter() throws Exception {
+ MongoCollection collection = db.getCollection(COLLECTION_NAME);
+
+ IntStream.range(1, 10)
+ .forEach(i -> persistDocument(collection,
+ i,
+ i > 5 ? LogLevel.ERROR : LogLevel.INFO,
+ "service" + i,
+ "Message from service " + i)
+ );
+
+ Thread.sleep(1000L); // wait to receive all messages from the reactive mongodb driver
+
+ assertThat(collection.countDocuments(), is((long) MAX_DOCUMENTS_IN_COLLECTION));
+ assertThat(errorLogsCounter.count(), is(5));
+ }
+
+}
diff --git a/spring-5-data-reactive/src/test/java/com/baeldung/tailablecursor/service/InfoLogsCounterManualTest.java b/spring-5-data-reactive/src/test/java/com/baeldung/tailablecursor/service/InfoLogsCounterManualTest.java
new file mode 100644
index 0000000000..cd8bd68257
--- /dev/null
+++ b/spring-5-data-reactive/src/test/java/com/baeldung/tailablecursor/service/InfoLogsCounterManualTest.java
@@ -0,0 +1,75 @@
+package com.baeldung.tailablecursor.service;
+
+import com.baeldung.tailablecursor.LogsCounterApplication;
+import com.baeldung.tailablecursor.domain.Log;
+import com.baeldung.tailablecursor.domain.LogLevel;
+import com.baeldung.tailablecursor.repository.LogsRepository;
+import lombok.extern.slf4j.Slf4j;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.data.mongodb.core.CollectionOptions;
+import org.springframework.data.mongodb.core.ReactiveMongoTemplate;
+import org.springframework.test.context.junit4.SpringRunner;
+import reactor.core.publisher.Flux;
+
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+
+@RunWith(SpringRunner.class)
+@SpringBootTest(classes = LogsCounterApplication.class)
+@Slf4j
+public class InfoLogsCounterManualTest {
+ @Autowired
+ private LogsRepository repository;
+
+ @Autowired
+ private ReactiveMongoTemplate template;
+
+ @Before
+ public void setUp() {
+ createCappedCollectionUsingReactiveMongoTemplate(template);
+
+ persistDocument(Log.builder()
+ .level(LogLevel.INFO)
+ .service("Service 2")
+ .message("Initial INFO message")
+ .build());
+ }
+
+ private void createCappedCollectionUsingReactiveMongoTemplate(ReactiveMongoTemplate reactiveMongoTemplate) {
+ reactiveMongoTemplate.dropCollection(Log.class).block();
+ reactiveMongoTemplate.createCollection(Log.class, CollectionOptions.empty()
+ .maxDocuments(5)
+ .size(1024 * 1024L)
+ .capped()).block();
+ }
+
+ private void persistDocument(Log log) {
+ repository.save(log).block();
+ }
+
+ @Test
+ public void wheInfoLogsArePersisted_thenTheyAreReceivedByLogsCounter() throws Exception {
+ InfoLogsCounter infoLogsCounter = new InfoLogsCounter(repository);
+
+ Thread.sleep(1000L); // wait for initialization
+
+ Flux.range(0,10)
+ .map(i -> Log.builder()
+ .level(i > 5 ? LogLevel.WARN : LogLevel.INFO)
+ .service("some-service")
+ .message("some log message")
+ .build())
+ .map(entity -> repository.save(entity).subscribe())
+ .blockLast();
+
+ Thread.sleep(1000L); // wait to receive all messages from the reactive mongodb driver
+
+ assertThat(infoLogsCounter.count(), is(7));
+ infoLogsCounter.close();
+ }
+}
\ No newline at end of file
diff --git a/spring-5-data-reactive/src/test/java/com/baeldung/tailablecursor/service/WarnLogsCounterManualTest.java b/spring-5-data-reactive/src/test/java/com/baeldung/tailablecursor/service/WarnLogsCounterManualTest.java
new file mode 100644
index 0000000000..79d94b6784
--- /dev/null
+++ b/spring-5-data-reactive/src/test/java/com/baeldung/tailablecursor/service/WarnLogsCounterManualTest.java
@@ -0,0 +1,75 @@
+package com.baeldung.tailablecursor.service;
+
+import com.baeldung.tailablecursor.LogsCounterApplication;
+import com.baeldung.tailablecursor.domain.Log;
+import com.baeldung.tailablecursor.domain.LogLevel;
+import com.baeldung.tailablecursor.repository.LogsRepository;
+import lombok.extern.slf4j.Slf4j;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.data.mongodb.core.CollectionOptions;
+import org.springframework.data.mongodb.core.ReactiveMongoTemplate;
+import org.springframework.test.context.junit4.SpringRunner;
+import reactor.core.publisher.Flux;
+
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+
+@RunWith(SpringRunner.class)
+@SpringBootTest(classes = LogsCounterApplication.class)
+@Slf4j
+public class WarnLogsCounterManualTest {
+ @Autowired
+ private LogsRepository repository;
+
+ @Autowired
+ private ReactiveMongoTemplate template;
+
+ @Before
+ public void setUp() {
+ createCappedCollectionUsingReactiveMongoTemplate(template);
+
+ persistDocument(Log.builder()
+ .level(LogLevel.WARN)
+ .service("Service 1")
+ .message("Initial Warn message")
+ .build());
+ }
+
+ private void createCappedCollectionUsingReactiveMongoTemplate(ReactiveMongoTemplate reactiveMongoTemplate) {
+ reactiveMongoTemplate.dropCollection(Log.class).block();
+ reactiveMongoTemplate.createCollection(Log.class, CollectionOptions.empty()
+ .maxDocuments(5)
+ .size(1024 * 1024L)
+ .capped()).block();
+ }
+
+ private void persistDocument(Log log) {
+ repository.save(log).block();
+ }
+
+ @Test
+ public void whenWarnLogsArePersisted_thenTheyAreReceivedByLogsCounter() throws Exception {
+ WarnLogsCounter warnLogsCounter = new WarnLogsCounter(template);
+
+ Thread.sleep(1000L); // wait for initialization
+
+ Flux.range(0,10)
+ .map(i -> Log.builder()
+ .level(i > 5 ? LogLevel.WARN : LogLevel.INFO)
+ .service("some-service")
+ .message("some log message")
+ .build())
+ .map(entity -> repository.save(entity).subscribe())
+ .blockLast();
+
+ Thread.sleep(1000L); // wait to receive all messages from the reactive mongodb driver
+
+ assertThat(warnLogsCounter.count(), is(5));
+ warnLogsCounter.close();
+ }
+}
\ No newline at end of file