BAEL-7351 moving to a new module
This commit is contained in:
parent
ca9ebc37cf
commit
d8e035da08
|
@ -0,0 +1,33 @@
|
|||
HELP.md
|
||||
target/
|
||||
!.mvn/wrapper/maven-wrapper.jar
|
||||
!**/src/main/**/target/
|
||||
!**/src/test/**/target/
|
||||
|
||||
### STS ###
|
||||
.apt_generated
|
||||
.classpath
|
||||
.factorypath
|
||||
.project
|
||||
.settings
|
||||
.springBeans
|
||||
.sts4-cache
|
||||
|
||||
### IntelliJ IDEA ###
|
||||
.idea
|
||||
*.iws
|
||||
*.iml
|
||||
*.ipr
|
||||
|
||||
### NetBeans ###
|
||||
/nbproject/private/
|
||||
/nbbuild/
|
||||
/dist/
|
||||
/nbdist/
|
||||
/.nb-gradle/
|
||||
build/
|
||||
!**/src/main/**/build/
|
||||
!**/src/test/**/build/
|
||||
|
||||
### VS Code ###
|
||||
.vscode/
|
|
@ -0,0 +1,50 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<parent>
|
||||
<groupId>com.baeldung</groupId>
|
||||
<artifactId>parent-boot-2</artifactId>
|
||||
<version>0.0.1-SNAPSHOT</version>
|
||||
<relativePath>../parent-boot-2</relativePath>
|
||||
</parent>
|
||||
|
||||
<artifactId>apache-kafka-3</artifactId>
|
||||
<name>Apache Kafka 3</name>
|
||||
<description>Third module for Apache Kafka related articles</description>
|
||||
<properties>
|
||||
<java.version>17</java.version>
|
||||
</properties>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.kafka</groupId>
|
||||
<artifactId>spring-kafka</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-test</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.kafka</groupId>
|
||||
<artifactId>spring-kafka-test</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-maven-plugin</artifactId>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
|
||||
</project>
|
|
@ -1,4 +1,4 @@
|
|||
package com.baeldung.spring.kafka.groupId;
|
||||
package com.baeldung.apachekafka3.groupId;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
|
@ -1,7 +1,6 @@
|
|||
package com.baeldung.spring.kafka.groupId;
|
||||
package com.baeldung.apachekafka3.groupId;
|
||||
|
||||
import org.apache.kafka.clients.consumer.Consumer;
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
||||
import org.apache.kafka.common.errors.RecordDeserializationException;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
@ -13,12 +12,6 @@ class KafkaErrorHandler implements CommonErrorHandler {
|
|||
|
||||
private static final Logger log = LoggerFactory.getLogger(KafkaErrorHandler.class);
|
||||
|
||||
@Override
|
||||
public void handleRecord(@NonNull Exception exception, @NonNull ConsumerRecord<?, ?> record, @NonNull Consumer<?, ?> consumer,
|
||||
@NonNull MessageListenerContainer container) {
|
||||
handle(exception, consumer);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleOtherException(@NonNull Exception exception, @NonNull Consumer<?, ?> consumer, @NonNull MessageListenerContainer container,
|
||||
boolean batchListener) {
|
|
@ -1,14 +1,14 @@
|
|||
package com.baeldung.spring.kafka.groupId;
|
||||
|
||||
import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
import org.springframework.context.annotation.ComponentScan;
|
||||
|
||||
@SpringBootApplication
|
||||
@ComponentScan(basePackages = "com.baeldung.spring.kafka.groupId")
|
||||
public class Main {
|
||||
|
||||
public static void main(String[] args) {
|
||||
SpringApplication.run(Main.class, args);
|
||||
}
|
||||
}
|
||||
package com.baeldung.apachekafka3.groupId;
|
||||
|
||||
import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
import org.springframework.context.annotation.ComponentScan;
|
||||
|
||||
@SpringBootApplication
|
||||
@ComponentScan(basePackages = "com.baeldung.apachekafka3.groupId")
|
||||
public class Main {
|
||||
|
||||
public static void main(String[] args) {
|
||||
SpringApplication.run(Main.class, args);
|
||||
}
|
||||
}
|
|
@ -1,41 +1,41 @@
|
|||
package com.baeldung.spring.kafka.groupId;
|
||||
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
|
||||
import org.apache.kafka.clients.consumer.Consumer;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.kafka.annotation.KafkaListener;
|
||||
import org.springframework.messaging.handler.annotation.Payload;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
@Service
|
||||
public class MyKafkaConsumer {
|
||||
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(MyKafkaConsumer.class);
|
||||
|
||||
private CountDownLatch latch = new CountDownLatch(1);
|
||||
|
||||
private String payload;
|
||||
|
||||
@KafkaListener(topics = "${kafka.topic.name:test-topic}", groupId = "${kafka.consumer.groupId:test-consumer-group}", concurrency = "4")
|
||||
public void receive(@Payload String payload, Consumer<String, String> consumer) {
|
||||
LOGGER.info("Consumer='{}' received payload='{}'", consumer.groupMetadata()
|
||||
.memberId(), payload);
|
||||
this.payload = payload;
|
||||
|
||||
latch.countDown();
|
||||
}
|
||||
|
||||
public CountDownLatch getLatch() {
|
||||
return latch;
|
||||
}
|
||||
|
||||
public void resetLatch() {
|
||||
latch = new CountDownLatch(1);
|
||||
}
|
||||
|
||||
public String getPayload() {
|
||||
return payload;
|
||||
}
|
||||
}
|
||||
package com.baeldung.apachekafka3.groupId;
|
||||
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
|
||||
import org.apache.kafka.clients.consumer.Consumer;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.kafka.annotation.KafkaListener;
|
||||
import org.springframework.messaging.handler.annotation.Payload;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
@Service
|
||||
public class MyKafkaConsumer {
|
||||
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(MyKafkaConsumer.class);
|
||||
|
||||
private CountDownLatch latch = new CountDownLatch(1);
|
||||
|
||||
private String payload;
|
||||
|
||||
@KafkaListener(topics = "${kafka.topic.name:test-topic}", clientIdPrefix = "neo", groupId = "${kafka.consumer.groupId:test-consumer-group}", concurrency = "4")
|
||||
public void receive(@Payload String payload, Consumer<String, String> consumer) {
|
||||
LOGGER.info("Consumer='{}' received payload='{}'", consumer.groupMetadata()
|
||||
.memberId(), payload);
|
||||
this.payload = payload;
|
||||
|
||||
latch.countDown();
|
||||
}
|
||||
|
||||
public CountDownLatch getLatch() {
|
||||
return latch;
|
||||
}
|
||||
|
||||
public void resetLatch() {
|
||||
latch = new CountDownLatch(1);
|
||||
}
|
||||
|
||||
public String getPayload() {
|
||||
return payload;
|
||||
}
|
||||
}
|
|
@ -1,24 +1,24 @@
|
|||
package com.baeldung.spring.kafka.groupId;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.kafka.core.KafkaTemplate;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
@Service
|
||||
public class MyKafkaProducer {
|
||||
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(MyKafkaProducer.class);
|
||||
|
||||
@Value("${kafka.topic.name:test-topic}")
|
||||
private String topic;
|
||||
@Autowired
|
||||
private KafkaTemplate<String, String> kafkaTemplate;
|
||||
|
||||
public void send(String payload) {
|
||||
LOGGER.info("Sending payload='{}' to topic='{}'", payload, topic);
|
||||
kafkaTemplate.send(topic, payload);
|
||||
}
|
||||
}
|
||||
package com.baeldung.apachekafka3.groupId;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.kafka.core.KafkaTemplate;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
@Service
|
||||
public class MyKafkaProducer {
|
||||
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(MyKafkaProducer.class);
|
||||
|
||||
@Value("${kafka.topic.name:test-topic}")
|
||||
private String topic;
|
||||
@Autowired
|
||||
private KafkaTemplate<String, String> kafkaTemplate;
|
||||
|
||||
public void send(String payload) {
|
||||
LOGGER.info("Sending payload='{}' to topic='{}'", payload, topic);
|
||||
kafkaTemplate.send(topic, payload);
|
||||
}
|
||||
}
|
|
@ -0,0 +1 @@
|
|||
kafka.topic.name=test-topic
|
|
@ -1,44 +1,44 @@
|
|||
package com.baeldung.spring.kafka.groupId;
|
||||
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.test.context.SpringBootTest;
|
||||
import org.springframework.context.annotation.ComponentScan;
|
||||
import org.springframework.kafka.test.context.EmbeddedKafka;
|
||||
import org.springframework.test.annotation.DirtiesContext;
|
||||
|
||||
@SpringBootTest(classes = Main.class)
|
||||
@ComponentScan(basePackages = "com.baeldung.spring.kafka.groupId")
|
||||
@DirtiesContext
|
||||
@EmbeddedKafka(partitions = 4, topics = { "${kafka.topic.name:test-topic}" }, brokerProperties = { "listeners=PLAINTEXT://localhost:9092", "port=9092" })
|
||||
public class MainLiveTest {
|
||||
|
||||
@Autowired
|
||||
private MyKafkaConsumer consumer;
|
||||
@Autowired
|
||||
private MyKafkaProducer producer;
|
||||
|
||||
@BeforeEach
|
||||
void setup() {
|
||||
consumer.resetLatch();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenEmbeddedKafkaBroker_whenSendingWithSimpleProducer_thenMessageReceived() throws Exception {
|
||||
String data = "Test 123...";
|
||||
|
||||
producer.send(data);
|
||||
|
||||
boolean messageConsumed = consumer.getLatch()
|
||||
.await(10, TimeUnit.SECONDS);
|
||||
assertTrue(messageConsumed);
|
||||
assertThat(consumer.getPayload(), containsString(data));
|
||||
}
|
||||
}
|
||||
package com.baeldung.apachekafka3.groupId;
|
||||
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.test.context.SpringBootTest;
|
||||
import org.springframework.context.annotation.ComponentScan;
|
||||
import org.springframework.kafka.test.context.EmbeddedKafka;
|
||||
import org.springframework.test.annotation.DirtiesContext;
|
||||
|
||||
@SpringBootTest(classes = Main.class)
|
||||
@ComponentScan(basePackages = "com.baeldung.apachekafka3.groupId")
|
||||
@DirtiesContext
|
||||
@EmbeddedKafka(partitions = 4, topics = { "${kafka.topic.name:test-topic}" }, brokerProperties = { "listeners=PLAINTEXT://localhost:9092", "port=9092" })
|
||||
public class MainLiveTest {
|
||||
|
||||
@Autowired
|
||||
private MyKafkaConsumer consumer;
|
||||
@Autowired
|
||||
private MyKafkaProducer producer;
|
||||
|
||||
@BeforeEach
|
||||
void setup() {
|
||||
consumer.resetLatch();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenEmbeddedKafkaBroker_whenSendingWithSimpleProducer_thenMessageReceived() throws Exception {
|
||||
String data = "Test 123...";
|
||||
|
||||
producer.send(data);
|
||||
|
||||
boolean messageConsumed = consumer.getLatch()
|
||||
.await(10, TimeUnit.SECONDS);
|
||||
assertTrue(messageConsumed);
|
||||
assertThat(consumer.getPayload(), containsString(data));
|
||||
}
|
||||
}
|
3
pom.xml
3
pom.xml
|
@ -670,8 +670,9 @@
|
|||
<module>apache-httpclient-2</module>
|
||||
<module>apache-httpclient4</module>
|
||||
<module>apache-httpclient</module>
|
||||
<module>apache-kafka-2</module>
|
||||
<module>apache-kafka</module>
|
||||
<module>apache-kafka-2</module>
|
||||
<module>apache-kafka-3</module>
|
||||
<module>apache-libraries-2</module>
|
||||
<module>apache-libraries</module>
|
||||
<module>apache-olingo</module><!-- apache-olingo wasn't updated to boot-3 because a workaround for jakarta namespace wasn't found JAVA-27818 -->
|
||||
|
|
Loading…
Reference in New Issue