* BAEL-7490 read write file in separate thread

* Change the to try resources

* Update the code to sync with article

* First draft

* Fix Main error

* update the main code to send message
This commit is contained in:
Wynn Teo 2024-02-17 00:09:20 +08:00 committed by GitHub
parent 20f151aad0
commit d3287d7d3e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 96 additions and 0 deletions

View File

@ -0,0 +1,33 @@
package com.baeldung.spring.kafka.viewheaders;
import java.util.Map;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;
@Service
public class KafkaMessageConsumer {
@KafkaListener(topics = { "my-topic" }, groupId = "my-consumer-group")
public void listen(@Payload String message, @Headers Map<String, Object> headers) {
System.out.println("Received message: " + message);
System.out.println("Headers:");
headers.forEach((key, value) -> System.out.println(key + ": " + value));
String topicName = (String) headers.get(KafkaHeaders.TOPIC);
System.out.println("Topic: " + topicName);
int partitionID = (int) headers.get(KafkaHeaders.RECEIVED_PARTITION_ID);
System.out.println("Partition ID: " + partitionID);
}
@KafkaListener(topics = { "my-topic" }, groupId = "my-consumer-group")
public void listen(@Payload String message, @Header(KafkaHeaders.RECEIVED_TOPIC) String topicName,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
System.out.println("Topic: " + topicName);
System.out.println("Partition ID: " + partition);
}
}

View File

@ -0,0 +1,63 @@
package com.baeldung.spring.kafka.viewheaders;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
@SpringBootApplication
public class Main {
@Autowired
static KafkaMessageConsumer kafkaMessageConsumer;
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaConsumer<String, String> kafkaConsumer() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configProps.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group");
return new KafkaConsumer<>(configProps);
}
public static void main(String[] args) {
ConfigurableApplicationContext context = SpringApplication.run(Main.class, args);
// Get the KafkaTemplate bean from the application context
KafkaTemplate<String, String> kafkaTemplate = context.getBean(KafkaTemplate.class);
// Send a message to the "my-topic" Kafka topic
String message = "Hello Baeldung!";
kafkaTemplate.send("my-topic", message);
// Close the application context
context.close();
}
}