diff --git a/libraries-data-db/pom.xml b/libraries-data-db/pom.xml
index d51580ccbc..01e078d18d 100644
--- a/libraries-data-db/pom.xml
+++ b/libraries-data-db/pom.xml
@@ -8,8 +8,9 @@
com.baeldung
- parent-modules
- 1.0.0-SNAPSHOT
+ parent-boot-2
+ 0.0.1-SNAPSHOT
+ ../parent-boot-2
@@ -76,6 +77,71 @@
io.ebean
ebean
${ebean.version}
+
+
+ com.fasterxml.jackson.core
+ jackson-core
+
+
+
+
+
+
+ io.debezium
+ debezium-api
+ ${debezium.version}
+
+
+ io.debezium
+ debezium-embedded
+ ${debezium.version}
+
+
+ org.slf4j
+ slf4j-log4j12
+
+
+
+
+ io.debezium
+ debezium-connector-mysql
+ ${debezium.version}
+
+
+
+ org.testcontainers
+ testcontainers
+ ${testcontainers-version}
+
+
+
+ org.testcontainers
+ mysql
+ ${testcontainers-version}
+
+
+
+
+ org.springframework.boot
+ spring-boot-starter-web
+
+
+
+
+ org.springframework.boot
+ spring-boot-starter-data-jpa
+
+
+
+
+ mysql
+ mysql-connector-java
+
+
+
+
+ org.projectlombok
+ lombok
@@ -213,6 +279,8 @@
3.2.0-m7
3.4.5
11.22.4
+ 1.4.2.Final
+ 1.15.3
-
\ No newline at end of file
+
diff --git a/libraries-data-db/src/main/java/com/baeldung/libraries/debezium/DebeziumCDCApplication.java b/libraries-data-db/src/main/java/com/baeldung/libraries/debezium/DebeziumCDCApplication.java
new file mode 100644
index 0000000000..e690d096b1
--- /dev/null
+++ b/libraries-data-db/src/main/java/com/baeldung/libraries/debezium/DebeziumCDCApplication.java
@@ -0,0 +1,13 @@
+package com.baeldung.libraries.debezium;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+
+@SpringBootApplication
+public class DebeziumCDCApplication {
+
+ public static void main(String[] args) {
+ SpringApplication.run(DebeziumCDCApplication.class, args);
+ }
+
+}
diff --git a/libraries-data-db/src/main/java/com/baeldung/libraries/debezium/config/DebeziumConnectorConfig.java b/libraries-data-db/src/main/java/com/baeldung/libraries/debezium/config/DebeziumConnectorConfig.java
new file mode 100644
index 0000000000..64e3bda4d6
--- /dev/null
+++ b/libraries-data-db/src/main/java/com/baeldung/libraries/debezium/config/DebeziumConnectorConfig.java
@@ -0,0 +1,58 @@
+package com.baeldung.libraries.debezium.config;
+
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+import java.io.File;
+import java.io.IOException;
+
+@Configuration
+public class DebeziumConnectorConfig {
+
+ /**
+ * Database details.
+ */
+ @Value("${customer.datasource.host}")
+ private String customerDbHost;
+
+ @Value("${customer.datasource.database}")
+ private String customerDbName;
+
+ @Value("${customer.datasource.port}")
+ private String customerDbPort;
+
+ @Value("${customer.datasource.username}")
+ private String customerDbUsername;
+
+ @Value("${customer.datasource.password}")
+ private String customerDbPassword;
+
+ /**
+ * Customer Database Connector Configuration
+ */
+ @Bean
+ public io.debezium.config.Configuration customerConnector() throws IOException {
+ File offsetStorageTempFile = File.createTempFile("offsets_", ".dat");
+ File dbHistoryTempFile = File.createTempFile("dbhistory_", ".dat");
+ return io.debezium.config.Configuration.create()
+ .with("name", "customer-mysql-connector")
+ .with("connector.class", "io.debezium.connector.mysql.MySqlConnector")
+ .with("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore")
+ .with("offset.storage.file.filename", offsetStorageTempFile.getAbsolutePath())
+ .with("offset.flush.interval.ms", "60000")
+ .with("database.hostname", customerDbHost)
+ .with("database.port", customerDbPort)
+ .with("database.user", customerDbUsername)
+ .with("database.password", customerDbPassword)
+ .with("database.dbname", customerDbName)
+ .with("database.include.list", customerDbName)
+ .with("include.schema.changes", "false")
+ .with("database.allowPublicKeyRetrieval", "true")
+ .with("database.server.id", "10181")
+ .with("database.server.name", "customer-mysql-db-server")
+ .with("database.history", "io.debezium.relational.history.FileDatabaseHistory")
+ .with("database.history.file.filename", dbHistoryTempFile.getAbsolutePath())
+ .build();
+ }
+}
diff --git a/libraries-data-db/src/main/java/com/baeldung/libraries/debezium/entity/Customer.java b/libraries-data-db/src/main/java/com/baeldung/libraries/debezium/entity/Customer.java
new file mode 100644
index 0000000000..2100b135af
--- /dev/null
+++ b/libraries-data-db/src/main/java/com/baeldung/libraries/debezium/entity/Customer.java
@@ -0,0 +1,18 @@
+package com.baeldung.libraries.debezium.entity;
+
+import lombok.Getter;
+import lombok.Setter;
+
+import javax.persistence.Entity;
+import javax.persistence.Id;
+
+@Entity
+@Getter
+@Setter
+public class Customer {
+ @Id
+ private Long id;
+
+ private String fullname;
+ private String email;
+}
diff --git a/libraries-data-db/src/main/java/com/baeldung/libraries/debezium/listener/DebeziumListener.java b/libraries-data-db/src/main/java/com/baeldung/libraries/debezium/listener/DebeziumListener.java
new file mode 100644
index 0000000000..6826fe6d6d
--- /dev/null
+++ b/libraries-data-db/src/main/java/com/baeldung/libraries/debezium/listener/DebeziumListener.java
@@ -0,0 +1,83 @@
+package com.baeldung.libraries.debezium.listener;
+
+import com.baeldung.libraries.debezium.service.CustomerService;
+import io.debezium.config.Configuration;
+import io.debezium.embedded.Connect;
+import io.debezium.engine.DebeziumEngine;
+import io.debezium.engine.RecordChangeEvent;
+import io.debezium.engine.format.ChangeEventFormat;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+
+import static io.debezium.data.Envelope.FieldName.*;
+import static io.debezium.data.Envelope.Operation;
+import static java.util.stream.Collectors.toMap;
+
+@Slf4j
+@Component
+public class DebeziumListener {
+
+ private final Executor executor = Executors.newSingleThreadExecutor();
+ private final CustomerService customerService;
+ private final DebeziumEngine> debeziumEngine;
+
+ public DebeziumListener(Configuration customerConnectorConfiguration, CustomerService customerService) {
+
+ this.debeziumEngine = DebeziumEngine.create(ChangeEventFormat.of(Connect.class))
+ .using(customerConnectorConfiguration.asProperties())
+ .notifying(this::handleChangeEvent)
+ .build();
+
+ this.customerService = customerService;
+ }
+
+ private void handleChangeEvent(RecordChangeEvent sourceRecordRecordChangeEvent) {
+ SourceRecord sourceRecord = sourceRecordRecordChangeEvent.record();
+
+ log.info("Key = '" + sourceRecord.key() + "' value = '" + sourceRecord.value() + "'");
+
+ Struct sourceRecordChangeValue= (Struct) sourceRecord.value();
+
+ if (sourceRecordChangeValue != null) {
+ Operation operation = Operation.forCode((String) sourceRecordChangeValue.get(OPERATION));
+
+ if(operation != Operation.READ) {
+ String record = operation == Operation.DELETE ? BEFORE : AFTER; // Handling Update & Insert operations.
+
+ Struct struct = (Struct) sourceRecordChangeValue.get(record);
+ Map payload = struct.schema().fields().stream()
+ .map(Field::name)
+ .filter(fieldName -> struct.get(fieldName) != null)
+ .map(fieldName -> Pair.of(fieldName, struct.get(fieldName)))
+ .collect(toMap(Pair::getKey, Pair::getValue));
+
+ this.customerService.replicateData(payload, operation);
+ log.info("Updated Data: {} with Operation: {}", payload, operation.name());
+ }
+ }
+ }
+
+ @PostConstruct
+ private void start() {
+ this.executor.execute(debeziumEngine);
+ }
+
+ @PreDestroy
+ private void stop() throws IOException {
+ if (this.debeziumEngine != null) {
+ this.debeziumEngine.close();
+ }
+ }
+
+}
diff --git a/libraries-data-db/src/main/java/com/baeldung/libraries/debezium/repository/CustomerRepository.java b/libraries-data-db/src/main/java/com/baeldung/libraries/debezium/repository/CustomerRepository.java
new file mode 100644
index 0000000000..fc199cb85c
--- /dev/null
+++ b/libraries-data-db/src/main/java/com/baeldung/libraries/debezium/repository/CustomerRepository.java
@@ -0,0 +1,9 @@
+package com.baeldung.libraries.debezium.repository;
+
+import com.baeldung.libraries.debezium.entity.Customer;
+import org.springframework.data.jpa.repository.JpaRepository;
+import org.springframework.stereotype.Repository;
+
+@Repository
+public interface CustomerRepository extends JpaRepository {
+}
diff --git a/libraries-data-db/src/main/java/com/baeldung/libraries/debezium/service/CustomerService.java b/libraries-data-db/src/main/java/com/baeldung/libraries/debezium/service/CustomerService.java
new file mode 100644
index 0000000000..4b35a94eb5
--- /dev/null
+++ b/libraries-data-db/src/main/java/com/baeldung/libraries/debezium/service/CustomerService.java
@@ -0,0 +1,30 @@
+package com.baeldung.libraries.debezium.service;
+
+import com.baeldung.libraries.debezium.entity.Customer;
+import com.baeldung.libraries.debezium.repository.CustomerRepository;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.debezium.data.Envelope.Operation;
+import org.springframework.stereotype.Service;
+
+import java.util.Map;
+
+@Service
+public class CustomerService {
+
+ private final CustomerRepository customerRepository;
+
+ public CustomerService(CustomerRepository customerRepository) {
+ this.customerRepository = customerRepository;
+ }
+
+ public void replicateData(Map customerData, Operation operation) {
+ final ObjectMapper mapper = new ObjectMapper();
+ final Customer customer = mapper.convertValue(customerData, Customer.class);
+
+ if (Operation.DELETE.name().equals(operation.name())) {
+ customerRepository.deleteById(customer.getId());
+ } else {
+ customerRepository.save(customer);
+ }
+ }
+}
diff --git a/libraries-data-db/src/main/resources/application.yml b/libraries-data-db/src/main/resources/application.yml
new file mode 100644
index 0000000000..3fcf4a12a9
--- /dev/null
+++ b/libraries-data-db/src/main/resources/application.yml
@@ -0,0 +1,34 @@
+## Server properties
+server:
+ port: 8080
+
+## Primary/Target Database Properties
+spring:
+ datasource:
+ url: jdbc:mysql://localhost:3306/customerdb
+ username: root
+ password: root
+ jpa.hibernate.ddl-auto: create-drop
+ jpa.show-sql: true
+
+## Source Database Properties
+customer:
+ datasource:
+ host: localhost
+ port: 3305
+ database: customerdb
+ username: root
+ password: root
+
+ ## Logging properties
+logging:
+ level:
+ root: INFO
+ io:
+ debezium:
+ mysql:
+ BinlogReader: INFO
+ com:
+ baeldung:
+ libraries:
+ debezium: DEBUG
diff --git a/libraries-data-db/src/main/resources/customer.sql b/libraries-data-db/src/main/resources/customer.sql
new file mode 100644
index 0000000000..25e3bcf569
--- /dev/null
+++ b/libraries-data-db/src/main/resources/customer.sql
@@ -0,0 +1,9 @@
+drop table if exists customer;
+
+CREATE TABLE customer
+(
+ id integer NOT NULL,
+ fullname character varying(255),
+ email character varying(255),
+ CONSTRAINT customer_pkey PRIMARY KEY (id)
+);
diff --git a/libraries-data-db/src/main/resources/docker-compose.yml b/libraries-data-db/src/main/resources/docker-compose.yml
new file mode 100644
index 0000000000..ddf865cc78
--- /dev/null
+++ b/libraries-data-db/src/main/resources/docker-compose.yml
@@ -0,0 +1,25 @@
+version: "3.9"
+services:
+ # Install Source MySQL DB and setup the Customer database
+ mysql-1:
+ container_name: source-database
+ image: mysql
+ command: --default-authentication-plugin=mysql_native_password
+ restart: always
+ ports:
+ - 3305:3306
+ environment:
+ MYSQL_ROOT_PASSWORD: root
+ MYSQL_DATABASE: customerdb
+
+ # Install Target MySQL DB and setup the Customer database
+ mysql-2:
+ container_name: target-database
+ image: mysql
+ command: --default-authentication-plugin=mysql_native_password
+ restart: always
+ ports:
+ - 3306:3306
+ environment:
+ MYSQL_ROOT_PASSWORD: root
+ MYSQL_DATABASE: customerdb
diff --git a/libraries-data-db/src/test/java/com/baeldung/libraries/debezium/DebeziumCDCLiveTest.java b/libraries-data-db/src/test/java/com/baeldung/libraries/debezium/DebeziumCDCLiveTest.java
new file mode 100644
index 0000000000..916ea4a120
--- /dev/null
+++ b/libraries-data-db/src/test/java/com/baeldung/libraries/debezium/DebeziumCDCLiveTest.java
@@ -0,0 +1,61 @@
+package com.baeldung.libraries.debezium;
+
+import com.baeldung.libraries.debezium.repository.CustomerRepository;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
+import org.springframework.test.context.ActiveProfiles;
+import org.springframework.test.context.DynamicPropertyRegistry;
+import org.springframework.test.context.DynamicPropertySource;
+import org.springframework.test.context.junit4.SpringRunner;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+@RunWith(SpringRunner.class)
+@SpringBootTest(classes = DebeziumCDCApplication.class)
+@ActiveProfiles("test")
+public class DebeziumCDCLiveTest {
+
+ @Autowired
+ private CustomerRepository customerRepository;
+
+ @Autowired
+ @Qualifier("sourceJdbcTemplate")
+ private NamedParameterJdbcTemplate jdbcTemplate;
+
+ @Before
+ public void clearData() {
+ jdbcTemplate.update("delete from customer where id = :id", Collections.singletonMap("id", 1));
+ }
+
+ @DynamicPropertySource
+ static void registerProperties(DynamicPropertyRegistry registry) {
+ registry.add("customer.datasource.port", MySQLTestContainerConfiguration::getPort);
+ }
+
+ @Test
+ public void whenInsertDataToSourceDatabase_thenCdcOk() throws InterruptedException {
+ assertThat(customerRepository.findAll().size()).isZero();
+
+ // insert data to source DB
+ Map map = new HashMap<>();
+ map.put("id", 1);
+ map.put("fullname", "John Doe");
+ map.put("email", "test@test.com");
+
+ jdbcTemplate.update("INSERT INTO customer(id, fullname, email) VALUES (:id, :fullname, :email)", map);
+
+ // verify target DB
+ Thread.sleep(10000);
+ assertThat(customerRepository.findAll().size()).isNotZero();
+ }
+
+}
diff --git a/libraries-data-db/src/test/java/com/baeldung/libraries/debezium/MySQLTestContainerConfiguration.java b/libraries-data-db/src/test/java/com/baeldung/libraries/debezium/MySQLTestContainerConfiguration.java
new file mode 100644
index 0000000000..e7394e190b
--- /dev/null
+++ b/libraries-data-db/src/test/java/com/baeldung/libraries/debezium/MySQLTestContainerConfiguration.java
@@ -0,0 +1,61 @@
+package com.baeldung.libraries.debezium;
+
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.Primary;
+import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
+import org.springframework.jdbc.datasource.DriverManagerDataSource;
+import org.springframework.jdbc.datasource.embedded.EmbeddedDatabase;
+import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder;
+import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType;
+import org.testcontainers.containers.MySQLContainer;
+import org.testcontainers.utility.DockerImageName;
+
+import javax.sql.DataSource;
+
+@Configuration
+public class MySQLTestContainerConfiguration {
+
+ public static final DockerImageName MYSQL_IMAGE = DockerImageName.parse("mysql:8.0");
+
+ private static final MySQLContainer> mysqlContainer = new MySQLContainer<>(MYSQL_IMAGE)
+ .withCommand("--default-authentication-plugin=mysql_native_password")
+ .withInitScript("debezium/customer.sql")
+ .withDatabaseName("SOURCE_DB")
+ .withUsername("user")
+ .withPassword("user")
+ .withEnv("MYSQL_ROOT_PASSWORD", "user");
+
+ MySQLTestContainerConfiguration() {
+ mysqlContainer.start();
+ }
+
+ public static int getPort() {
+ return mysqlContainer.getFirstMappedPort();
+ }
+
+ @Bean
+ @Primary
+ public EmbeddedDatabase targetDatasource() {
+ return new EmbeddedDatabaseBuilder()
+ .setType(EmbeddedDatabaseType.H2)
+ .setName("TAGRET_DB")
+ .build();
+ }
+
+ @Bean(name = "SOURCE_DS")
+ public DataSource sourceDataSource() {
+ DriverManagerDataSource dataSource = new DriverManagerDataSource();
+ dataSource.setDriverClassName("com.mysql.cj.jdbc.Driver");
+ dataSource.setUrl(mysqlContainer.getJdbcUrl());
+ dataSource.setUsername(mysqlContainer.getUsername());
+ dataSource.setPassword(mysqlContainer.getPassword());
+ return dataSource;
+ }
+
+ @Bean(name = "sourceJdbcTemplate")
+ public NamedParameterJdbcTemplate getJdbcTemplate(@Qualifier("SOURCE_DS") DataSource sourceDataSource) {
+ return new NamedParameterJdbcTemplate(sourceDataSource);
+ }
+}
diff --git a/libraries-data-db/src/test/resources/application-test.yml b/libraries-data-db/src/test/resources/application-test.yml
new file mode 100644
index 0000000000..96aaa2b727
--- /dev/null
+++ b/libraries-data-db/src/test/resources/application-test.yml
@@ -0,0 +1,7 @@
+## Source Database Properties
+customer:
+ datasource:
+ host: localhost
+ database: SOURCE_DB
+ username: root
+ password: user
diff --git a/libraries-data-db/src/test/resources/debezium/customer.sql b/libraries-data-db/src/test/resources/debezium/customer.sql
new file mode 100644
index 0000000000..25e3bcf569
--- /dev/null
+++ b/libraries-data-db/src/test/resources/debezium/customer.sql
@@ -0,0 +1,9 @@
+drop table if exists customer;
+
+CREATE TABLE customer
+(
+ id integer NOT NULL,
+ fullname character varying(255),
+ email character varying(255),
+ CONSTRAINT customer_pkey PRIMARY KEY (id)
+);