BAEL-4467: Introduction to Debezium

This commit is contained in:
Oussama BEN MAHMOUD 2021-04-04 18:08:07 +02:00
parent 96e97ae79b
commit 082f3133ba
14 changed files with 488 additions and 3 deletions

View File

@ -8,8 +8,9 @@
<parent>
<groupId>com.baeldung</groupId>
<artifactId>parent-modules</artifactId>
<version>1.0.0-SNAPSHOT</version>
<artifactId>parent-boot-2</artifactId>
<version>0.0.1-SNAPSHOT</version>
<relativePath>../parent-boot-2</relativePath>
</parent>
<dependencies>
@ -76,6 +77,71 @@
<groupId>io.ebean</groupId>
<artifactId>ebean</artifactId>
<version>${ebean.version}</version>
<exclusions>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- Debezium -->
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-api</artifactId>
<version>${debezium.version}</version>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-embedded</artifactId>
<version>${debezium.version}</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-mysql</artifactId>
<version>${debezium.version}</version>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
<version>${testcontainers-version}</version>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>mysql</artifactId>
<version>${testcontainers-version}</version>
</dependency>
<!-- Spring Core dependencies-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Spring Data JPA, crud repository -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<!-- MySQL -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<!-- Utility dependencies-->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>
@ -213,6 +279,8 @@
<javax.jdo.version>3.2.0-m7</javax.jdo.version>
<HikariCP.version>3.4.5</HikariCP.version>
<ebean.version>11.22.4</ebean.version>
<debezium.version>1.4.2.Final</debezium.version>
<testcontainers-version>1.15.3</testcontainers-version>
</properties>
</project>
</project>

View File

@ -0,0 +1,13 @@
package com.baeldung.libraries.debezium;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class DebeziumCDCApplication {
public static void main(String[] args) {
SpringApplication.run(DebeziumCDCApplication.class, args);
}
}

View File

@ -0,0 +1,58 @@
package com.baeldung.libraries.debezium.config;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.io.File;
import java.io.IOException;
@Configuration
public class DebeziumConnectorConfig {
/**
* Database details.
*/
@Value("${customer.datasource.host}")
private String customerDbHost;
@Value("${customer.datasource.database}")
private String customerDbName;
@Value("${customer.datasource.port}")
private String customerDbPort;
@Value("${customer.datasource.username}")
private String customerDbUsername;
@Value("${customer.datasource.password}")
private String customerDbPassword;
/**
* Customer Database Connector Configuration
*/
@Bean
public io.debezium.config.Configuration customerConnector() throws IOException {
File offsetStorageTempFile = File.createTempFile("offsets_", ".dat");
File dbHistoryTempFile = File.createTempFile("dbhistory_", ".dat");
return io.debezium.config.Configuration.create()
.with("name", "customer-mysql-connector")
.with("connector.class", "io.debezium.connector.mysql.MySqlConnector")
.with("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore")
.with("offset.storage.file.filename", offsetStorageTempFile.getAbsolutePath())
.with("offset.flush.interval.ms", "60000")
.with("database.hostname", customerDbHost)
.with("database.port", customerDbPort)
.with("database.user", customerDbUsername)
.with("database.password", customerDbPassword)
.with("database.dbname", customerDbName)
.with("database.include.list", customerDbName)
.with("include.schema.changes", "false")
.with("database.allowPublicKeyRetrieval", "true")
.with("database.server.id", "10181")
.with("database.server.name", "customer-mysql-db-server")
.with("database.history", "io.debezium.relational.history.FileDatabaseHistory")
.with("database.history.file.filename", dbHistoryTempFile.getAbsolutePath())
.build();
}
}

View File

@ -0,0 +1,18 @@
package com.baeldung.libraries.debezium.entity;
import lombok.Getter;
import lombok.Setter;
import javax.persistence.Entity;
import javax.persistence.Id;
@Entity
@Getter
@Setter
public class Customer {
@Id
private Long id;
private String fullname;
private String email;
}

View File

@ -0,0 +1,83 @@
package com.baeldung.libraries.debezium.listener;
import com.baeldung.libraries.debezium.service.CustomerService;
import io.debezium.config.Configuration;
import io.debezium.embedded.Connect;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.RecordChangeEvent;
import io.debezium.engine.format.ChangeEventFormat;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import static io.debezium.data.Envelope.FieldName.*;
import static io.debezium.data.Envelope.Operation;
import static java.util.stream.Collectors.toMap;
@Slf4j
@Component
public class DebeziumListener {
private final Executor executor = Executors.newSingleThreadExecutor();
private final CustomerService customerService;
private final DebeziumEngine<RecordChangeEvent<SourceRecord>> debeziumEngine;
public DebeziumListener(Configuration customerConnectorConfiguration, CustomerService customerService) {
this.debeziumEngine = DebeziumEngine.create(ChangeEventFormat.of(Connect.class))
.using(customerConnectorConfiguration.asProperties())
.notifying(this::handleChangeEvent)
.build();
this.customerService = customerService;
}
private void handleChangeEvent(RecordChangeEvent<SourceRecord> sourceRecordRecordChangeEvent) {
SourceRecord sourceRecord = sourceRecordRecordChangeEvent.record();
log.info("Key = '" + sourceRecord.key() + "' value = '" + sourceRecord.value() + "'");
Struct sourceRecordChangeValue= (Struct) sourceRecord.value();
if (sourceRecordChangeValue != null) {
Operation operation = Operation.forCode((String) sourceRecordChangeValue.get(OPERATION));
if(operation != Operation.READ) {
String record = operation == Operation.DELETE ? BEFORE : AFTER; // Handling Update & Insert operations.
Struct struct = (Struct) sourceRecordChangeValue.get(record);
Map<String, Object> payload = struct.schema().fields().stream()
.map(Field::name)
.filter(fieldName -> struct.get(fieldName) != null)
.map(fieldName -> Pair.of(fieldName, struct.get(fieldName)))
.collect(toMap(Pair::getKey, Pair::getValue));
this.customerService.replicateData(payload, operation);
log.info("Updated Data: {} with Operation: {}", payload, operation.name());
}
}
}
@PostConstruct
private void start() {
this.executor.execute(debeziumEngine);
}
@PreDestroy
private void stop() throws IOException {
if (this.debeziumEngine != null) {
this.debeziumEngine.close();
}
}
}

View File

@ -0,0 +1,9 @@
package com.baeldung.libraries.debezium.repository;
import com.baeldung.libraries.debezium.entity.Customer;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Repository;
@Repository
public interface CustomerRepository extends JpaRepository<Customer, Long> {
}

View File

@ -0,0 +1,30 @@
package com.baeldung.libraries.debezium.service;
import com.baeldung.libraries.debezium.entity.Customer;
import com.baeldung.libraries.debezium.repository.CustomerRepository;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.debezium.data.Envelope.Operation;
import org.springframework.stereotype.Service;
import java.util.Map;
@Service
public class CustomerService {
private final CustomerRepository customerRepository;
public CustomerService(CustomerRepository customerRepository) {
this.customerRepository = customerRepository;
}
public void replicateData(Map<String, Object> customerData, Operation operation) {
final ObjectMapper mapper = new ObjectMapper();
final Customer customer = mapper.convertValue(customerData, Customer.class);
if (Operation.DELETE.name().equals(operation.name())) {
customerRepository.deleteById(customer.getId());
} else {
customerRepository.save(customer);
}
}
}

View File

@ -0,0 +1,34 @@
## Server properties
server:
port: 8080
## Primary/Target Database Properties
spring:
datasource:
url: jdbc:mysql://localhost:3306/customerdb
username: root
password: root
jpa.hibernate.ddl-auto: create-drop
jpa.show-sql: true
## Source Database Properties
customer:
datasource:
host: localhost
port: 3305
database: customerdb
username: root
password: root
## Logging properties
logging:
level:
root: INFO
io:
debezium:
mysql:
BinlogReader: INFO
com:
baeldung:
libraries:
debezium: DEBUG

View File

@ -0,0 +1,9 @@
drop table if exists customer;
CREATE TABLE customer
(
id integer NOT NULL,
fullname character varying(255),
email character varying(255),
CONSTRAINT customer_pkey PRIMARY KEY (id)
);

View File

@ -0,0 +1,25 @@
version: "3.9"
services:
# Install Source MySQL DB and setup the Customer database
mysql-1:
container_name: source-database
image: mysql
command: --default-authentication-plugin=mysql_native_password
restart: always
ports:
- 3305:3306
environment:
MYSQL_ROOT_PASSWORD: root
MYSQL_DATABASE: customerdb
# Install Target MySQL DB and setup the Customer database
mysql-2:
container_name: target-database
image: mysql
command: --default-authentication-plugin=mysql_native_password
restart: always
ports:
- 3306:3306
environment:
MYSQL_ROOT_PASSWORD: root
MYSQL_DATABASE: customerdb

View File

@ -0,0 +1,61 @@
package com.baeldung.libraries.debezium;
import com.baeldung.libraries.debezium.repository.CustomerRepository;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.DynamicPropertyRegistry;
import org.springframework.test.context.DynamicPropertySource;
import org.springframework.test.context.junit4.SpringRunner;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import static org.assertj.core.api.Assertions.assertThat;
@RunWith(SpringRunner.class)
@SpringBootTest(classes = DebeziumCDCApplication.class)
@ActiveProfiles("test")
public class DebeziumCDCLiveTest {
@Autowired
private CustomerRepository customerRepository;
@Autowired
@Qualifier("sourceJdbcTemplate")
private NamedParameterJdbcTemplate jdbcTemplate;
@Before
public void clearData() {
jdbcTemplate.update("delete from customer where id = :id", Collections.singletonMap("id", 1));
}
@DynamicPropertySource
static void registerProperties(DynamicPropertyRegistry registry) {
registry.add("customer.datasource.port", MySQLTestContainerConfiguration::getPort);
}
@Test
public void whenInsertDataToSourceDatabase_thenCdcOk() throws InterruptedException {
assertThat(customerRepository.findAll().size()).isZero();
// insert data to source DB
Map<String, Object> map = new HashMap<>();
map.put("id", 1);
map.put("fullname", "John Doe");
map.put("email", "test@test.com");
jdbcTemplate.update("INSERT INTO customer(id, fullname, email) VALUES (:id, :fullname, :email)", map);
// verify target DB
Thread.sleep(10000);
assertThat(customerRepository.findAll().size()).isNotZero();
}
}

View File

@ -0,0 +1,61 @@
package com.baeldung.libraries.debezium;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
import org.springframework.jdbc.datasource.DriverManagerDataSource;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabase;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType;
import org.testcontainers.containers.MySQLContainer;
import org.testcontainers.utility.DockerImageName;
import javax.sql.DataSource;
@Configuration
public class MySQLTestContainerConfiguration {
public static final DockerImageName MYSQL_IMAGE = DockerImageName.parse("mysql:8.0");
private static final MySQLContainer<?> mysqlContainer = new MySQLContainer<>(MYSQL_IMAGE)
.withCommand("--default-authentication-plugin=mysql_native_password")
.withInitScript("debezium/customer.sql")
.withDatabaseName("SOURCE_DB")
.withUsername("user")
.withPassword("user")
.withEnv("MYSQL_ROOT_PASSWORD", "user");
MySQLTestContainerConfiguration() {
mysqlContainer.start();
}
public static int getPort() {
return mysqlContainer.getFirstMappedPort();
}
@Bean
@Primary
public EmbeddedDatabase targetDatasource() {
return new EmbeddedDatabaseBuilder()
.setType(EmbeddedDatabaseType.H2)
.setName("TAGRET_DB")
.build();
}
@Bean(name = "SOURCE_DS")
public DataSource sourceDataSource() {
DriverManagerDataSource dataSource = new DriverManagerDataSource();
dataSource.setDriverClassName("com.mysql.cj.jdbc.Driver");
dataSource.setUrl(mysqlContainer.getJdbcUrl());
dataSource.setUsername(mysqlContainer.getUsername());
dataSource.setPassword(mysqlContainer.getPassword());
return dataSource;
}
@Bean(name = "sourceJdbcTemplate")
public NamedParameterJdbcTemplate getJdbcTemplate(@Qualifier("SOURCE_DS") DataSource sourceDataSource) {
return new NamedParameterJdbcTemplate(sourceDataSource);
}
}

View File

@ -0,0 +1,7 @@
## Source Database Properties
customer:
datasource:
host: localhost
database: SOURCE_DB
username: root
password: user

View File

@ -0,0 +1,9 @@
drop table if exists customer;
CREATE TABLE customer
(
id integer NOT NULL,
fullname character varying(255),
email character varying(255),
CONSTRAINT customer_pkey PRIMARY KEY (id)
);