JAVA-6511 - Removed Mongo DB Reactive and Mongo DB Tail cursor.
This commit is contained in:
parent
77a32b7f37
commit
e70f7829af
|
@ -24,10 +24,6 @@
|
||||||
<groupId>org.springframework.boot</groupId>
|
<groupId>org.springframework.boot</groupId>
|
||||||
<artifactId>spring-boot-starter-data-couchbase-reactive</artifactId>
|
<artifactId>spring-boot-starter-data-couchbase-reactive</artifactId>
|
||||||
</dependency>
|
</dependency>
|
||||||
<dependency>
|
|
||||||
<groupId>org.springframework.boot</groupId>
|
|
||||||
<artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
|
|
||||||
</dependency>
|
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.springframework.boot</groupId>
|
<groupId>org.springframework.boot</groupId>
|
||||||
<artifactId>spring-boot-starter-web</artifactId>
|
<artifactId>spring-boot-starter-web</artifactId>
|
||||||
|
@ -54,11 +50,6 @@
|
||||||
<artifactId>spring-boot-starter-test</artifactId>
|
<artifactId>spring-boot-starter-test</artifactId>
|
||||||
<scope>test</scope>
|
<scope>test</scope>
|
||||||
</dependency>
|
</dependency>
|
||||||
<dependency>
|
|
||||||
<groupId>de.flapdoodle.embed</groupId>
|
|
||||||
<artifactId>de.flapdoodle.embed.mongo</artifactId>
|
|
||||||
<scope>test</scope>
|
|
||||||
</dependency>
|
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.springframework.boot</groupId>
|
<groupId>org.springframework.boot</groupId>
|
||||||
<artifactId>spring-boot-starter-webflux</artifactId>
|
<artifactId>spring-boot-starter-webflux</artifactId>
|
||||||
|
|
|
@ -1,25 +0,0 @@
|
||||||
package com.baeldung.reactive;
|
|
||||||
|
|
||||||
import com.mongodb.reactivestreams.client.MongoClient;
|
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
|
||||||
import org.springframework.boot.SpringApplication;
|
|
||||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
|
||||||
import org.springframework.context.annotation.Bean;
|
|
||||||
import org.springframework.data.mongodb.core.ReactiveMongoTemplate;
|
|
||||||
|
|
||||||
@SpringBootApplication
|
|
||||||
public class Spring5ReactiveApplication{
|
|
||||||
|
|
||||||
public static void main(String[] args) {
|
|
||||||
SpringApplication.run(Spring5ReactiveApplication.class, args);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Autowired
|
|
||||||
MongoClient mongoClient;
|
|
||||||
|
|
||||||
@Bean
|
|
||||||
public ReactiveMongoTemplate reactiveMongoTemplate() {
|
|
||||||
return new ReactiveMongoTemplate(mongoClient, "test");
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
|
@ -1,21 +0,0 @@
|
||||||
package com.baeldung.reactive.model;
|
|
||||||
|
|
||||||
import lombok.AllArgsConstructor;
|
|
||||||
import lombok.Data;
|
|
||||||
import lombok.NoArgsConstructor;
|
|
||||||
import lombok.ToString;
|
|
||||||
import org.springframework.data.annotation.Id;
|
|
||||||
import org.springframework.data.mongodb.core.mapping.Document;
|
|
||||||
|
|
||||||
@Document
|
|
||||||
@Data
|
|
||||||
@ToString
|
|
||||||
@AllArgsConstructor
|
|
||||||
@NoArgsConstructor
|
|
||||||
public class Account {
|
|
||||||
|
|
||||||
@Id
|
|
||||||
private String id;
|
|
||||||
private String owner;
|
|
||||||
private Double value;
|
|
||||||
}
|
|
|
@ -1,15 +0,0 @@
|
||||||
package com.baeldung.reactive.repository;
|
|
||||||
|
|
||||||
import com.baeldung.reactive.model.Account;
|
|
||||||
import org.springframework.data.repository.reactive.ReactiveCrudRepository;
|
|
||||||
import org.springframework.stereotype.Repository;
|
|
||||||
import reactor.core.publisher.Flux;
|
|
||||||
import reactor.core.publisher.Mono;
|
|
||||||
|
|
||||||
@Repository
|
|
||||||
public interface AccountCrudRepository extends ReactiveCrudRepository<Account, String> {
|
|
||||||
|
|
||||||
public Flux<Account> findAllByValue(Double value);
|
|
||||||
|
|
||||||
public Mono<Account> findFirstByOwner(Mono<String> owner);
|
|
||||||
}
|
|
|
@ -1,7 +0,0 @@
|
||||||
package com.baeldung.reactive.repository;
|
|
||||||
|
|
||||||
import com.baeldung.reactive.model.Account;
|
|
||||||
import org.springframework.data.mongodb.repository.ReactiveMongoRepository;
|
|
||||||
|
|
||||||
public interface AccountMongoRepository extends ReactiveMongoRepository<Account, String> {
|
|
||||||
}
|
|
|
@ -1,15 +0,0 @@
|
||||||
package com.baeldung.reactive.repository;
|
|
||||||
|
|
||||||
import com.baeldung.reactive.model.Account;
|
|
||||||
import io.reactivex.Observable;
|
|
||||||
import io.reactivex.Single;
|
|
||||||
import org.springframework.data.repository.reactive.RxJava2CrudRepository;
|
|
||||||
import org.springframework.stereotype.Repository;
|
|
||||||
|
|
||||||
@Repository
|
|
||||||
public interface AccountRxJavaRepository extends RxJava2CrudRepository<Account, String>{
|
|
||||||
|
|
||||||
public Observable<Account> findAllByValue(Double value);
|
|
||||||
|
|
||||||
public Single<Account> findFirstByOwner(Single<String> owner);
|
|
||||||
}
|
|
|
@ -1,33 +0,0 @@
|
||||||
package com.baeldung.reactive.template;
|
|
||||||
|
|
||||||
import com.baeldung.reactive.model.Account;
|
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
|
||||||
import org.springframework.data.mongodb.core.ReactiveMongoTemplate;
|
|
||||||
import org.springframework.data.mongodb.core.ReactiveRemoveOperation;
|
|
||||||
import org.springframework.stereotype.Service;
|
|
||||||
import reactor.core.publisher.Flux;
|
|
||||||
import reactor.core.publisher.Mono;
|
|
||||||
|
|
||||||
@Service
|
|
||||||
public class AccountTemplateOperations {
|
|
||||||
|
|
||||||
@Autowired
|
|
||||||
ReactiveMongoTemplate template;
|
|
||||||
|
|
||||||
public Mono<Account> findById(String id) {
|
|
||||||
return template.findById(id, Account.class);
|
|
||||||
}
|
|
||||||
|
|
||||||
public Flux<Account> findAll() {
|
|
||||||
return template.findAll(Account.class);
|
|
||||||
}
|
|
||||||
|
|
||||||
public Mono<Account> save(Mono<Account> account) {
|
|
||||||
return template.save(account);
|
|
||||||
}
|
|
||||||
|
|
||||||
public ReactiveRemoveOperation.ReactiveRemove<Account> deleteAll() {
|
|
||||||
return template.remove(Account.class);
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
|
@ -1,11 +0,0 @@
|
||||||
package com.baeldung.tailablecursor;
|
|
||||||
|
|
||||||
import org.springframework.boot.SpringApplication;
|
|
||||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
|
||||||
|
|
||||||
@SpringBootApplication
|
|
||||||
public class LogsCounterApplication {
|
|
||||||
public static void main(String[] args) {
|
|
||||||
SpringApplication.run(LogsCounterApplication.class, args);
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,21 +0,0 @@
|
||||||
package com.baeldung.tailablecursor.domain;
|
|
||||||
|
|
||||||
import lombok.AllArgsConstructor;
|
|
||||||
import lombok.Builder;
|
|
||||||
import lombok.Data;
|
|
||||||
import lombok.NoArgsConstructor;
|
|
||||||
import org.springframework.data.annotation.Id;
|
|
||||||
import org.springframework.data.mongodb.core.mapping.Document;
|
|
||||||
|
|
||||||
@Data
|
|
||||||
@Document
|
|
||||||
@Builder
|
|
||||||
@NoArgsConstructor
|
|
||||||
@AllArgsConstructor
|
|
||||||
public class Log {
|
|
||||||
@Id
|
|
||||||
private String id;
|
|
||||||
private String service;
|
|
||||||
private LogLevel level;
|
|
||||||
private String message;
|
|
||||||
}
|
|
|
@ -1,5 +0,0 @@
|
||||||
package com.baeldung.tailablecursor.domain;
|
|
||||||
|
|
||||||
public enum LogLevel {
|
|
||||||
ERROR, WARN, INFO
|
|
||||||
}
|
|
|
@ -1,12 +0,0 @@
|
||||||
package com.baeldung.tailablecursor.repository;
|
|
||||||
|
|
||||||
import com.baeldung.tailablecursor.domain.Log;
|
|
||||||
import com.baeldung.tailablecursor.domain.LogLevel;
|
|
||||||
import org.springframework.data.mongodb.repository.Tailable;
|
|
||||||
import org.springframework.data.repository.reactive.ReactiveCrudRepository;
|
|
||||||
import reactor.core.publisher.Flux;
|
|
||||||
|
|
||||||
public interface LogsRepository extends ReactiveCrudRepository<Log, String> {
|
|
||||||
@Tailable
|
|
||||||
Flux<Log> findByLevel(LogLevel level);
|
|
||||||
}
|
|
|
@ -1,62 +0,0 @@
|
||||||
package com.baeldung.tailablecursor.service;
|
|
||||||
|
|
||||||
import com.baeldung.tailablecursor.domain.Log;
|
|
||||||
import com.baeldung.tailablecursor.domain.LogLevel;
|
|
||||||
import lombok.extern.slf4j.Slf4j;
|
|
||||||
import org.springframework.data.mongodb.core.MongoTemplate;
|
|
||||||
import org.springframework.data.mongodb.core.mapping.Document;
|
|
||||||
import org.springframework.data.mongodb.core.messaging.DefaultMessageListenerContainer;
|
|
||||||
import org.springframework.data.mongodb.core.messaging.MessageListener;
|
|
||||||
import org.springframework.data.mongodb.core.messaging.MessageListenerContainer;
|
|
||||||
import org.springframework.data.mongodb.core.messaging.TailableCursorRequest;
|
|
||||||
|
|
||||||
import javax.annotation.PreDestroy;
|
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
|
||||||
|
|
||||||
import static org.springframework.data.mongodb.core.query.Criteria.where;
|
|
||||||
import static org.springframework.data.mongodb.core.query.Query.query;
|
|
||||||
|
|
||||||
@Slf4j
|
|
||||||
public class ErrorLogsCounter implements LogsCounter {
|
|
||||||
|
|
||||||
private static final String LEVEL_FIELD_NAME = "level";
|
|
||||||
|
|
||||||
private final String collectionName;
|
|
||||||
private final MessageListenerContainer container;
|
|
||||||
|
|
||||||
private final AtomicInteger counter = new AtomicInteger();
|
|
||||||
|
|
||||||
public ErrorLogsCounter(MongoTemplate mongoTemplate,
|
|
||||||
String collectionName) {
|
|
||||||
this.collectionName = collectionName;
|
|
||||||
this.container = new DefaultMessageListenerContainer(mongoTemplate);
|
|
||||||
|
|
||||||
container.start();
|
|
||||||
TailableCursorRequest<Log> request = getTailableCursorRequest();
|
|
||||||
container.register(request, Log.class);
|
|
||||||
}
|
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
|
||||||
private TailableCursorRequest<Log> getTailableCursorRequest() {
|
|
||||||
MessageListener<Document, Log> listener = message -> {
|
|
||||||
log.info("ERROR log received: {}", message.getBody());
|
|
||||||
counter.incrementAndGet();
|
|
||||||
};
|
|
||||||
|
|
||||||
return TailableCursorRequest.builder()
|
|
||||||
.collection(collectionName)
|
|
||||||
.filter(query(where(LEVEL_FIELD_NAME).is(LogLevel.ERROR)))
|
|
||||||
.publishTo(listener)
|
|
||||||
.build();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public int count() {
|
|
||||||
return counter.get();
|
|
||||||
}
|
|
||||||
|
|
||||||
@PreDestroy
|
|
||||||
public void close() {
|
|
||||||
container.stop();
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,36 +0,0 @@
|
||||||
package com.baeldung.tailablecursor.service;
|
|
||||||
|
|
||||||
import com.baeldung.tailablecursor.domain.Log;
|
|
||||||
import com.baeldung.tailablecursor.domain.LogLevel;
|
|
||||||
import com.baeldung.tailablecursor.repository.LogsRepository;
|
|
||||||
import lombok.extern.slf4j.Slf4j;
|
|
||||||
import reactor.core.Disposable;
|
|
||||||
import reactor.core.publisher.Flux;
|
|
||||||
|
|
||||||
import javax.annotation.PreDestroy;
|
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
|
||||||
|
|
||||||
@Slf4j
|
|
||||||
public class InfoLogsCounter implements LogsCounter {
|
|
||||||
|
|
||||||
private final AtomicInteger counter = new AtomicInteger();
|
|
||||||
private final Disposable subscription;
|
|
||||||
|
|
||||||
public InfoLogsCounter(LogsRepository repository) {
|
|
||||||
Flux<Log> stream = repository.findByLevel(LogLevel.INFO);
|
|
||||||
this.subscription = stream.subscribe(logEntity -> {
|
|
||||||
log.info("INFO log received: " + logEntity);
|
|
||||||
counter.incrementAndGet();
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public int count() {
|
|
||||||
return this.counter.get();
|
|
||||||
}
|
|
||||||
|
|
||||||
@PreDestroy
|
|
||||||
public void close() {
|
|
||||||
this.subscription.dispose();
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,5 +0,0 @@
|
||||||
package com.baeldung.tailablecursor.service;
|
|
||||||
|
|
||||||
public interface LogsCounter {
|
|
||||||
int count();
|
|
||||||
}
|
|
|
@ -1,41 +0,0 @@
|
||||||
package com.baeldung.tailablecursor.service;
|
|
||||||
|
|
||||||
import com.baeldung.tailablecursor.domain.Log;
|
|
||||||
import com.baeldung.tailablecursor.domain.LogLevel;
|
|
||||||
import lombok.extern.slf4j.Slf4j;
|
|
||||||
import org.springframework.data.mongodb.core.ReactiveMongoOperations;
|
|
||||||
import reactor.core.Disposable;
|
|
||||||
import reactor.core.publisher.Flux;
|
|
||||||
|
|
||||||
import javax.annotation.PreDestroy;
|
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
|
||||||
|
|
||||||
import static org.springframework.data.mongodb.core.query.Criteria.where;
|
|
||||||
import static org.springframework.data.mongodb.core.query.Query.query;
|
|
||||||
|
|
||||||
@Slf4j
|
|
||||||
public class WarnLogsCounter implements LogsCounter {
|
|
||||||
|
|
||||||
private static final String LEVEL_FIELD_NAME = "level";
|
|
||||||
|
|
||||||
private final AtomicInteger counter = new AtomicInteger();
|
|
||||||
private final Disposable subscription;
|
|
||||||
|
|
||||||
public WarnLogsCounter(ReactiveMongoOperations template) {
|
|
||||||
Flux<Log> stream = template.tail(query(where(LEVEL_FIELD_NAME).is(LogLevel.WARN)), Log.class);
|
|
||||||
subscription = stream.subscribe(logEntity -> {
|
|
||||||
log.warn("WARN log received: " + logEntity);
|
|
||||||
counter.incrementAndGet();
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public int count() {
|
|
||||||
return counter.get();
|
|
||||||
}
|
|
||||||
|
|
||||||
@PreDestroy
|
|
||||||
public void close() {
|
|
||||||
subscription.dispose();
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,17 +0,0 @@
|
||||||
package com.baeldung;
|
|
||||||
|
|
||||||
import org.junit.Test;
|
|
||||||
import org.junit.runner.RunWith;
|
|
||||||
import org.springframework.boot.test.context.SpringBootTest;
|
|
||||||
import org.springframework.test.context.junit4.SpringRunner;
|
|
||||||
|
|
||||||
import com.baeldung.reactive.Spring5ReactiveApplication;
|
|
||||||
|
|
||||||
@RunWith(SpringRunner.class)
|
|
||||||
@SpringBootTest(classes = Spring5ReactiveApplication.class)
|
|
||||||
public class SpringContextTest {
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void whenSpringContextIsBootstrapped_thenNoExceptions() {
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,70 +0,0 @@
|
||||||
package com.baeldung.reactive.repository;
|
|
||||||
|
|
||||||
|
|
||||||
import com.baeldung.reactive.Spring5ReactiveApplication;
|
|
||||||
import com.baeldung.reactive.model.Account;
|
|
||||||
import org.junit.Test;
|
|
||||||
import org.junit.runner.RunWith;
|
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
|
||||||
import org.springframework.boot.test.context.SpringBootTest;
|
|
||||||
import org.springframework.test.context.junit4.SpringRunner;
|
|
||||||
import reactor.core.publisher.Flux;
|
|
||||||
import reactor.core.publisher.Mono;
|
|
||||||
import reactor.test.StepVerifier;
|
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
|
||||||
import static org.junit.Assert.assertNotNull;
|
|
||||||
|
|
||||||
@RunWith(SpringRunner.class)
|
|
||||||
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, classes = Spring5ReactiveApplication.class)
|
|
||||||
public class AccountCrudRepositoryManualTest {
|
|
||||||
|
|
||||||
@Autowired
|
|
||||||
AccountCrudRepository repository;
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void givenValue_whenFindAllByValue_thenFindAccount() {
|
|
||||||
repository.save(new Account(null, "Bill", 12.3)).block();
|
|
||||||
Flux<Account> accountFlux = repository.findAllByValue(12.3);
|
|
||||||
|
|
||||||
StepVerifier.create(accountFlux)
|
|
||||||
.assertNext(account -> {
|
|
||||||
assertEquals("Bill", account.getOwner());
|
|
||||||
assertEquals(Double.valueOf(12.3) , account.getValue());
|
|
||||||
assertNotNull(account.getId());
|
|
||||||
})
|
|
||||||
.expectComplete()
|
|
||||||
.verify();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void givenOwner_whenFindFirstByOwner_thenFindAccount() {
|
|
||||||
repository.save(new Account(null, "Bill", 12.3)).block();
|
|
||||||
Mono<Account> accountMono = repository.findFirstByOwner(Mono.just("Bill"));
|
|
||||||
|
|
||||||
StepVerifier.create(accountMono)
|
|
||||||
.assertNext(account -> {
|
|
||||||
assertEquals("Bill", account.getOwner());
|
|
||||||
assertEquals(Double.valueOf(12.3) , account.getValue());
|
|
||||||
assertNotNull(account.getId());
|
|
||||||
})
|
|
||||||
.expectComplete()
|
|
||||||
.verify();
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void givenAccount_whenSave_thenSaveAccount() {
|
|
||||||
Mono<Account> accountMono = repository.save(new Account(null, "Bill", 12.3));
|
|
||||||
|
|
||||||
StepVerifier
|
|
||||||
.create(accountMono)
|
|
||||||
.assertNext(account -> assertNotNull(account.getId()))
|
|
||||||
.expectComplete()
|
|
||||||
.verify();
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
}
|
|
|
@ -1,67 +0,0 @@
|
||||||
package com.baeldung.reactive.repository;
|
|
||||||
|
|
||||||
import com.baeldung.reactive.Spring5ReactiveApplication;
|
|
||||||
import com.baeldung.reactive.model.Account;
|
|
||||||
import org.junit.Test;
|
|
||||||
import org.junit.runner.RunWith;
|
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
|
||||||
import org.springframework.boot.test.context.SpringBootTest;
|
|
||||||
import org.springframework.data.domain.Example;
|
|
||||||
import org.springframework.data.domain.ExampleMatcher;
|
|
||||||
import org.springframework.test.context.junit4.SpringRunner;
|
|
||||||
import reactor.core.publisher.Flux;
|
|
||||||
import reactor.core.publisher.Mono;
|
|
||||||
import reactor.test.StepVerifier;
|
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
|
||||||
import static org.junit.Assert.assertNotNull;
|
|
||||||
import static org.springframework.data.domain.ExampleMatcher.GenericPropertyMatchers.startsWith;
|
|
||||||
|
|
||||||
@RunWith(SpringRunner.class)
|
|
||||||
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, classes = Spring5ReactiveApplication.class)
|
|
||||||
public class AccountMongoRepositoryManualTest {
|
|
||||||
|
|
||||||
@Autowired
|
|
||||||
AccountMongoRepository repository;
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void givenExample_whenFindAllWithExample_thenFindAllMacthings() {
|
|
||||||
repository.save(new Account(null, "john", 12.3)).block();
|
|
||||||
ExampleMatcher matcher = ExampleMatcher.matching().withMatcher("owner", startsWith());
|
|
||||||
Example<Account> example = Example.of(new Account(null, "jo", null), matcher);
|
|
||||||
Flux<Account> accountFlux = repository.findAll(example);
|
|
||||||
|
|
||||||
StepVerifier
|
|
||||||
.create(accountFlux)
|
|
||||||
.assertNext(account -> assertEquals("john", account.getOwner()))
|
|
||||||
.expectComplete()
|
|
||||||
.verify();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void givenAccount_whenSave_thenSave() {
|
|
||||||
Mono<Account> accountMono = repository.save(new Account(null, "john", 12.3));
|
|
||||||
|
|
||||||
StepVerifier
|
|
||||||
.create(accountMono)
|
|
||||||
.assertNext(account -> assertNotNull(account.getId()))
|
|
||||||
.expectComplete()
|
|
||||||
.verify();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void givenId_whenFindById_thenFindAccount() {
|
|
||||||
Account inserted = repository.save(new Account(null, "john", 12.3)).block();
|
|
||||||
Mono<Account> accountMono = repository.findById(inserted.getId());
|
|
||||||
|
|
||||||
StepVerifier
|
|
||||||
.create(accountMono)
|
|
||||||
.assertNext(account -> {
|
|
||||||
assertEquals("john", account.getOwner());
|
|
||||||
assertEquals(Double.valueOf(12.3), account.getValue());
|
|
||||||
assertNotNull(account.getId());
|
|
||||||
})
|
|
||||||
.expectComplete()
|
|
||||||
.verify();
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,58 +0,0 @@
|
||||||
package com.baeldung.reactive.repository;
|
|
||||||
|
|
||||||
import com.baeldung.reactive.Spring5ReactiveApplication;
|
|
||||||
import com.baeldung.reactive.model.Account;
|
|
||||||
import io.reactivex.Observable;
|
|
||||||
import io.reactivex.Single;
|
|
||||||
import org.junit.Test;
|
|
||||||
import org.junit.runner.RunWith;
|
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
|
||||||
import org.springframework.boot.test.context.SpringBootTest;
|
|
||||||
import org.springframework.test.context.junit4.SpringRunner;
|
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
|
||||||
import static org.junit.Assert.assertNotNull;
|
|
||||||
|
|
||||||
@RunWith(SpringRunner.class)
|
|
||||||
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, classes = Spring5ReactiveApplication.class)
|
|
||||||
public class AccountRxJavaRepositoryManualTest {
|
|
||||||
|
|
||||||
@Autowired
|
|
||||||
AccountRxJavaRepository repository;
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void givenValue_whenFindAllByValue_thenFindAccounts() throws InterruptedException {
|
|
||||||
repository.save(new Account(null, "bruno", 12.3)).blockingGet();
|
|
||||||
Observable<Account> accountObservable = repository.findAllByValue(12.3);
|
|
||||||
|
|
||||||
accountObservable
|
|
||||||
.test()
|
|
||||||
.await()
|
|
||||||
.assertComplete()
|
|
||||||
.assertValueAt(0, account -> {
|
|
||||||
assertEquals("bruno", account.getOwner());
|
|
||||||
assertEquals(Double.valueOf(12.3), account.getValue());
|
|
||||||
return true;
|
|
||||||
});
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void givenOwner_whenFindFirstByOwner_thenFindAccount() throws InterruptedException {
|
|
||||||
repository.save(new Account(null, "bruno", 12.3)).blockingGet();
|
|
||||||
Single<Account> accountSingle = repository.findFirstByOwner(Single.just("bruno"));
|
|
||||||
|
|
||||||
accountSingle
|
|
||||||
.test()
|
|
||||||
.await()
|
|
||||||
.assertComplete()
|
|
||||||
.assertValueAt(0, account -> {
|
|
||||||
assertEquals("bruno", account.getOwner());
|
|
||||||
assertEquals(Double.valueOf(12.3), account.getValue());
|
|
||||||
assertNotNull(account.getId());
|
|
||||||
return true;
|
|
||||||
});
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
|
@ -1,48 +0,0 @@
|
||||||
package com.baeldung.reactive.template;
|
|
||||||
|
|
||||||
import com.baeldung.reactive.Spring5ReactiveApplication;
|
|
||||||
import com.baeldung.reactive.model.Account;
|
|
||||||
import org.junit.Test;
|
|
||||||
import org.junit.runner.RunWith;
|
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
|
||||||
import org.springframework.boot.test.context.SpringBootTest;
|
|
||||||
import org.springframework.test.context.junit4.SpringRunner;
|
|
||||||
import reactor.core.publisher.Flux;
|
|
||||||
import reactor.core.publisher.Mono;
|
|
||||||
|
|
||||||
import java.util.List;
|
|
||||||
|
|
||||||
import static org.junit.jupiter.api.Assertions.*;
|
|
||||||
|
|
||||||
@RunWith(SpringRunner.class)
|
|
||||||
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, classes = Spring5ReactiveApplication.class)
|
|
||||||
public class AccountTemplateOperationsManualTest {
|
|
||||||
|
|
||||||
@Autowired
|
|
||||||
AccountTemplateOperations accountTemplate;
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void givenAccount_whenSave_thenSave() {
|
|
||||||
Account account = accountTemplate.save(Mono.just(new Account(null, "Raul", 12.3))).block();
|
|
||||||
assertNotNull( account.getId() );
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void givenId_whenFindById_thenFindAccount() {
|
|
||||||
Mono<Account> accountMono = accountTemplate.save(Mono.just(new Account(null, "Raul", 12.3)));
|
|
||||||
Mono<Account> accountMonoResult = accountTemplate.findById(accountMono.block().getId());
|
|
||||||
assertNotNull(accountMonoResult.block().getId());
|
|
||||||
assertEquals(accountMonoResult.block().getOwner(), "Raul");
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void whenFindAll_thenFindAllAccounts() {
|
|
||||||
Account account1 = accountTemplate.save(Mono.just(new Account(null, "Raul", 12.3))).block();
|
|
||||||
Account account2 = accountTemplate.save(Mono.just(new Account(null, "Raul Torres", 13.3))).block();
|
|
||||||
Flux<Account> accountFlux = accountTemplate.findAll();
|
|
||||||
List<Account> accounts = accountFlux.collectList().block();
|
|
||||||
assertTrue(accounts.stream().anyMatch(x -> account1.getId().equals(x.getId()) ));
|
|
||||||
assertTrue(accounts.stream().anyMatch(x -> account2.getId().equals(x.getId()) ));
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
|
@ -1,112 +0,0 @@
|
||||||
package com.baeldung.tailablecursor.service;
|
|
||||||
|
|
||||||
import com.baeldung.tailablecursor.domain.Log;
|
|
||||||
import com.baeldung.tailablecursor.domain.LogLevel;
|
|
||||||
import com.mongodb.MongoClient;
|
|
||||||
import com.mongodb.client.MongoCollection;
|
|
||||||
import com.mongodb.client.MongoDatabase;
|
|
||||||
import com.mongodb.client.model.CreateCollectionOptions;
|
|
||||||
import de.flapdoodle.embed.mongo.MongodExecutable;
|
|
||||||
import de.flapdoodle.embed.mongo.MongodProcess;
|
|
||||||
import de.flapdoodle.embed.mongo.MongodStarter;
|
|
||||||
import de.flapdoodle.embed.mongo.config.MongodConfigBuilder;
|
|
||||||
import de.flapdoodle.embed.mongo.config.Net;
|
|
||||||
import de.flapdoodle.embed.mongo.distribution.Version;
|
|
||||||
import de.flapdoodle.embed.process.runtime.Network;
|
|
||||||
import org.bson.Document;
|
|
||||||
import org.junit.After;
|
|
||||||
import org.junit.Before;
|
|
||||||
import org.junit.Test;
|
|
||||||
import org.springframework.data.mongodb.core.MongoTemplate;
|
|
||||||
import org.springframework.util.SocketUtils;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.util.stream.IntStream;
|
|
||||||
|
|
||||||
import static org.hamcrest.Matchers.is;
|
|
||||||
import static org.junit.Assert.assertThat;
|
|
||||||
|
|
||||||
public class ErrorLogsCounterManualTest {
|
|
||||||
|
|
||||||
private static final String SERVER = "localhost";
|
|
||||||
private static final int PORT = SocketUtils.findAvailableTcpPort(10000);
|
|
||||||
private static final String DB_NAME = "test";
|
|
||||||
private static final String COLLECTION_NAME = Log.class.getName().toLowerCase();
|
|
||||||
|
|
||||||
private static final MongodStarter starter = MongodStarter.getDefaultInstance();
|
|
||||||
private static final int MAX_DOCUMENTS_IN_COLLECTION = 3;
|
|
||||||
|
|
||||||
private ErrorLogsCounter errorLogsCounter;
|
|
||||||
private MongodExecutable mongodExecutable;
|
|
||||||
private MongodProcess mongoDaemon;
|
|
||||||
private MongoDatabase db;
|
|
||||||
|
|
||||||
@Before
|
|
||||||
public void setup() throws Exception {
|
|
||||||
MongoTemplate mongoTemplate = initMongoTemplate();
|
|
||||||
|
|
||||||
MongoCollection<Document> collection = createCappedCollection();
|
|
||||||
|
|
||||||
persistDocument(collection, -1, LogLevel.ERROR, "my-service", "Initial log");
|
|
||||||
|
|
||||||
errorLogsCounter = new ErrorLogsCounter(mongoTemplate, COLLECTION_NAME);
|
|
||||||
Thread.sleep(1000L); // wait for initialization
|
|
||||||
}
|
|
||||||
|
|
||||||
private MongoTemplate initMongoTemplate() throws IOException {
|
|
||||||
mongodExecutable = starter.prepare(new MongodConfigBuilder()
|
|
||||||
.version(Version.Main.PRODUCTION)
|
|
||||||
.net(new Net(SERVER, PORT, Network.localhostIsIPv6()))
|
|
||||||
.build());
|
|
||||||
mongoDaemon = mongodExecutable.start();
|
|
||||||
|
|
||||||
MongoClient mongoClient = new MongoClient(SERVER, PORT);
|
|
||||||
db = mongoClient.getDatabase(DB_NAME);
|
|
||||||
|
|
||||||
return new MongoTemplate(mongoClient, DB_NAME);
|
|
||||||
}
|
|
||||||
|
|
||||||
private MongoCollection<Document> createCappedCollection() {
|
|
||||||
db.createCollection(COLLECTION_NAME, new CreateCollectionOptions()
|
|
||||||
.capped(true)
|
|
||||||
.sizeInBytes(100000)
|
|
||||||
.maxDocuments(MAX_DOCUMENTS_IN_COLLECTION));
|
|
||||||
return db.getCollection(COLLECTION_NAME);
|
|
||||||
}
|
|
||||||
|
|
||||||
private void persistDocument(MongoCollection<Document> collection,
|
|
||||||
int i, LogLevel level, String service, String message) {
|
|
||||||
Document logMessage = new Document();
|
|
||||||
logMessage.append("_id", i);
|
|
||||||
logMessage.append("level", level.toString());
|
|
||||||
logMessage.append("service", service);
|
|
||||||
logMessage.append("message", message);
|
|
||||||
collection.insertOne(logMessage);
|
|
||||||
}
|
|
||||||
|
|
||||||
@After
|
|
||||||
public void tearDown() {
|
|
||||||
errorLogsCounter.close();
|
|
||||||
mongoDaemon.stop();
|
|
||||||
mongodExecutable.stop();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void whenErrorLogsArePersisted_thenTheyAreReceivedByLogsCounter() throws Exception {
|
|
||||||
MongoCollection<Document> collection = db.getCollection(COLLECTION_NAME);
|
|
||||||
|
|
||||||
IntStream.range(1, 10)
|
|
||||||
.forEach(i -> persistDocument(collection,
|
|
||||||
i,
|
|
||||||
i > 5 ? LogLevel.ERROR : LogLevel.INFO,
|
|
||||||
"service" + i,
|
|
||||||
"Message from service " + i)
|
|
||||||
);
|
|
||||||
|
|
||||||
Thread.sleep(1000L); // wait to receive all messages from the reactive mongodb driver
|
|
||||||
|
|
||||||
assertThat(collection.countDocuments(), is((long) MAX_DOCUMENTS_IN_COLLECTION));
|
|
||||||
assertThat(errorLogsCounter.count(), is(5));
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
|
@ -1,75 +0,0 @@
|
||||||
package com.baeldung.tailablecursor.service;
|
|
||||||
|
|
||||||
import com.baeldung.tailablecursor.LogsCounterApplication;
|
|
||||||
import com.baeldung.tailablecursor.domain.Log;
|
|
||||||
import com.baeldung.tailablecursor.domain.LogLevel;
|
|
||||||
import com.baeldung.tailablecursor.repository.LogsRepository;
|
|
||||||
import lombok.extern.slf4j.Slf4j;
|
|
||||||
import org.junit.Before;
|
|
||||||
import org.junit.Test;
|
|
||||||
import org.junit.runner.RunWith;
|
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
|
||||||
import org.springframework.boot.test.context.SpringBootTest;
|
|
||||||
import org.springframework.data.mongodb.core.CollectionOptions;
|
|
||||||
import org.springframework.data.mongodb.core.ReactiveMongoTemplate;
|
|
||||||
import org.springframework.test.context.junit4.SpringRunner;
|
|
||||||
import reactor.core.publisher.Flux;
|
|
||||||
|
|
||||||
import static org.hamcrest.Matchers.is;
|
|
||||||
import static org.junit.Assert.assertThat;
|
|
||||||
|
|
||||||
|
|
||||||
@RunWith(SpringRunner.class)
|
|
||||||
@SpringBootTest(classes = LogsCounterApplication.class)
|
|
||||||
@Slf4j
|
|
||||||
public class InfoLogsCounterManualTest {
|
|
||||||
@Autowired
|
|
||||||
private LogsRepository repository;
|
|
||||||
|
|
||||||
@Autowired
|
|
||||||
private ReactiveMongoTemplate template;
|
|
||||||
|
|
||||||
@Before
|
|
||||||
public void setUp() {
|
|
||||||
createCappedCollectionUsingReactiveMongoTemplate(template);
|
|
||||||
|
|
||||||
persistDocument(Log.builder()
|
|
||||||
.level(LogLevel.INFO)
|
|
||||||
.service("Service 2")
|
|
||||||
.message("Initial INFO message")
|
|
||||||
.build());
|
|
||||||
}
|
|
||||||
|
|
||||||
private void createCappedCollectionUsingReactiveMongoTemplate(ReactiveMongoTemplate reactiveMongoTemplate) {
|
|
||||||
reactiveMongoTemplate.dropCollection(Log.class).block();
|
|
||||||
reactiveMongoTemplate.createCollection(Log.class, CollectionOptions.empty()
|
|
||||||
.maxDocuments(5)
|
|
||||||
.size(1024 * 1024L)
|
|
||||||
.capped()).block();
|
|
||||||
}
|
|
||||||
|
|
||||||
private void persistDocument(Log log) {
|
|
||||||
repository.save(log).block();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void wheInfoLogsArePersisted_thenTheyAreReceivedByLogsCounter() throws Exception {
|
|
||||||
InfoLogsCounter infoLogsCounter = new InfoLogsCounter(repository);
|
|
||||||
|
|
||||||
Thread.sleep(1000L); // wait for initialization
|
|
||||||
|
|
||||||
Flux.range(0,10)
|
|
||||||
.map(i -> Log.builder()
|
|
||||||
.level(i > 5 ? LogLevel.WARN : LogLevel.INFO)
|
|
||||||
.service("some-service")
|
|
||||||
.message("some log message")
|
|
||||||
.build())
|
|
||||||
.map(entity -> repository.save(entity).subscribe())
|
|
||||||
.blockLast();
|
|
||||||
|
|
||||||
Thread.sleep(1000L); // wait to receive all messages from the reactive mongodb driver
|
|
||||||
|
|
||||||
assertThat(infoLogsCounter.count(), is(7));
|
|
||||||
infoLogsCounter.close();
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,75 +0,0 @@
|
||||||
package com.baeldung.tailablecursor.service;
|
|
||||||
|
|
||||||
import com.baeldung.tailablecursor.LogsCounterApplication;
|
|
||||||
import com.baeldung.tailablecursor.domain.Log;
|
|
||||||
import com.baeldung.tailablecursor.domain.LogLevel;
|
|
||||||
import com.baeldung.tailablecursor.repository.LogsRepository;
|
|
||||||
import lombok.extern.slf4j.Slf4j;
|
|
||||||
import org.junit.Before;
|
|
||||||
import org.junit.Test;
|
|
||||||
import org.junit.runner.RunWith;
|
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
|
||||||
import org.springframework.boot.test.context.SpringBootTest;
|
|
||||||
import org.springframework.data.mongodb.core.CollectionOptions;
|
|
||||||
import org.springframework.data.mongodb.core.ReactiveMongoTemplate;
|
|
||||||
import org.springframework.test.context.junit4.SpringRunner;
|
|
||||||
import reactor.core.publisher.Flux;
|
|
||||||
|
|
||||||
import static org.hamcrest.Matchers.is;
|
|
||||||
import static org.junit.Assert.assertThat;
|
|
||||||
|
|
||||||
|
|
||||||
@RunWith(SpringRunner.class)
|
|
||||||
@SpringBootTest(classes = LogsCounterApplication.class)
|
|
||||||
@Slf4j
|
|
||||||
public class WarnLogsCounterManualTest {
|
|
||||||
@Autowired
|
|
||||||
private LogsRepository repository;
|
|
||||||
|
|
||||||
@Autowired
|
|
||||||
private ReactiveMongoTemplate template;
|
|
||||||
|
|
||||||
@Before
|
|
||||||
public void setUp() {
|
|
||||||
createCappedCollectionUsingReactiveMongoTemplate(template);
|
|
||||||
|
|
||||||
persistDocument(Log.builder()
|
|
||||||
.level(LogLevel.WARN)
|
|
||||||
.service("Service 1")
|
|
||||||
.message("Initial Warn message")
|
|
||||||
.build());
|
|
||||||
}
|
|
||||||
|
|
||||||
private void createCappedCollectionUsingReactiveMongoTemplate(ReactiveMongoTemplate reactiveMongoTemplate) {
|
|
||||||
reactiveMongoTemplate.dropCollection(Log.class).block();
|
|
||||||
reactiveMongoTemplate.createCollection(Log.class, CollectionOptions.empty()
|
|
||||||
.maxDocuments(5)
|
|
||||||
.size(1024 * 1024L)
|
|
||||||
.capped()).block();
|
|
||||||
}
|
|
||||||
|
|
||||||
private void persistDocument(Log log) {
|
|
||||||
repository.save(log).block();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void whenWarnLogsArePersisted_thenTheyAreReceivedByLogsCounter() throws Exception {
|
|
||||||
WarnLogsCounter warnLogsCounter = new WarnLogsCounter(template);
|
|
||||||
|
|
||||||
Thread.sleep(1000L); // wait for initialization
|
|
||||||
|
|
||||||
Flux.range(0,10)
|
|
||||||
.map(i -> Log.builder()
|
|
||||||
.level(i > 5 ? LogLevel.WARN : LogLevel.INFO)
|
|
||||||
.service("some-service")
|
|
||||||
.message("some log message")
|
|
||||||
.build())
|
|
||||||
.map(entity -> repository.save(entity).subscribe())
|
|
||||||
.blockLast();
|
|
||||||
|
|
||||||
Thread.sleep(1000L); // wait to receive all messages from the reactive mongodb driver
|
|
||||||
|
|
||||||
assertThat(warnLogsCounter.count(), is(5));
|
|
||||||
warnLogsCounter.close();
|
|
||||||
}
|
|
||||||
}
|
|
Loading…
Reference in New Issue