BAEL-7140: spring modulith evt externalization
This commit is contained in:
parent
1724f67a39
commit
4021e9861e
|
@ -0,0 +1,5 @@
|
||||||
|
## Spring Boot Libraries
|
||||||
|
|
||||||
|
This module contains articles about various Spring Boot libraries
|
||||||
|
|
||||||
|
### Relevant Articles:
|
|
@ -0,0 +1,89 @@
|
||||||
|
<?xml version="1.0" encoding="UTF-8"?>
|
||||||
|
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||||
|
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||||
|
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||||
|
<modelVersion>4.0.0</modelVersion>
|
||||||
|
<artifactId>spring-boot-libraries-3</artifactId>
|
||||||
|
|
||||||
|
<parent>
|
||||||
|
<artifactId>spring-boot-modules</artifactId>
|
||||||
|
<groupId>com.baeldung.spring-boot-modules</groupId>
|
||||||
|
<version>1.0.0-SNAPSHOT</version>
|
||||||
|
</parent>
|
||||||
|
|
||||||
|
<dependencies>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.springframework.boot</groupId>
|
||||||
|
<artifactId>spring-boot-starter-data-jpa</artifactId>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.springframework.kafka</groupId>
|
||||||
|
<artifactId>spring-kafka</artifactId>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.springframework.modulith</groupId>
|
||||||
|
<artifactId>spring-modulith-events-api</artifactId>
|
||||||
|
<version>${spring-modulith-events-kafka.version}</version>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.springframework.modulith</groupId>
|
||||||
|
<artifactId>spring-modulith-events-kafka</artifactId>
|
||||||
|
<version>${spring-modulith-events-kafka.version}</version>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.springframework.boot</groupId>
|
||||||
|
<artifactId>spring-boot-starter-test</artifactId>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.springframework.boot</groupId>
|
||||||
|
<artifactId>spring-boot-testcontainers</artifactId>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.testcontainers</groupId>
|
||||||
|
<artifactId>kafka</artifactId>
|
||||||
|
<version>${testcontainers.version}</version>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.testcontainers</groupId>
|
||||||
|
<artifactId>testcontainers</artifactId>
|
||||||
|
<version>${testcontainers.version}</version>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.testcontainers</groupId>
|
||||||
|
<artifactId>junit-jupiter</artifactId>
|
||||||
|
<version>${testcontainers.version}</version>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.h2database</groupId>
|
||||||
|
<artifactId>h2</artifactId>
|
||||||
|
<version>2.2.224</version>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.awaitility</groupId>
|
||||||
|
<artifactId>awaitility</artifactId>
|
||||||
|
<version>${awaitility.version}</version>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
|
</dependencies>
|
||||||
|
|
||||||
|
|
||||||
|
<properties>
|
||||||
|
<java.version>17</java.version>
|
||||||
|
<spring-boot.version>3.1.5</spring-boot.version>
|
||||||
|
<spring-modulith-events-kafka.version>1.1.2</spring-modulith-events-kafka.version>
|
||||||
|
<testcontainers.version>1.19.3</testcontainers.version>
|
||||||
|
<awaitility.version>4.2.0</awaitility.version>
|
||||||
|
</properties>
|
||||||
|
|
||||||
|
</project>
|
|
@ -0,0 +1,13 @@
|
||||||
|
package com.baeldung.springmodulith;
|
||||||
|
|
||||||
|
import org.springframework.boot.SpringApplication;
|
||||||
|
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||||
|
|
||||||
|
@SpringBootApplication
|
||||||
|
public class Application {
|
||||||
|
|
||||||
|
public static void main(String[] args) {
|
||||||
|
SpringApplication.run( Application.class, args);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,45 @@
|
||||||
|
package com.baeldung.springmodulith.events.externalization;
|
||||||
|
|
||||||
|
import static jakarta.persistence.GenerationType.*;
|
||||||
|
|
||||||
|
import jakarta.persistence.Entity;
|
||||||
|
import jakarta.persistence.GeneratedValue;
|
||||||
|
import jakarta.persistence.Id;
|
||||||
|
|
||||||
|
@Entity
|
||||||
|
public class Article {
|
||||||
|
@Id
|
||||||
|
@GeneratedValue(strategy = AUTO)
|
||||||
|
private Long id;
|
||||||
|
|
||||||
|
private String slug;
|
||||||
|
private String title;
|
||||||
|
private String author;
|
||||||
|
private String content;
|
||||||
|
|
||||||
|
public Article(String slug, String title, String author, String content) {
|
||||||
|
this.slug = slug;
|
||||||
|
this.title = title;
|
||||||
|
this.author = author;
|
||||||
|
this.content = content;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Article() {
|
||||||
|
}
|
||||||
|
|
||||||
|
public String author() {
|
||||||
|
return author;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String content() {
|
||||||
|
return content;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String slug() {
|
||||||
|
return slug;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String title() {
|
||||||
|
return title;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,7 @@
|
||||||
|
package com.baeldung.springmodulith.events.externalization;
|
||||||
|
|
||||||
|
import org.springframework.modulith.events.Externalized;
|
||||||
|
|
||||||
|
@Externalized("baeldung.article.published::#{slug()}")
|
||||||
|
public record ArticlePublishedEvent(String slug, String title) {
|
||||||
|
}
|
|
@ -0,0 +1,8 @@
|
||||||
|
package com.baeldung.springmodulith.events.externalization;
|
||||||
|
|
||||||
|
import org.springframework.data.repository.CrudRepository;
|
||||||
|
import org.springframework.stereotype.Repository;
|
||||||
|
|
||||||
|
@Repository
|
||||||
|
interface ArticleRepository extends CrudRepository<Article, Long> {
|
||||||
|
}
|
|
@ -0,0 +1,37 @@
|
||||||
|
package com.baeldung.springmodulith.events.externalization;
|
||||||
|
|
||||||
|
import org.springframework.context.ApplicationEventPublisher;
|
||||||
|
import org.springframework.stereotype.Service;
|
||||||
|
import org.springframework.transaction.annotation.Transactional;
|
||||||
|
|
||||||
|
@Service
|
||||||
|
public class Baeldung {
|
||||||
|
|
||||||
|
private final ApplicationEventPublisher applicationEvents;
|
||||||
|
private final ArticleRepository articleRepository;
|
||||||
|
|
||||||
|
|
||||||
|
public Baeldung(ApplicationEventPublisher applicationEvents, ArticleRepository articleRepository) {
|
||||||
|
this.applicationEvents = applicationEvents;
|
||||||
|
this.articleRepository = articleRepository;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Transactional
|
||||||
|
public void createArticle(Article article) {
|
||||||
|
// ... business logic
|
||||||
|
validateArticle(article);
|
||||||
|
article = addArticleTags(article);
|
||||||
|
article = articleRepository.save(article);
|
||||||
|
|
||||||
|
applicationEvents.publishEvent(new ArticlePublishedEvent(article.slug(), article.title()));
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
private Article addArticleTags(Article article) {
|
||||||
|
return article;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void validateArticle(Article article) {
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,41 @@
|
||||||
|
package com.baeldung.springmodulith.events.externalization;
|
||||||
|
|
||||||
|
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
|
||||||
|
import org.springframework.context.annotation.Bean;
|
||||||
|
import org.springframework.context.annotation.Configuration;
|
||||||
|
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
|
||||||
|
import org.springframework.kafka.core.KafkaTemplate;
|
||||||
|
import org.springframework.kafka.core.ProducerFactory;
|
||||||
|
import org.springframework.kafka.core.KafkaOperations;
|
||||||
|
import org.springframework.modulith.events.EventExternalizationConfiguration;
|
||||||
|
import org.springframework.modulith.events.RoutingTarget;
|
||||||
|
|
||||||
|
@Configuration
|
||||||
|
class EventExternalizationConfig {
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
EventExternalizationConfiguration eventExternalizationConfiguration() {
|
||||||
|
return EventExternalizationConfiguration.externalizing()
|
||||||
|
.select(EventExternalizationConfiguration.annotatedAsExternalized())
|
||||||
|
.route(
|
||||||
|
ArticlePublishedEvent.class,
|
||||||
|
it -> RoutingTarget.forTarget("baeldung.articles.published").andKey(it.slug())
|
||||||
|
)
|
||||||
|
.mapping(
|
||||||
|
ArticlePublishedEvent.class,
|
||||||
|
it -> new ArticlePublishedKafkaEvent(it.slug(), it.title())
|
||||||
|
)
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
record ArticlePublishedKafkaEvent(String slug, String title) {
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
KafkaOperations<String, ArticlePublishedEvent> kafkaOperations(KafkaProperties kafkaProperties) {
|
||||||
|
ProducerFactory<String, ArticlePublishedEvent> producerFactory = new DefaultKafkaProducerFactory<>(kafkaProperties.buildProducerProperties());
|
||||||
|
return new KafkaTemplate<>(producerFactory);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -0,0 +1,34 @@
|
||||||
|
package com.baeldung.springmodulith.events.externalization.infra;
|
||||||
|
|
||||||
|
import org.springframework.context.event.EventListener;
|
||||||
|
import org.springframework.kafka.core.KafkaOperations;
|
||||||
|
import org.springframework.scheduling.annotation.Async;
|
||||||
|
import org.springframework.transaction.event.TransactionalEventListener;
|
||||||
|
import org.springframework.util.Assert;
|
||||||
|
|
||||||
|
import com.baeldung.springmodulith.events.externalization.ArticlePublishedEvent;
|
||||||
|
|
||||||
|
//@Component
|
||||||
|
// this is used in sections 3 and 4 of tha article
|
||||||
|
// but it will cause the tests to fail if it used together with the @Externalized annotation
|
||||||
|
class ArticlePublishedKafkaProducer {
|
||||||
|
|
||||||
|
private final KafkaOperations<String, ArticlePublishedEvent> messageProducer;
|
||||||
|
|
||||||
|
public ArticlePublishedKafkaProducer(KafkaOperations<String, ArticlePublishedEvent> messageProducer) {
|
||||||
|
this.messageProducer = messageProducer;
|
||||||
|
}
|
||||||
|
|
||||||
|
@EventListener
|
||||||
|
public void publish(ArticlePublishedEvent event) {
|
||||||
|
Assert.notNull(event.slug(), "Article Slug must not be null!");
|
||||||
|
messageProducer.send("baeldung.articles.published", event);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Async
|
||||||
|
@TransactionalEventListener
|
||||||
|
public void publishAsync(ArticlePublishedEvent article) {
|
||||||
|
Assert.notNull(article.slug(), "Article Slug must not be null!");
|
||||||
|
messageProducer.send("baeldung.articles.published", article.slug(), article);
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,12 @@
|
||||||
|
logging.level.org.springframework.orm.jpa: TRACE
|
||||||
|
|
||||||
|
spring.kafka:
|
||||||
|
bootstrap-servers: localhost:9092
|
||||||
|
producer:
|
||||||
|
key-serializer: org.apache.kafka.common.serialization.StringSerializer
|
||||||
|
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
|
||||||
|
type-mapping: com.baeldung.annotation.events.externalization.producer.ArticlePublished
|
||||||
|
consumer:
|
||||||
|
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
|
||||||
|
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
|
||||||
|
auto-offset-reset: earliest
|
|
@ -0,0 +1,89 @@
|
||||||
|
package com.baeldung.springmodulith.events.externalization;
|
||||||
|
|
||||||
|
import static java.time.Duration.ofMillis;
|
||||||
|
import static java.time.Duration.ofSeconds;
|
||||||
|
import static org.assertj.core.api.Assertions.assertThat;
|
||||||
|
import static org.testcontainers.shaded.org.awaitility.Awaitility.await;
|
||||||
|
|
||||||
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.boot.test.context.SpringBootTest;
|
||||||
|
import org.springframework.test.context.DynamicPropertyRegistry;
|
||||||
|
import org.springframework.test.context.DynamicPropertySource;
|
||||||
|
import org.testcontainers.containers.KafkaContainer;
|
||||||
|
import org.testcontainers.junit.jupiter.Container;
|
||||||
|
import org.testcontainers.junit.jupiter.Testcontainers;
|
||||||
|
import org.testcontainers.shaded.org.awaitility.Awaitility;
|
||||||
|
import org.testcontainers.utility.DockerImageName;
|
||||||
|
|
||||||
|
import com.baeldung.springmodulith.Application;
|
||||||
|
import com.baeldung.springmodulith.events.externalization.listener.TestKafkaListenerConfig;
|
||||||
|
import com.baeldung.springmodulith.events.externalization.listener.TestListener;
|
||||||
|
|
||||||
|
@Testcontainers
|
||||||
|
@SpringBootTest(classes = { Application.class, TestKafkaListenerConfig.class })
|
||||||
|
class EventsExternalizationLiveTest {
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private Baeldung baeldung;
|
||||||
|
@Autowired
|
||||||
|
private TestListener listener;
|
||||||
|
@Autowired
|
||||||
|
private ArticleRepository repository;
|
||||||
|
|
||||||
|
@Container
|
||||||
|
static KafkaContainer kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest"));
|
||||||
|
|
||||||
|
@DynamicPropertySource
|
||||||
|
static void dynamicProperties(DynamicPropertyRegistry registry) {
|
||||||
|
registry.add("spring.kafka.bootstrap-servers", kafkaContainer::getBootstrapServers);
|
||||||
|
}
|
||||||
|
|
||||||
|
static {
|
||||||
|
Awaitility.setDefaultTimeout(ofSeconds(3));
|
||||||
|
Awaitility.setDefaultPollDelay(ofMillis(100));
|
||||||
|
}
|
||||||
|
|
||||||
|
@BeforeEach
|
||||||
|
void beforeEach() {
|
||||||
|
listener.reset();
|
||||||
|
repository.deleteAll();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void whenArticleIsSavedToDB_thenItIsAlsoPublishedToKafka() {
|
||||||
|
var article = new Article("introduction-to-spring-boot", "Introduction to Spring Boot", "John Doe", "<p> Spring Boot is [...] </p>");
|
||||||
|
|
||||||
|
baeldung.createArticle(article);
|
||||||
|
|
||||||
|
await().untilAsserted(() ->
|
||||||
|
assertThat(listener.getEvents())
|
||||||
|
.hasSize(1)
|
||||||
|
.first().asString()
|
||||||
|
.contains("\"slug\":\"introduction-to-spring-boot\"")
|
||||||
|
.contains("\"title\":\"Introduction to Spring Boot\""));
|
||||||
|
|
||||||
|
assertThat(repository.findAll())
|
||||||
|
.hasSize(1)
|
||||||
|
.first()
|
||||||
|
.extracting(Article::slug, Article::title)
|
||||||
|
.containsExactly("introduction-to-spring-boot", "Introduction to Spring Boot");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void whenPublishingMessageFails_thenArticleIsStillSavedToDB() {
|
||||||
|
var article = new Article(null, "Introduction to Spring Boot", "John Doe", "<p> Spring Boot is [...] </p>");
|
||||||
|
|
||||||
|
baeldung.createArticle(article);
|
||||||
|
|
||||||
|
assertThat(listener.getEvents())
|
||||||
|
.isEmpty();
|
||||||
|
|
||||||
|
assertThat(repository.findAll())
|
||||||
|
.hasSize(1)
|
||||||
|
.first()
|
||||||
|
.extracting(Article::title, Article::author)
|
||||||
|
.containsExactly("Introduction to Spring Boot", "John Doe");
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,31 @@
|
||||||
|
package com.baeldung.springmodulith.events.externalization.listener;
|
||||||
|
|
||||||
|
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
|
||||||
|
import org.springframework.context.annotation.Bean;
|
||||||
|
import org.springframework.context.annotation.Configuration;
|
||||||
|
import org.springframework.kafka.annotation.EnableKafka;
|
||||||
|
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
|
||||||
|
import org.springframework.kafka.config.KafkaListenerContainerFactory;
|
||||||
|
import org.springframework.kafka.core.ConsumerFactory;
|
||||||
|
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
|
||||||
|
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
|
||||||
|
|
||||||
|
@EnableKafka
|
||||||
|
@Configuration
|
||||||
|
public class TestKafkaListenerConfig {
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory(
|
||||||
|
ConsumerFactory<Integer, String> consumerFactory
|
||||||
|
) {
|
||||||
|
var factory = new ConcurrentKafkaListenerContainerFactory<Integer, String>();
|
||||||
|
factory.setConsumerFactory(consumerFactory);
|
||||||
|
return factory;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
ConsumerFactory<Integer, String> consumerFactory(KafkaProperties kafkaProperties) {
|
||||||
|
return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties());
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,25 @@
|
||||||
|
package com.baeldung.springmodulith.events.externalization.listener;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import org.springframework.kafka.annotation.KafkaListener;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
@Component
|
||||||
|
public class TestListener {
|
||||||
|
private List<String> events = new ArrayList<>();
|
||||||
|
|
||||||
|
@KafkaListener(id = "test-id", topics = "baeldung.articles.published")
|
||||||
|
public void listen(String event) {
|
||||||
|
events.add(event);
|
||||||
|
}
|
||||||
|
|
||||||
|
public List<String> getEvents() {
|
||||||
|
return events;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void reset() {
|
||||||
|
events = new ArrayList<>();
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue