From 082f3133ba9628d6ffbf4387c0df060b1939b440 Mon Sep 17 00:00:00 2001 From: Oussama BEN MAHMOUD Date: Sun, 4 Apr 2021 18:08:07 +0200 Subject: [PATCH] BAEL-4467: Introduction to Debezium --- libraries-data-db/pom.xml | 74 ++++++++++++++++- .../debezium/DebeziumCDCApplication.java | 13 +++ .../config/DebeziumConnectorConfig.java | 58 +++++++++++++ .../libraries/debezium/entity/Customer.java | 18 ++++ .../debezium/listener/DebeziumListener.java | 83 +++++++++++++++++++ .../repository/CustomerRepository.java | 9 ++ .../debezium/service/CustomerService.java | 30 +++++++ .../src/main/resources/application.yml | 34 ++++++++ .../src/main/resources/customer.sql | 9 ++ .../src/main/resources/docker-compose.yml | 25 ++++++ .../debezium/DebeziumCDCLiveTest.java | 61 ++++++++++++++ .../MySQLTestContainerConfiguration.java | 61 ++++++++++++++ .../src/test/resources/application-test.yml | 7 ++ .../src/test/resources/debezium/customer.sql | 9 ++ 14 files changed, 488 insertions(+), 3 deletions(-) create mode 100644 libraries-data-db/src/main/java/com/baeldung/libraries/debezium/DebeziumCDCApplication.java create mode 100644 libraries-data-db/src/main/java/com/baeldung/libraries/debezium/config/DebeziumConnectorConfig.java create mode 100644 libraries-data-db/src/main/java/com/baeldung/libraries/debezium/entity/Customer.java create mode 100644 libraries-data-db/src/main/java/com/baeldung/libraries/debezium/listener/DebeziumListener.java create mode 100644 libraries-data-db/src/main/java/com/baeldung/libraries/debezium/repository/CustomerRepository.java create mode 100644 libraries-data-db/src/main/java/com/baeldung/libraries/debezium/service/CustomerService.java create mode 100644 libraries-data-db/src/main/resources/application.yml create mode 100644 libraries-data-db/src/main/resources/customer.sql create mode 100644 libraries-data-db/src/main/resources/docker-compose.yml create mode 100644 libraries-data-db/src/test/java/com/baeldung/libraries/debezium/DebeziumCDCLiveTest.java create mode 100644 libraries-data-db/src/test/java/com/baeldung/libraries/debezium/MySQLTestContainerConfiguration.java create mode 100644 libraries-data-db/src/test/resources/application-test.yml create mode 100644 libraries-data-db/src/test/resources/debezium/customer.sql diff --git a/libraries-data-db/pom.xml b/libraries-data-db/pom.xml index d51580ccbc..01e078d18d 100644 --- a/libraries-data-db/pom.xml +++ b/libraries-data-db/pom.xml @@ -8,8 +8,9 @@ com.baeldung - parent-modules - 1.0.0-SNAPSHOT + parent-boot-2 + 0.0.1-SNAPSHOT + ../parent-boot-2 @@ -76,6 +77,71 @@ io.ebean ebean ${ebean.version} + + + com.fasterxml.jackson.core + jackson-core + + + + + + + io.debezium + debezium-api + ${debezium.version} + + + io.debezium + debezium-embedded + ${debezium.version} + + + org.slf4j + slf4j-log4j12 + + + + + io.debezium + debezium-connector-mysql + ${debezium.version} + + + + org.testcontainers + testcontainers + ${testcontainers-version} + + + + org.testcontainers + mysql + ${testcontainers-version} + + + + + org.springframework.boot + spring-boot-starter-web + + + + + org.springframework.boot + spring-boot-starter-data-jpa + + + + + mysql + mysql-connector-java + + + + + org.projectlombok + lombok @@ -213,6 +279,8 @@ 3.2.0-m7 3.4.5 11.22.4 + 1.4.2.Final + 1.15.3 - \ No newline at end of file + diff --git a/libraries-data-db/src/main/java/com/baeldung/libraries/debezium/DebeziumCDCApplication.java b/libraries-data-db/src/main/java/com/baeldung/libraries/debezium/DebeziumCDCApplication.java new file mode 100644 index 0000000000..e690d096b1 --- /dev/null +++ b/libraries-data-db/src/main/java/com/baeldung/libraries/debezium/DebeziumCDCApplication.java @@ -0,0 +1,13 @@ +package com.baeldung.libraries.debezium; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +@SpringBootApplication +public class DebeziumCDCApplication { + + public static void main(String[] args) { + SpringApplication.run(DebeziumCDCApplication.class, args); + } + +} diff --git a/libraries-data-db/src/main/java/com/baeldung/libraries/debezium/config/DebeziumConnectorConfig.java b/libraries-data-db/src/main/java/com/baeldung/libraries/debezium/config/DebeziumConnectorConfig.java new file mode 100644 index 0000000000..64e3bda4d6 --- /dev/null +++ b/libraries-data-db/src/main/java/com/baeldung/libraries/debezium/config/DebeziumConnectorConfig.java @@ -0,0 +1,58 @@ +package com.baeldung.libraries.debezium.config; + +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import java.io.File; +import java.io.IOException; + +@Configuration +public class DebeziumConnectorConfig { + + /** + * Database details. + */ + @Value("${customer.datasource.host}") + private String customerDbHost; + + @Value("${customer.datasource.database}") + private String customerDbName; + + @Value("${customer.datasource.port}") + private String customerDbPort; + + @Value("${customer.datasource.username}") + private String customerDbUsername; + + @Value("${customer.datasource.password}") + private String customerDbPassword; + + /** + * Customer Database Connector Configuration + */ + @Bean + public io.debezium.config.Configuration customerConnector() throws IOException { + File offsetStorageTempFile = File.createTempFile("offsets_", ".dat"); + File dbHistoryTempFile = File.createTempFile("dbhistory_", ".dat"); + return io.debezium.config.Configuration.create() + .with("name", "customer-mysql-connector") + .with("connector.class", "io.debezium.connector.mysql.MySqlConnector") + .with("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore") + .with("offset.storage.file.filename", offsetStorageTempFile.getAbsolutePath()) + .with("offset.flush.interval.ms", "60000") + .with("database.hostname", customerDbHost) + .with("database.port", customerDbPort) + .with("database.user", customerDbUsername) + .with("database.password", customerDbPassword) + .with("database.dbname", customerDbName) + .with("database.include.list", customerDbName) + .with("include.schema.changes", "false") + .with("database.allowPublicKeyRetrieval", "true") + .with("database.server.id", "10181") + .with("database.server.name", "customer-mysql-db-server") + .with("database.history", "io.debezium.relational.history.FileDatabaseHistory") + .with("database.history.file.filename", dbHistoryTempFile.getAbsolutePath()) + .build(); + } +} diff --git a/libraries-data-db/src/main/java/com/baeldung/libraries/debezium/entity/Customer.java b/libraries-data-db/src/main/java/com/baeldung/libraries/debezium/entity/Customer.java new file mode 100644 index 0000000000..2100b135af --- /dev/null +++ b/libraries-data-db/src/main/java/com/baeldung/libraries/debezium/entity/Customer.java @@ -0,0 +1,18 @@ +package com.baeldung.libraries.debezium.entity; + +import lombok.Getter; +import lombok.Setter; + +import javax.persistence.Entity; +import javax.persistence.Id; + +@Entity +@Getter +@Setter +public class Customer { + @Id + private Long id; + + private String fullname; + private String email; +} diff --git a/libraries-data-db/src/main/java/com/baeldung/libraries/debezium/listener/DebeziumListener.java b/libraries-data-db/src/main/java/com/baeldung/libraries/debezium/listener/DebeziumListener.java new file mode 100644 index 0000000000..6826fe6d6d --- /dev/null +++ b/libraries-data-db/src/main/java/com/baeldung/libraries/debezium/listener/DebeziumListener.java @@ -0,0 +1,83 @@ +package com.baeldung.libraries.debezium.listener; + +import com.baeldung.libraries.debezium.service.CustomerService; +import io.debezium.config.Configuration; +import io.debezium.embedded.Connect; +import io.debezium.engine.DebeziumEngine; +import io.debezium.engine.RecordChangeEvent; +import io.debezium.engine.format.ChangeEventFormat; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.kafka.connect.data.Field; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.source.SourceRecord; +import org.springframework.stereotype.Component; + +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; + +import static io.debezium.data.Envelope.FieldName.*; +import static io.debezium.data.Envelope.Operation; +import static java.util.stream.Collectors.toMap; + +@Slf4j +@Component +public class DebeziumListener { + + private final Executor executor = Executors.newSingleThreadExecutor(); + private final CustomerService customerService; + private final DebeziumEngine> debeziumEngine; + + public DebeziumListener(Configuration customerConnectorConfiguration, CustomerService customerService) { + + this.debeziumEngine = DebeziumEngine.create(ChangeEventFormat.of(Connect.class)) + .using(customerConnectorConfiguration.asProperties()) + .notifying(this::handleChangeEvent) + .build(); + + this.customerService = customerService; + } + + private void handleChangeEvent(RecordChangeEvent sourceRecordRecordChangeEvent) { + SourceRecord sourceRecord = sourceRecordRecordChangeEvent.record(); + + log.info("Key = '" + sourceRecord.key() + "' value = '" + sourceRecord.value() + "'"); + + Struct sourceRecordChangeValue= (Struct) sourceRecord.value(); + + if (sourceRecordChangeValue != null) { + Operation operation = Operation.forCode((String) sourceRecordChangeValue.get(OPERATION)); + + if(operation != Operation.READ) { + String record = operation == Operation.DELETE ? BEFORE : AFTER; // Handling Update & Insert operations. + + Struct struct = (Struct) sourceRecordChangeValue.get(record); + Map payload = struct.schema().fields().stream() + .map(Field::name) + .filter(fieldName -> struct.get(fieldName) != null) + .map(fieldName -> Pair.of(fieldName, struct.get(fieldName))) + .collect(toMap(Pair::getKey, Pair::getValue)); + + this.customerService.replicateData(payload, operation); + log.info("Updated Data: {} with Operation: {}", payload, operation.name()); + } + } + } + + @PostConstruct + private void start() { + this.executor.execute(debeziumEngine); + } + + @PreDestroy + private void stop() throws IOException { + if (this.debeziumEngine != null) { + this.debeziumEngine.close(); + } + } + +} diff --git a/libraries-data-db/src/main/java/com/baeldung/libraries/debezium/repository/CustomerRepository.java b/libraries-data-db/src/main/java/com/baeldung/libraries/debezium/repository/CustomerRepository.java new file mode 100644 index 0000000000..fc199cb85c --- /dev/null +++ b/libraries-data-db/src/main/java/com/baeldung/libraries/debezium/repository/CustomerRepository.java @@ -0,0 +1,9 @@ +package com.baeldung.libraries.debezium.repository; + +import com.baeldung.libraries.debezium.entity.Customer; +import org.springframework.data.jpa.repository.JpaRepository; +import org.springframework.stereotype.Repository; + +@Repository +public interface CustomerRepository extends JpaRepository { +} diff --git a/libraries-data-db/src/main/java/com/baeldung/libraries/debezium/service/CustomerService.java b/libraries-data-db/src/main/java/com/baeldung/libraries/debezium/service/CustomerService.java new file mode 100644 index 0000000000..4b35a94eb5 --- /dev/null +++ b/libraries-data-db/src/main/java/com/baeldung/libraries/debezium/service/CustomerService.java @@ -0,0 +1,30 @@ +package com.baeldung.libraries.debezium.service; + +import com.baeldung.libraries.debezium.entity.Customer; +import com.baeldung.libraries.debezium.repository.CustomerRepository; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.debezium.data.Envelope.Operation; +import org.springframework.stereotype.Service; + +import java.util.Map; + +@Service +public class CustomerService { + + private final CustomerRepository customerRepository; + + public CustomerService(CustomerRepository customerRepository) { + this.customerRepository = customerRepository; + } + + public void replicateData(Map customerData, Operation operation) { + final ObjectMapper mapper = new ObjectMapper(); + final Customer customer = mapper.convertValue(customerData, Customer.class); + + if (Operation.DELETE.name().equals(operation.name())) { + customerRepository.deleteById(customer.getId()); + } else { + customerRepository.save(customer); + } + } +} diff --git a/libraries-data-db/src/main/resources/application.yml b/libraries-data-db/src/main/resources/application.yml new file mode 100644 index 0000000000..3fcf4a12a9 --- /dev/null +++ b/libraries-data-db/src/main/resources/application.yml @@ -0,0 +1,34 @@ +## Server properties +server: + port: 8080 + +## Primary/Target Database Properties +spring: + datasource: + url: jdbc:mysql://localhost:3306/customerdb + username: root + password: root + jpa.hibernate.ddl-auto: create-drop + jpa.show-sql: true + +## Source Database Properties +customer: + datasource: + host: localhost + port: 3305 + database: customerdb + username: root + password: root + + ## Logging properties +logging: + level: + root: INFO + io: + debezium: + mysql: + BinlogReader: INFO + com: + baeldung: + libraries: + debezium: DEBUG diff --git a/libraries-data-db/src/main/resources/customer.sql b/libraries-data-db/src/main/resources/customer.sql new file mode 100644 index 0000000000..25e3bcf569 --- /dev/null +++ b/libraries-data-db/src/main/resources/customer.sql @@ -0,0 +1,9 @@ +drop table if exists customer; + +CREATE TABLE customer +( + id integer NOT NULL, + fullname character varying(255), + email character varying(255), + CONSTRAINT customer_pkey PRIMARY KEY (id) +); diff --git a/libraries-data-db/src/main/resources/docker-compose.yml b/libraries-data-db/src/main/resources/docker-compose.yml new file mode 100644 index 0000000000..ddf865cc78 --- /dev/null +++ b/libraries-data-db/src/main/resources/docker-compose.yml @@ -0,0 +1,25 @@ +version: "3.9" +services: + # Install Source MySQL DB and setup the Customer database + mysql-1: + container_name: source-database + image: mysql + command: --default-authentication-plugin=mysql_native_password + restart: always + ports: + - 3305:3306 + environment: + MYSQL_ROOT_PASSWORD: root + MYSQL_DATABASE: customerdb + + # Install Target MySQL DB and setup the Customer database + mysql-2: + container_name: target-database + image: mysql + command: --default-authentication-plugin=mysql_native_password + restart: always + ports: + - 3306:3306 + environment: + MYSQL_ROOT_PASSWORD: root + MYSQL_DATABASE: customerdb diff --git a/libraries-data-db/src/test/java/com/baeldung/libraries/debezium/DebeziumCDCLiveTest.java b/libraries-data-db/src/test/java/com/baeldung/libraries/debezium/DebeziumCDCLiveTest.java new file mode 100644 index 0000000000..916ea4a120 --- /dev/null +++ b/libraries-data-db/src/test/java/com/baeldung/libraries/debezium/DebeziumCDCLiveTest.java @@ -0,0 +1,61 @@ +package com.baeldung.libraries.debezium; + +import com.baeldung.libraries.debezium.repository.CustomerRepository; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate; +import org.springframework.test.context.ActiveProfiles; +import org.springframework.test.context.DynamicPropertyRegistry; +import org.springframework.test.context.DynamicPropertySource; +import org.springframework.test.context.junit4.SpringRunner; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; + +@RunWith(SpringRunner.class) +@SpringBootTest(classes = DebeziumCDCApplication.class) +@ActiveProfiles("test") +public class DebeziumCDCLiveTest { + + @Autowired + private CustomerRepository customerRepository; + + @Autowired + @Qualifier("sourceJdbcTemplate") + private NamedParameterJdbcTemplate jdbcTemplate; + + @Before + public void clearData() { + jdbcTemplate.update("delete from customer where id = :id", Collections.singletonMap("id", 1)); + } + + @DynamicPropertySource + static void registerProperties(DynamicPropertyRegistry registry) { + registry.add("customer.datasource.port", MySQLTestContainerConfiguration::getPort); + } + + @Test + public void whenInsertDataToSourceDatabase_thenCdcOk() throws InterruptedException { + assertThat(customerRepository.findAll().size()).isZero(); + + // insert data to source DB + Map map = new HashMap<>(); + map.put("id", 1); + map.put("fullname", "John Doe"); + map.put("email", "test@test.com"); + + jdbcTemplate.update("INSERT INTO customer(id, fullname, email) VALUES (:id, :fullname, :email)", map); + + // verify target DB + Thread.sleep(10000); + assertThat(customerRepository.findAll().size()).isNotZero(); + } + +} diff --git a/libraries-data-db/src/test/java/com/baeldung/libraries/debezium/MySQLTestContainerConfiguration.java b/libraries-data-db/src/test/java/com/baeldung/libraries/debezium/MySQLTestContainerConfiguration.java new file mode 100644 index 0000000000..e7394e190b --- /dev/null +++ b/libraries-data-db/src/test/java/com/baeldung/libraries/debezium/MySQLTestContainerConfiguration.java @@ -0,0 +1,61 @@ +package com.baeldung.libraries.debezium; + +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Primary; +import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate; +import org.springframework.jdbc.datasource.DriverManagerDataSource; +import org.springframework.jdbc.datasource.embedded.EmbeddedDatabase; +import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder; +import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType; +import org.testcontainers.containers.MySQLContainer; +import org.testcontainers.utility.DockerImageName; + +import javax.sql.DataSource; + +@Configuration +public class MySQLTestContainerConfiguration { + + public static final DockerImageName MYSQL_IMAGE = DockerImageName.parse("mysql:8.0"); + + private static final MySQLContainer mysqlContainer = new MySQLContainer<>(MYSQL_IMAGE) + .withCommand("--default-authentication-plugin=mysql_native_password") + .withInitScript("debezium/customer.sql") + .withDatabaseName("SOURCE_DB") + .withUsername("user") + .withPassword("user") + .withEnv("MYSQL_ROOT_PASSWORD", "user"); + + MySQLTestContainerConfiguration() { + mysqlContainer.start(); + } + + public static int getPort() { + return mysqlContainer.getFirstMappedPort(); + } + + @Bean + @Primary + public EmbeddedDatabase targetDatasource() { + return new EmbeddedDatabaseBuilder() + .setType(EmbeddedDatabaseType.H2) + .setName("TAGRET_DB") + .build(); + } + + @Bean(name = "SOURCE_DS") + public DataSource sourceDataSource() { + DriverManagerDataSource dataSource = new DriverManagerDataSource(); + dataSource.setDriverClassName("com.mysql.cj.jdbc.Driver"); + dataSource.setUrl(mysqlContainer.getJdbcUrl()); + dataSource.setUsername(mysqlContainer.getUsername()); + dataSource.setPassword(mysqlContainer.getPassword()); + return dataSource; + } + + @Bean(name = "sourceJdbcTemplate") + public NamedParameterJdbcTemplate getJdbcTemplate(@Qualifier("SOURCE_DS") DataSource sourceDataSource) { + return new NamedParameterJdbcTemplate(sourceDataSource); + } +} diff --git a/libraries-data-db/src/test/resources/application-test.yml b/libraries-data-db/src/test/resources/application-test.yml new file mode 100644 index 0000000000..96aaa2b727 --- /dev/null +++ b/libraries-data-db/src/test/resources/application-test.yml @@ -0,0 +1,7 @@ +## Source Database Properties +customer: + datasource: + host: localhost + database: SOURCE_DB + username: root + password: user diff --git a/libraries-data-db/src/test/resources/debezium/customer.sql b/libraries-data-db/src/test/resources/debezium/customer.sql new file mode 100644 index 0000000000..25e3bcf569 --- /dev/null +++ b/libraries-data-db/src/test/resources/debezium/customer.sql @@ -0,0 +1,9 @@ +drop table if exists customer; + +CREATE TABLE customer +( + id integer NOT NULL, + fullname character varying(255), + email character varying(255), + CONSTRAINT customer_pkey PRIMARY KEY (id) +);