Config changes
This commit is contained in:
parent
feca50daca
commit
88f85963bb
@ -16,7 +16,7 @@ import java.util.Properties;
|
|||||||
public class ConsumerConfigurations {
|
public class ConsumerConfigurations {
|
||||||
public static void main(String[] args) {
|
public static void main(String[] args) {
|
||||||
Properties props = new Properties();
|
Properties props = new Properties();
|
||||||
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
|
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, Config.KAFKA_LOCAL);
|
||||||
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
|
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
|
||||||
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
|
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
|
||||||
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
|
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
|
||||||
@ -24,7 +24,7 @@ public class ConsumerConfigurations {
|
|||||||
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "1");
|
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "1");
|
||||||
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "500");
|
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "500");
|
||||||
Consumer<String, String> consumer = new KafkaConsumer<>(props);
|
Consumer<String, String> consumer = new KafkaConsumer<>(props);
|
||||||
consumer.subscribe(Collections.singletonList("multi_partition_topic"));
|
consumer.subscribe(Collections.singletonList(Config.MULTI_PARTITION_TOPIC));
|
||||||
|
|
||||||
while (true) {
|
while (true) {
|
||||||
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
|
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
|
||||||
|
@ -16,14 +16,15 @@ public class ProducerConfigurations {
|
|||||||
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
|
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
|
||||||
props.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384");
|
props.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384");
|
||||||
props.put(ProducerConfig.LINGER_MS_CONFIG, "5");
|
props.put(ProducerConfig.LINGER_MS_CONFIG, "5");
|
||||||
|
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
|
||||||
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
|
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
|
||||||
|
|
||||||
for (int i = 0; i < 10; i++) {
|
for (int i = 0; i < 10; i++) {
|
||||||
String key = "Key-" + (i % 3); // Assuming 3 partitions
|
String key = "Key-" + (i % 5); // Assuming 5 partitions
|
||||||
producer.send(new ProducerRecord<>("multi_partition_topic", key, "Message-" + i));
|
producer.send(new ProducerRecord<>(Config.MULTI_PARTITION_TOPIC, key, "Message-" + i));
|
||||||
}
|
}
|
||||||
|
|
||||||
producer.close();
|
producer.close();
|
||||||
System.out.println("MultiPartitionProducer Completed.");
|
System.out.println("Producer Configurations Completed.");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user