From b6431622ed022ca029a725a902bb0458ccbd5720 Mon Sep 17 00:00:00 2001 From: Aravind Ranganathan Date: Wed, 28 Nov 2018 12:01:32 -0500 Subject: [PATCH] Spring Data Cassandra Reactive Issue: BAEL-1870 --- .../spring-data-cassandra-reactive/pom.xml | 70 +++++++++++++++++++ ...pringDataCassandraReactiveApplication.java | 12 ++++ .../controller/EmployeeController.java | 49 +++++++++++++ .../cassandra/reactive/model/Employee.java | 20 ++++++ .../repository/EmployeeRepository.java | 11 +++ .../reactive/service/EmployeeService.java | 35 ++++++++++ .../src/main/resources/application.properties | 2 + ...tiveEmployeeRepositoryIntegrationTest.java | 55 +++++++++++++++ 8 files changed, 254 insertions(+) create mode 100644 persistence-modules/spring-data-cassandra-reactive/pom.xml create mode 100644 persistence-modules/spring-data-cassandra-reactive/src/main/java/com/baeldung/cassandra/reactive/SpringDataCassandraReactiveApplication.java create mode 100644 persistence-modules/spring-data-cassandra-reactive/src/main/java/com/baeldung/cassandra/reactive/controller/EmployeeController.java create mode 100644 persistence-modules/spring-data-cassandra-reactive/src/main/java/com/baeldung/cassandra/reactive/model/Employee.java create mode 100644 persistence-modules/spring-data-cassandra-reactive/src/main/java/com/baeldung/cassandra/reactive/repository/EmployeeRepository.java create mode 100644 persistence-modules/spring-data-cassandra-reactive/src/main/java/com/baeldung/cassandra/reactive/service/EmployeeService.java create mode 100644 persistence-modules/spring-data-cassandra-reactive/src/main/resources/application.properties create mode 100644 persistence-modules/spring-data-cassandra-reactive/src/test/java/com/baeldung/cassandra/reactive/ReactiveEmployeeRepositoryIntegrationTest.java diff --git a/persistence-modules/spring-data-cassandra-reactive/pom.xml b/persistence-modules/spring-data-cassandra-reactive/pom.xml new file mode 100644 index 0000000000..037b1fd3c1 --- /dev/null +++ b/persistence-modules/spring-data-cassandra-reactive/pom.xml @@ -0,0 +1,70 @@ + + + 4.0.0 + + com.baeldung + spring-data-cassandra-reactive + 0.0.1-SNAPSHOT + jar + + spring-data-cassandra-reactive + Spring Data Cassandra reactive + + + org.springframework.boot + spring-boot-starter-parent + 2.1.0.RELEASE + + + + + UTF-8 + UTF-8 + + 1.8 + + + + + org.springframework.data + spring-data-cassandra + 2.1.2.RELEASE + + + io.projectreactor + reactor-core + + + org.springframework.boot + spring-boot-starter-web + + + org.projectlombok + lombok + true + + + org.springframework.boot + spring-boot-starter-test + test + + + io.projectreactor + reactor-test + test + + + + + + + org.springframework.boot + spring-boot-maven-plugin + + + + + + diff --git a/persistence-modules/spring-data-cassandra-reactive/src/main/java/com/baeldung/cassandra/reactive/SpringDataCassandraReactiveApplication.java b/persistence-modules/spring-data-cassandra-reactive/src/main/java/com/baeldung/cassandra/reactive/SpringDataCassandraReactiveApplication.java new file mode 100644 index 0000000000..5f467042a3 --- /dev/null +++ b/persistence-modules/spring-data-cassandra-reactive/src/main/java/com/baeldung/cassandra/reactive/SpringDataCassandraReactiveApplication.java @@ -0,0 +1,12 @@ +package com.baeldung.cassandra.reactive; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +@SpringBootApplication +public class SpringDataCassandraReactiveApplication { + + public static void main(String[] args) { + SpringApplication.run(SpringDataCassandraReactiveApplication.class, args); + } +} diff --git a/persistence-modules/spring-data-cassandra-reactive/src/main/java/com/baeldung/cassandra/reactive/controller/EmployeeController.java b/persistence-modules/spring-data-cassandra-reactive/src/main/java/com/baeldung/cassandra/reactive/controller/EmployeeController.java new file mode 100644 index 0000000000..e9de213e61 --- /dev/null +++ b/persistence-modules/spring-data-cassandra-reactive/src/main/java/com/baeldung/cassandra/reactive/controller/EmployeeController.java @@ -0,0 +1,49 @@ +package com.baeldung.cassandra.reactive.controller; + +import com.baeldung.cassandra.reactive.model.Employee; +import com.baeldung.cassandra.reactive.service.EmployeeService; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import javax.annotation.PostConstruct; +import java.util.ArrayList; +import java.util.List; + +@RestController +@RequestMapping("employee") +public class EmployeeController { + + @Autowired + EmployeeService employeeService; + + @PostConstruct + public void saveEmployees() { + List employees = new ArrayList<>(); + employees.add(new Employee(123, "John Doe", "Delaware", "jdoe@xyz.com", 31)); + employees.add(new Employee(324, "Adam Smith", "North Carolina", "asmith@xyz.com", 43)); + employees.add(new Employee(355, "Kevin Dunner", "Virginia", "kdunner@xyz.com", 24)); + employees.add(new Employee(643, "Mike Lauren", "New York", "mlauren@xyz.com", 41)); + employeeService.initializeEmployees(employees); + } + + @GetMapping("/list") + public Flux getAllEmployees() { + Flux employees = employeeService.getAllEmployees(); + return employees; + } + + @GetMapping("/{id}") + public Mono getEmployeeById(@PathVariable int id) { + return employeeService.getEmployeeById(id); + } + + @GetMapping("/filterByAge/{age}") + public Flux getEmployeesFilterByAge(@PathVariable int age) { + return employeeService.getEmployeesFilterByAge(age); + } +} diff --git a/persistence-modules/spring-data-cassandra-reactive/src/main/java/com/baeldung/cassandra/reactive/model/Employee.java b/persistence-modules/spring-data-cassandra-reactive/src/main/java/com/baeldung/cassandra/reactive/model/Employee.java new file mode 100644 index 0000000000..a78f884778 --- /dev/null +++ b/persistence-modules/spring-data-cassandra-reactive/src/main/java/com/baeldung/cassandra/reactive/model/Employee.java @@ -0,0 +1,20 @@ +package com.baeldung.cassandra.reactive.model; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; +import org.springframework.data.cassandra.core.mapping.PrimaryKey; +import org.springframework.data.cassandra.core.mapping.Table; + +@Table +@Data +@AllArgsConstructor +@NoArgsConstructor +public class Employee { + @PrimaryKey + private int id; + private String name; + private String address; + private String email; + private int age; +} diff --git a/persistence-modules/spring-data-cassandra-reactive/src/main/java/com/baeldung/cassandra/reactive/repository/EmployeeRepository.java b/persistence-modules/spring-data-cassandra-reactive/src/main/java/com/baeldung/cassandra/reactive/repository/EmployeeRepository.java new file mode 100644 index 0000000000..22dc2b4565 --- /dev/null +++ b/persistence-modules/spring-data-cassandra-reactive/src/main/java/com/baeldung/cassandra/reactive/repository/EmployeeRepository.java @@ -0,0 +1,11 @@ +package com.baeldung.cassandra.reactive.repository; + +import com.baeldung.cassandra.reactive.model.Employee; +import org.springframework.data.cassandra.repository.AllowFiltering; +import org.springframework.data.cassandra.repository.ReactiveCassandraRepository; +import reactor.core.publisher.Flux; + +public interface EmployeeRepository extends ReactiveCassandraRepository { + @AllowFiltering + Flux findByAgeGreaterThan(int age); +} diff --git a/persistence-modules/spring-data-cassandra-reactive/src/main/java/com/baeldung/cassandra/reactive/service/EmployeeService.java b/persistence-modules/spring-data-cassandra-reactive/src/main/java/com/baeldung/cassandra/reactive/service/EmployeeService.java new file mode 100644 index 0000000000..40c330937a --- /dev/null +++ b/persistence-modules/spring-data-cassandra-reactive/src/main/java/com/baeldung/cassandra/reactive/service/EmployeeService.java @@ -0,0 +1,35 @@ +package com.baeldung.cassandra.reactive.service; + +import com.baeldung.cassandra.reactive.model.Employee; +import com.baeldung.cassandra.reactive.repository.EmployeeRepository; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.util.List; + +@Service +public class EmployeeService { + + @Autowired + EmployeeRepository employeeRepository; + + public void initializeEmployees(List employees) { + Flux savedEmployees = employeeRepository.saveAll(employees); + savedEmployees.subscribe(); + } + + public Flux getAllEmployees() { + Flux employees = employeeRepository.findAll(); + return employees; + } + + public Flux getEmployeesFilterByAge(int age) { + return employeeRepository.findByAgeGreaterThan(age); + } + + public Mono getEmployeeById(int id) { + return employeeRepository.findById(id); + } +} diff --git a/persistence-modules/spring-data-cassandra-reactive/src/main/resources/application.properties b/persistence-modules/spring-data-cassandra-reactive/src/main/resources/application.properties new file mode 100644 index 0000000000..7ed2f10131 --- /dev/null +++ b/persistence-modules/spring-data-cassandra-reactive/src/main/resources/application.properties @@ -0,0 +1,2 @@ +spring.data.cassandra.keyspace-name=practice +spring.data.cassandra.port=9042 \ No newline at end of file diff --git a/persistence-modules/spring-data-cassandra-reactive/src/test/java/com/baeldung/cassandra/reactive/ReactiveEmployeeRepositoryIntegrationTest.java b/persistence-modules/spring-data-cassandra-reactive/src/test/java/com/baeldung/cassandra/reactive/ReactiveEmployeeRepositoryIntegrationTest.java new file mode 100644 index 0000000000..ad726fe969 --- /dev/null +++ b/persistence-modules/spring-data-cassandra-reactive/src/test/java/com/baeldung/cassandra/reactive/ReactiveEmployeeRepositoryIntegrationTest.java @@ -0,0 +1,55 @@ +package com.baeldung.cassandra.reactive; + +import com.baeldung.cassandra.reactive.model.Employee; +import com.baeldung.cassandra.reactive.repository.EmployeeRepository; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.test.context.junit4.SpringRunner; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; + + +@RunWith(SpringRunner.class) +@SpringBootTest +public class ReactiveEmployeeRepositoryIntegrationTest { + + @Autowired + EmployeeRepository repository; + + @Before + public void setUp() { + + Flux deleteAndInsert = repository.deleteAll() // + .thenMany(repository.saveAll(Flux.just( + new Employee(111, "John Doe", "Delaware", "jdoe@xyz.com", 31), + new Employee(222, "Adam Smith", "North Carolina", "asmith@xyz.com", 43), + new Employee(333, "Kevin Dunner", "Virginia", "kdunner@xyz.com", 24), + new Employee(444, "Mike Lauren", "New York", "mlauren@xyz.com", 41)))); + + StepVerifier.create(deleteAndInsert).expectNextCount(4).verifyComplete(); + } + + @Test + public void givenRecordsAreInserted_whenDbIsQueried_thenShouldIncludeNewRecords() { + + Mono saveAndCount = repository.count() + .doOnNext(System.out::println) + .thenMany(repository.saveAll(Flux.just(new Employee(325, "Kim Jones", "Florida", "kjones@xyz.com", 42), + new Employee(654, "Tom Moody", "New Hampshire", "tmoody@xyz.com", 44)))) + .last() + .flatMap(v -> repository.count()) + .doOnNext(System.out::println); + + StepVerifier.create(saveAndCount).expectNext(6L).verifyComplete(); + } + + @Test + public void givenAgeForFilter_whenDbIsQueried_thenShouldReturnFilteredRecords() { + StepVerifier.create(repository.findByAgeGreaterThan(35)).expectNextCount(2).verifyComplete(); + } + +}