Spring Data Cassandra Reactive

Issue: BAEL-1870
This commit is contained in:
Aravind Ranganathan 2018-11-28 12:01:32 -05:00 committed by Josh Cummings
parent ac37b4fb65
commit b6431622ed
8 changed files with 254 additions and 0 deletions

View File

@ -0,0 +1,70 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.baeldung</groupId>
<artifactId>spring-data-cassandra-reactive</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>spring-data-cassandra-reactive</name>
<description>Spring Data Cassandra reactive</description>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.0.RELEASE</version>
<relativePath/>
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8
</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-cassandra</artifactId>
<version>2.1.2.RELEASE</version>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>

View File

@ -0,0 +1,12 @@
package com.baeldung.cassandra.reactive;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class SpringDataCassandraReactiveApplication {
public static void main(String[] args) {
SpringApplication.run(SpringDataCassandraReactiveApplication.class, args);
}
}

View File

@ -0,0 +1,49 @@
package com.baeldung.cassandra.reactive.controller;
import com.baeldung.cassandra.reactive.model.Employee;
import com.baeldung.cassandra.reactive.service.EmployeeService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import javax.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.List;
@RestController
@RequestMapping("employee")
public class EmployeeController {
@Autowired
EmployeeService employeeService;
@PostConstruct
public void saveEmployees() {
List<Employee> employees = new ArrayList<>();
employees.add(new Employee(123, "John Doe", "Delaware", "jdoe@xyz.com", 31));
employees.add(new Employee(324, "Adam Smith", "North Carolina", "asmith@xyz.com", 43));
employees.add(new Employee(355, "Kevin Dunner", "Virginia", "kdunner@xyz.com", 24));
employees.add(new Employee(643, "Mike Lauren", "New York", "mlauren@xyz.com", 41));
employeeService.initializeEmployees(employees);
}
@GetMapping("/list")
public Flux<Employee> getAllEmployees() {
Flux<Employee> employees = employeeService.getAllEmployees();
return employees;
}
@GetMapping("/{id}")
public Mono<Employee> getEmployeeById(@PathVariable int id) {
return employeeService.getEmployeeById(id);
}
@GetMapping("/filterByAge/{age}")
public Flux<Employee> getEmployeesFilterByAge(@PathVariable int age) {
return employeeService.getEmployeesFilterByAge(age);
}
}

View File

@ -0,0 +1,20 @@
package com.baeldung.cassandra.reactive.model;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.data.cassandra.core.mapping.PrimaryKey;
import org.springframework.data.cassandra.core.mapping.Table;
@Table
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Employee {
@PrimaryKey
private int id;
private String name;
private String address;
private String email;
private int age;
}

View File

@ -0,0 +1,11 @@
package com.baeldung.cassandra.reactive.repository;
import com.baeldung.cassandra.reactive.model.Employee;
import org.springframework.data.cassandra.repository.AllowFiltering;
import org.springframework.data.cassandra.repository.ReactiveCassandraRepository;
import reactor.core.publisher.Flux;
public interface EmployeeRepository extends ReactiveCassandraRepository<Employee, Integer> {
@AllowFiltering
Flux<Employee> findByAgeGreaterThan(int age);
}

View File

@ -0,0 +1,35 @@
package com.baeldung.cassandra.reactive.service;
import com.baeldung.cassandra.reactive.model.Employee;
import com.baeldung.cassandra.reactive.repository.EmployeeRepository;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.List;
@Service
public class EmployeeService {
@Autowired
EmployeeRepository employeeRepository;
public void initializeEmployees(List<Employee> employees) {
Flux<Employee> savedEmployees = employeeRepository.saveAll(employees);
savedEmployees.subscribe();
}
public Flux<Employee> getAllEmployees() {
Flux<Employee> employees = employeeRepository.findAll();
return employees;
}
public Flux<Employee> getEmployeesFilterByAge(int age) {
return employeeRepository.findByAgeGreaterThan(age);
}
public Mono<Employee> getEmployeeById(int id) {
return employeeRepository.findById(id);
}
}

View File

@ -0,0 +1,2 @@
spring.data.cassandra.keyspace-name=practice
spring.data.cassandra.port=9042

View File

@ -0,0 +1,55 @@
package com.baeldung.cassandra.reactive;
import com.baeldung.cassandra.reactive.model.Employee;
import com.baeldung.cassandra.reactive.repository.EmployeeRepository;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
@RunWith(SpringRunner.class)
@SpringBootTest
public class ReactiveEmployeeRepositoryIntegrationTest {
@Autowired
EmployeeRepository repository;
@Before
public void setUp() {
Flux<Employee> deleteAndInsert = repository.deleteAll() //
.thenMany(repository.saveAll(Flux.just(
new Employee(111, "John Doe", "Delaware", "jdoe@xyz.com", 31),
new Employee(222, "Adam Smith", "North Carolina", "asmith@xyz.com", 43),
new Employee(333, "Kevin Dunner", "Virginia", "kdunner@xyz.com", 24),
new Employee(444, "Mike Lauren", "New York", "mlauren@xyz.com", 41))));
StepVerifier.create(deleteAndInsert).expectNextCount(4).verifyComplete();
}
@Test
public void givenRecordsAreInserted_whenDbIsQueried_thenShouldIncludeNewRecords() {
Mono<Long> saveAndCount = repository.count()
.doOnNext(System.out::println)
.thenMany(repository.saveAll(Flux.just(new Employee(325, "Kim Jones", "Florida", "kjones@xyz.com", 42),
new Employee(654, "Tom Moody", "New Hampshire", "tmoody@xyz.com", 44))))
.last()
.flatMap(v -> repository.count())
.doOnNext(System.out::println);
StepVerifier.create(saveAndCount).expectNext(6L).verifyComplete();
}
@Test
public void givenAgeForFilter_whenDbIsQueried_thenShouldReturnFilteredRecords() {
StepVerifier.create(repository.findByAgeGreaterThan(35)).expectNextCount(2).verifyComplete();
}
}