BAEL-6953 implemented (#15304)
* BAEL-6953 implemented * BAEL-6953 added live test * Fixed test
This commit is contained in:
parent
671957da9b
commit
1f7efeddaf
BIN
gradle-modules/gradle-7/toolchains-feature/gradle/wrapper/gradle-wrapper.jar
vendored
Normal file
BIN
gradle-modules/gradle-7/toolchains-feature/gradle/wrapper/gradle-wrapper.jar
vendored
Normal file
Binary file not shown.
1
pom.xml
1
pom.xml
|
@ -867,6 +867,7 @@
|
|||
<module>spring-jersey</module>
|
||||
<module>spring-jinq</module>
|
||||
<module>spring-kafka-2</module>
|
||||
<module>spring-kafka-3</module>
|
||||
<module>spring-kafka</module>
|
||||
<module>spring-katharsis</module>
|
||||
<module>spring-mobile</module>
|
||||
|
|
|
@ -0,0 +1,36 @@
|
|||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<parent>
|
||||
<groupId>com.baeldung</groupId>
|
||||
<artifactId>parent-boot-2</artifactId>
|
||||
<version>0.0.1-SNAPSHOT</version>
|
||||
<relativePath>../parent-boot-2</relativePath>
|
||||
</parent>
|
||||
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<artifactId>spring-kafka-3</artifactId>
|
||||
<packaging>jar</packaging>
|
||||
|
||||
<name>spring-kafka-3</name>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.springframework.kafka</groupId>
|
||||
<artifactId>spring-kafka</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.fasterxml.jackson.core</groupId>
|
||||
<artifactId>jackson-databind</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.kafka</groupId>
|
||||
<artifactId>spring-kafka-test</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<properties>
|
||||
<kafka-version>3.0.12</kafka-version>
|
||||
</properties>
|
||||
</project>
|
|
@ -0,0 +1,37 @@
|
|||
package com.baeldung.spring.kafka;
|
||||
|
||||
import java.time.Instant;
|
||||
|
||||
public class SomeData {
|
||||
|
||||
private String id;
|
||||
private String type;
|
||||
private String status;
|
||||
private Instant timestamp;
|
||||
|
||||
public SomeData() {
|
||||
}
|
||||
|
||||
public SomeData(String id, String type, String status, Instant timestamp) {
|
||||
this.id = id;
|
||||
this.type = type;
|
||||
this.status = status;
|
||||
this.timestamp = timestamp;
|
||||
}
|
||||
|
||||
public String getId() {
|
||||
return id;
|
||||
}
|
||||
|
||||
public String getType() {
|
||||
return type;
|
||||
}
|
||||
|
||||
public String getStatus() {
|
||||
return status;
|
||||
}
|
||||
|
||||
public Instant getTimestamp() {
|
||||
return timestamp;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,42 @@
|
|||
package com.baeldung.spring.kafka;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
||||
import org.apache.kafka.common.serialization.StringDeserializer;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
|
||||
import org.springframework.kafka.core.ConsumerFactory;
|
||||
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
|
||||
import org.springframework.kafka.support.serializer.JsonDeserializer;
|
||||
|
||||
@Configuration
|
||||
public class ListenerConfiguration {
|
||||
|
||||
@Bean("messageListenerContainer")
|
||||
public ConcurrentKafkaListenerContainerFactory<String, SomeData> messageListenerContainer() {
|
||||
ConcurrentKafkaListenerContainerFactory<String, SomeData> container = new ConcurrentKafkaListenerContainerFactory<>();
|
||||
container.setConsumerFactory(someDataConsumerFactory());
|
||||
return container;
|
||||
}
|
||||
|
||||
@Bean
|
||||
public ConsumerFactory<String, SomeData> someDataConsumerFactory() {
|
||||
JsonDeserializer<SomeData> payloadJsonDeserializer = new JsonDeserializer<>();
|
||||
payloadJsonDeserializer.trustedPackages("com.baeldung.spring.kafka");
|
||||
return new DefaultKafkaConsumerFactory<>(
|
||||
consumerConfigs(),
|
||||
new StringDeserializer(),
|
||||
payloadJsonDeserializer
|
||||
);
|
||||
}
|
||||
|
||||
@Bean
|
||||
public Map<String, Object> consumerConfigs() {
|
||||
return Map.of(
|
||||
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "PLAINTEXT://localhost:9092",
|
||||
ConsumerConfig.GROUP_ID_CONFIG, "some-group-id"
|
||||
);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,40 @@
|
|||
package com.baeldung.spring.kafka;
|
||||
|
||||
import org.apache.kafka.clients.producer.ProducerConfig;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
|
||||
import org.springframework.kafka.core.KafkaTemplate;
|
||||
import org.springframework.kafka.core.ProducerFactory;
|
||||
import org.springframework.kafka.support.serializer.JsonSerializer;
|
||||
import org.springframework.kafka.support.serializer.StringOrBytesSerializer;
|
||||
|
||||
import java.time.Instant;
|
||||
import java.util.Map;
|
||||
|
||||
@Configuration
|
||||
public class ProducerConfiguration {
|
||||
|
||||
@Bean
|
||||
public KafkaTemplate<Object, SomeData> kafkaTemplate() {
|
||||
return new KafkaTemplate<>(producerFactory());
|
||||
}
|
||||
|
||||
@Bean
|
||||
public ProducerFactory<Object, SomeData> producerFactory() {
|
||||
JsonSerializer<SomeData> jsonSerializer = new JsonSerializer<>();
|
||||
jsonSerializer.setAddTypeInfo(true);
|
||||
return new DefaultKafkaProducerFactory<>(
|
||||
producerFactoryConfig(),
|
||||
new StringOrBytesSerializer(),
|
||||
jsonSerializer
|
||||
);
|
||||
}
|
||||
|
||||
@Bean
|
||||
public Map<String, Object> producerFactoryConfig() {
|
||||
return Map.of(
|
||||
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "PLAINTEXT://localhost:9092"
|
||||
);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,57 @@
|
|||
package com.baeldung.spring.kafka;
|
||||
|
||||
import java.time.Instant;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.kafka.clients.producer.ProducerRecord;
|
||||
import org.junit.jupiter.api.Assertions;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.mockito.Mockito;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.test.context.SpringBootTest;
|
||||
import org.springframework.boot.test.mock.mockito.SpyBean;
|
||||
import org.springframework.kafka.annotation.KafkaListener;
|
||||
import org.springframework.kafka.core.KafkaTemplate;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* This test requires a running instance of kafka to be present
|
||||
*/
|
||||
@SpringBootTest
|
||||
public class TrustedPackagesLiveTest {
|
||||
|
||||
@Autowired
|
||||
private KafkaTemplate<Object, SomeData> kafkaTemplate;
|
||||
|
||||
@SpyBean
|
||||
TestConsumer testConsumer;
|
||||
|
||||
@Test
|
||||
void givenMessageInTheTopic_whenTypeInfoPackageIsTrusted_thenMessageIsSuccessfullyConsumed() throws InterruptedException {
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
|
||||
Mockito.doAnswer(invocationOnMock -> {
|
||||
try {
|
||||
latch.countDown();
|
||||
return invocationOnMock.callRealMethod();
|
||||
} catch (Exception e) {
|
||||
return null;
|
||||
}
|
||||
}).when(testConsumer).onMessage(Mockito.any());
|
||||
|
||||
SomeData someData = new SomeData("1", "active", "sent", Instant.now());
|
||||
kafkaTemplate.send(new ProducerRecord<>("sourceTopic", null, someData));
|
||||
|
||||
Assertions.assertTrue(latch.await(20L, TimeUnit.SECONDS));
|
||||
}
|
||||
|
||||
@Component
|
||||
static class TestConsumer {
|
||||
|
||||
@KafkaListener(topics = "sourceTopic", containerFactory = "messageListenerContainer")
|
||||
public void onMessage(SomeData someData) {
|
||||
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue