Merge pull request #15970 from etrandafir93/features/BAEL-7523-evt_externalization_improvements

BAEL-7523: evt externalization improvements
This commit is contained in:
davidmartinezbarua 2024-03-21 12:27:47 -03:00 committed by GitHub
commit 317496fa87
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 141 additions and 59 deletions

View File

@ -6,9 +6,10 @@
<artifactId>spring-boot-libraries-3</artifactId> <artifactId>spring-boot-libraries-3</artifactId>
<parent> <parent>
<artifactId>spring-boot-modules</artifactId> <groupId>com.baeldung</groupId>
<groupId>com.baeldung.spring-boot-modules</groupId> <artifactId>parent-boot-3</artifactId>
<version>1.0.0-SNAPSHOT</version> <version>0.0.1-SNAPSHOT</version>
<relativePath>../../parent-boot-3</relativePath>
</parent> </parent>
<dependencies> <dependencies>
@ -16,12 +17,17 @@
<groupId>org.springframework.boot</groupId> <groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId> <artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.springframework.kafka</groupId> <groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId> <artifactId>spring-kafka</artifactId>
<version>${spring-kafka.version}</version>
</dependency> </dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>${postgresql.version}</version>
</dependency>
<dependency> <dependency>
<groupId>org.springframework.modulith</groupId> <groupId>org.springframework.modulith</groupId>
<artifactId>spring-modulith-events-api</artifactId> <artifactId>spring-modulith-events-api</artifactId>
@ -33,17 +39,11 @@
<version>${spring-modulith-events-kafka.version}</version> <version>${spring-modulith-events-kafka.version}</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>com.fasterxml.jackson.core</groupId> <groupId>org.springframework.modulith</groupId>
<artifactId>jackson-databind</artifactId> <artifactId>spring-modulith-starter-jpa</artifactId>
</dependency> <version>${spring-modulith-events-kafka.version}</version>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.springframework.boot</groupId> <groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId> <artifactId>spring-boot-starter-test</artifactId>
@ -75,8 +75,10 @@
</dependency> </dependency>
<dependency> <dependency>
<groupId>com.h2database</groupId> <groupId>org.testcontainers</groupId>
<artifactId>h2</artifactId> <artifactId>postgresql</artifactId>
<version>${testcontainers.version}</version>
<scope>test</scope>
</dependency> </dependency>
<dependency> <dependency>
@ -90,10 +92,12 @@
<properties> <properties>
<java.version>17</java.version> <java.version>17</java.version>
<spring-modulith-events-kafka.version>1.1.3</spring-modulith-events-kafka.version> <spring-boot.version>3.1.5</spring-boot.version>
<testcontainers.version>1.19.6</testcontainers.version> <spring-modulith-events-kafka.version>1.1.2</spring-modulith-events-kafka.version>
<testcontainers.version>1.19.3</testcontainers.version>
<awaitility.version>4.2.0</awaitility.version> <awaitility.version>4.2.0</awaitility.version>
<spring-kafka.version>3.1.2</spring-kafka.version> <postgresql.version>42.3.1</postgresql.version>
</properties> </properties>
</project> </project>

View File

@ -6,32 +6,29 @@ import org.springframework.transaction.annotation.Transactional;
@Service @Service
public class Baeldung { public class Baeldung {
private final ApplicationEventPublisher applicationEvents;
private final ArticleRepository articleRepository;
private final ApplicationEventPublisher applicationEvents; public Baeldung(ApplicationEventPublisher applicationEvents, ArticleRepository articleRepository) {
private final ArticleRepository articleRepository; this.applicationEvents = applicationEvents;
this.articleRepository = articleRepository;
}
@Transactional
public void createArticle(Article article) {
// ... business logic
validateArticle(article);
article = addArticleTags(article);
article = articleRepository.save(article);
public Baeldung(ApplicationEventPublisher applicationEvents, ArticleRepository articleRepository) { applicationEvents.publishEvent(new ArticlePublishedEvent(article.slug(), article.title()));
this.applicationEvents = applicationEvents; }
this.articleRepository = articleRepository;
}
@Transactional private Article addArticleTags(Article article) {
public void createArticle(Article article) { return article;
// ... business logic }
validateArticle(article);
article = addArticleTags(article);
article = articleRepository.save(article);
applicationEvents.publishEvent(new ArticlePublishedEvent(article.slug(), article.title())); private void validateArticle(Article article) {
} }
private Article addArticleTags(Article article) {
return article;
}
private void validateArticle(Article article) {
}
} }

View File

@ -4,12 +4,14 @@ import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory; import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.modulith.events.EventExternalizationConfiguration; import org.springframework.modulith.events.EventExternalizationConfiguration;
import org.springframework.modulith.events.RoutingTarget; import org.springframework.modulith.events.RoutingTarget;
import java.util.Objects;
@Configuration @Configuration
class EventExternalizationConfig { class EventExternalizationConfig {
@ -23,19 +25,30 @@ class EventExternalizationConfig {
) )
.mapping( .mapping(
ArticlePublishedEvent.class, ArticlePublishedEvent.class,
it -> new ArticlePublishedKafkaEvent(it.slug(), it.title()) it -> new PostPublishedKafkaEvent(it.slug(), it.title())
)
.route(
WeeklySummaryPublishedEvent.class,
it -> RoutingTarget.forTarget("baeldung.articles.published").andKey(it.handle())
)
.mapping(
WeeklySummaryPublishedEvent.class,
it -> new PostPublishedKafkaEvent(it.handle(), it.heading())
) )
.build(); .build();
} }
record ArticlePublishedKafkaEvent(String slug, String title) {
}
@Bean @Bean
KafkaOperations<String, ArticlePublishedEvent> kafkaOperations(KafkaProperties kafkaProperties) { KafkaOperations<String, ArticlePublishedEvent> kafkaOperations(KafkaProperties kafkaProperties) {
ProducerFactory<String, ArticlePublishedEvent> producerFactory = new DefaultKafkaProducerFactory<>(kafkaProperties.buildProducerProperties()); ProducerFactory<String, ArticlePublishedEvent> producerFactory = new DefaultKafkaProducerFactory<>(kafkaProperties.buildProducerProperties());
return new KafkaTemplate<>(producerFactory); return new KafkaTemplate<>(producerFactory);
} }
record PostPublishedKafkaEvent(String slug, String title) {
PostPublishedKafkaEvent {
Objects.requireNonNull(slug, "Article Slug must not be null!");
}
}
} }

View File

@ -0,0 +1,7 @@
package com.baeldung.springmodulith.events.externalization;
import org.springframework.modulith.events.Externalized;
@Externalized
record WeeklySummaryPublishedEvent(String handle, String heading) {
}

View File

@ -0,0 +1,38 @@
package com.baeldung.springmodulith.events.externalization.infra;
import com.baeldung.springmodulith.events.externalization.ArticlePublishedEvent;
import org.springframework.modulith.events.CompletedEventPublications;
import org.springframework.modulith.events.IncompleteEventPublications;
import org.springframework.stereotype.Component;
import java.time.Duration;
import java.time.Instant;
@Component
class PublicationEvents {
private final IncompleteEventPublications incompleteEvent;
private final CompletedEventPublications completeEvents;
public PublicationEvents(IncompleteEventPublications incompleteEvent, CompletedEventPublications completeEvents) {
this.incompleteEvent = incompleteEvent;
this.completeEvents = completeEvents;
}
public void resubmitUnpublishedEvents() {
incompleteEvent.resubmitIncompletePublicationsOlderThan(Duration.ofSeconds(60));
// or
incompleteEvent.resubmitIncompletePublications(it ->
it.getPublicationDate().isBefore(Instant.now().minusSeconds(60))
&& it.getEvent() instanceof ArticlePublishedEvent);
}
public void clearPublishedEvents() {
completeEvents.deletePublicationsOlderThan(Duration.ofSeconds(60));
// or
completeEvents.deletePublications(it ->
it.getPublicationDate().isBefore(Instant.now().minusSeconds(60))
&& it.getEvent() instanceof ArticlePublishedEvent);
}
}

View File

@ -1,4 +1,3 @@
logging.level.org.springframework.orm.jpa: TRACE
spring.kafka: spring.kafka:
bootstrap-servers: localhost:9092 bootstrap-servers: localhost:9092
@ -10,3 +9,20 @@ spring.kafka:
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
auto-offset-reset: earliest auto-offset-reset: earliest
spring.modulith:
republish-outstanding-events-on-restart: true
events.jdbc.schema-initialization.enabled: true
logging.level.org.springframework.orm.jpa: TRACE
spring:
datasource:
username: test_user
password: test_pass
jpa:
properties:
hibernate:
dialect: org.hibernate.dialect.PostgreSQLDialect
hbm2ddl.auto: create

View File

@ -1,10 +1,8 @@
package com.baeldung.springmodulith.events.externalization; package com.baeldung.springmodulith.events.externalization;
import static java.time.Duration.ofMillis; import com.baeldung.springmodulith.Application;
import static java.time.Duration.ofSeconds; import com.baeldung.springmodulith.events.externalization.listener.TestKafkaListenerConfig;
import static org.assertj.core.api.Assertions.assertThat; import com.baeldung.springmodulith.events.externalization.listener.TestListener;
import static org.testcontainers.shaded.org.awaitility.Awaitility.await;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
@ -12,14 +10,16 @@ import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.DynamicPropertyRegistry; import org.springframework.test.context.DynamicPropertyRegistry;
import org.springframework.test.context.DynamicPropertySource; import org.springframework.test.context.DynamicPropertySource;
import org.testcontainers.containers.KafkaContainer; import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.PostgreSQLContainer;
import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers; import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.shaded.org.awaitility.Awaitility; import org.testcontainers.shaded.org.awaitility.Awaitility;
import org.testcontainers.utility.DockerImageName; import org.testcontainers.utility.DockerImageName;
import com.baeldung.springmodulith.Application; import static java.time.Duration.ofMillis;
import com.baeldung.springmodulith.events.externalization.listener.TestKafkaListenerConfig; import static java.time.Duration.ofSeconds;
import com.baeldung.springmodulith.events.externalization.listener.TestListener; import static org.assertj.core.api.Assertions.assertThat;
import static org.testcontainers.shaded.org.awaitility.Awaitility.await;
@Testcontainers @Testcontainers
@SpringBootTest(classes = { Application.class, TestKafkaListenerConfig.class }) @SpringBootTest(classes = { Application.class, TestKafkaListenerConfig.class })
@ -35,13 +35,20 @@ class EventsExternalizationLiveTest {
@Container @Container
static KafkaContainer kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest")); static KafkaContainer kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest"));
@Container
public static PostgreSQLContainer postgresqlContainer = new PostgreSQLContainer()
.withDatabaseName("test_db")
.withUsername("test_user")
.withPassword("test_pass");
@DynamicPropertySource @DynamicPropertySource
static void dynamicProperties(DynamicPropertyRegistry registry) { static void dynamicProperties(DynamicPropertyRegistry registry) {
registry.add("spring.kafka.bootstrap-servers", kafkaContainer::getBootstrapServers); registry.add("spring.kafka.bootstrap-servers", kafkaContainer::getBootstrapServers);
registry.add("spring.datasource.url", postgresqlContainer::getJdbcUrl);
} }
static { static {
Awaitility.setDefaultTimeout(ofSeconds(3)); Awaitility.setDefaultTimeout(ofSeconds(50));
Awaitility.setDefaultPollDelay(ofMillis(100)); Awaitility.setDefaultPollDelay(ofMillis(100));
} }