BAEL-6415: Add simple Kafka client example (with BootStrap Servers setting)
This commit is contained in:
parent
d278ca2066
commit
5dc9e85c2d
|
@ -0,0 +1,27 @@
|
||||||
|
<?xml version="1.0" encoding="UTF-8"?>
|
||||||
|
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||||
|
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||||
|
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||||
|
<modelVersion>4.0.0</modelVersion>
|
||||||
|
<artifactId>apache-kafka-3</artifactId>
|
||||||
|
<name>apache-kafka-3</name>
|
||||||
|
|
||||||
|
<parent>
|
||||||
|
<groupId>com.baeldung</groupId>
|
||||||
|
<artifactId>parent-modules</artifactId>
|
||||||
|
<version>1.0.0-SNAPSHOT</version>
|
||||||
|
</parent>
|
||||||
|
|
||||||
|
<dependencies>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.kafka</groupId>
|
||||||
|
<artifactId>kafka-clients</artifactId>
|
||||||
|
<version>${kafka.version}</version>
|
||||||
|
</dependency>
|
||||||
|
</dependencies>
|
||||||
|
|
||||||
|
<properties>
|
||||||
|
<kafka.version>3.5.1</kafka.version>
|
||||||
|
</properties>
|
||||||
|
|
||||||
|
</project>
|
|
@ -0,0 +1,39 @@
|
||||||
|
package com.baeldung.kafka;
|
||||||
|
|
||||||
|
import org.apache.kafka.clients.consumer.*;
|
||||||
|
import org.apache.kafka.common.serialization.LongDeserializer;
|
||||||
|
import org.apache.kafka.common.serialization.StringDeserializer;
|
||||||
|
|
||||||
|
import java.time.Duration;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Properties;
|
||||||
|
|
||||||
|
public class KafkaJava {
|
||||||
|
|
||||||
|
public static void main(String[] args) {
|
||||||
|
try(final Consumer<Long, String> consumer = createConsumer()) {
|
||||||
|
ConsumerRecords<Long, String> records = consumer.poll(Duration.ofMinutes(1));
|
||||||
|
for(ConsumerRecord<Long, String> record : records) {
|
||||||
|
System.out.println(record.value());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static Consumer<Long, String> createConsumer() {
|
||||||
|
final Properties props = new Properties();
|
||||||
|
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
|
||||||
|
"localhost:9092,another-host.com:29092");
|
||||||
|
props.put(ConsumerConfig.GROUP_ID_CONFIG,
|
||||||
|
"MySampleConsumer");
|
||||||
|
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
|
||||||
|
LongDeserializer.class.getName());
|
||||||
|
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
|
||||||
|
StringDeserializer.class.getName());
|
||||||
|
// Create the consumer using props.
|
||||||
|
final Consumer<Long, String> consumer = new KafkaConsumer<Long, String>(props);
|
||||||
|
// Subscribe to the topic.
|
||||||
|
consumer.subscribe(Arrays.asList("samples"));
|
||||||
|
return consumer;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
2
pom.xml
2
pom.xml
|
@ -820,6 +820,7 @@
|
||||||
<module>antlr</module>
|
<module>antlr</module>
|
||||||
<module>apache-kafka</module>
|
<module>apache-kafka</module>
|
||||||
<module>apache-kafka-2</module>
|
<module>apache-kafka-2</module>
|
||||||
|
<module>apache-kafka-3</module>
|
||||||
<module>apache-olingo</module>
|
<module>apache-olingo</module>
|
||||||
|
|
||||||
<module>apache-poi-2</module>
|
<module>apache-poi-2</module>
|
||||||
|
@ -1089,6 +1090,7 @@
|
||||||
<module>antlr</module>
|
<module>antlr</module>
|
||||||
<module>apache-kafka</module>
|
<module>apache-kafka</module>
|
||||||
<module>apache-kafka-2</module>
|
<module>apache-kafka-2</module>
|
||||||
|
<module>apache-kafka-3</module>
|
||||||
<module>apache-olingo</module>
|
<module>apache-olingo</module>
|
||||||
|
|
||||||
<module>apache-poi-2</module>
|
<module>apache-poi-2</module>
|
||||||
|
|
Loading…
Reference in New Issue