BAEL-4541: Fix typos in the spring-kafka module (#9857)
This commit is contained in:
parent
cd34bc8454
commit
6ce7fee3ff
|
@ -30,23 +30,23 @@ public class KafkaApplication {
|
||||||
MessageListener listener = context.getBean(MessageListener.class);
|
MessageListener listener = context.getBean(MessageListener.class);
|
||||||
/*
|
/*
|
||||||
* Sending a Hello World message to topic 'baeldung'.
|
* Sending a Hello World message to topic 'baeldung'.
|
||||||
* Must be recieved by both listeners with group foo
|
* Must be received by both listeners with group foo
|
||||||
* and bar with containerFactory fooKafkaListenerContainerFactory
|
* and bar with containerFactory fooKafkaListenerContainerFactory
|
||||||
* and barKafkaListenerContainerFactory respectively.
|
* and barKafkaListenerContainerFactory respectively.
|
||||||
* It will also be recieved by the listener with
|
* It will also be received by the listener with
|
||||||
* headersKafkaListenerContainerFactory as container factory
|
* headersKafkaListenerContainerFactory as container factory.
|
||||||
*/
|
*/
|
||||||
producer.sendMessage("Hello, World!");
|
producer.sendMessage("Hello, World!");
|
||||||
listener.latch.await(10, TimeUnit.SECONDS);
|
listener.latch.await(10, TimeUnit.SECONDS);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Sending message to a topic with 5 partition,
|
* Sending message to a topic with 5 partitions,
|
||||||
* each message to a different partition. But as per
|
* each message to a different partition. But as per
|
||||||
* listener configuration, only the messages from
|
* listener configuration, only the messages from
|
||||||
* partition 0 and 3 will be consumed.
|
* partition 0 and 3 will be consumed.
|
||||||
*/
|
*/
|
||||||
for (int i = 0; i < 5; i++) {
|
for (int i = 0; i < 5; i++) {
|
||||||
producer.sendMessageToPartion("Hello To Partioned Topic!", i);
|
producer.sendMessageToPartition("Hello To Partitioned Topic!", i);
|
||||||
}
|
}
|
||||||
listener.partitionLatch.await(10, TimeUnit.SECONDS);
|
listener.partitionLatch.await(10, TimeUnit.SECONDS);
|
||||||
|
|
||||||
|
@ -61,7 +61,7 @@ public class KafkaApplication {
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Sending message to 'greeting' topic. This will send
|
* Sending message to 'greeting' topic. This will send
|
||||||
* and recieved a java object with the help of
|
* and received a java object with the help of
|
||||||
* greetingKafkaListenerContainerFactory.
|
* greetingKafkaListenerContainerFactory.
|
||||||
*/
|
*/
|
||||||
producer.sendGreetingMessage(new Greeting("Greetings", "World!"));
|
producer.sendGreetingMessage(new Greeting("Greetings", "World!"));
|
||||||
|
@ -92,7 +92,7 @@ public class KafkaApplication {
|
||||||
private String topicName;
|
private String topicName;
|
||||||
|
|
||||||
@Value(value = "${partitioned.topic.name}")
|
@Value(value = "${partitioned.topic.name}")
|
||||||
private String partionedTopicName;
|
private String partitionedTopicName;
|
||||||
|
|
||||||
@Value(value = "${filtered.topic.name}")
|
@Value(value = "${filtered.topic.name}")
|
||||||
private String filteredTopicName;
|
private String filteredTopicName;
|
||||||
|
@ -119,8 +119,8 @@ public class KafkaApplication {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
public void sendMessageToPartion(String message, int partition) {
|
public void sendMessageToPartition(String message, int partition) {
|
||||||
kafkaTemplate.send(partionedTopicName, partition, null, message);
|
kafkaTemplate.send(partitionedTopicName, partition, null, message);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void sendMessageToFiltered(String message) {
|
public void sendMessageToFiltered(String message) {
|
||||||
|
@ -144,37 +144,37 @@ public class KafkaApplication {
|
||||||
|
|
||||||
@KafkaListener(topics = "${message.topic.name}", groupId = "foo", containerFactory = "fooKafkaListenerContainerFactory")
|
@KafkaListener(topics = "${message.topic.name}", groupId = "foo", containerFactory = "fooKafkaListenerContainerFactory")
|
||||||
public void listenGroupFoo(String message) {
|
public void listenGroupFoo(String message) {
|
||||||
System.out.println("Received Messasge in group 'foo': " + message);
|
System.out.println("Received Message in group 'foo': " + message);
|
||||||
latch.countDown();
|
latch.countDown();
|
||||||
}
|
}
|
||||||
|
|
||||||
@KafkaListener(topics = "${message.topic.name}", groupId = "bar", containerFactory = "barKafkaListenerContainerFactory")
|
@KafkaListener(topics = "${message.topic.name}", groupId = "bar", containerFactory = "barKafkaListenerContainerFactory")
|
||||||
public void listenGroupBar(String message) {
|
public void listenGroupBar(String message) {
|
||||||
System.out.println("Received Messasge in group 'bar': " + message);
|
System.out.println("Received Message in group 'bar': " + message);
|
||||||
latch.countDown();
|
latch.countDown();
|
||||||
}
|
}
|
||||||
|
|
||||||
@KafkaListener(topics = "${message.topic.name}", containerFactory = "headersKafkaListenerContainerFactory")
|
@KafkaListener(topics = "${message.topic.name}", containerFactory = "headersKafkaListenerContainerFactory")
|
||||||
public void listenWithHeaders(@Payload String message, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
|
public void listenWithHeaders(@Payload String message, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
|
||||||
System.out.println("Received Messasge: " + message + " from partition: " + partition);
|
System.out.println("Received Message: " + message + " from partition: " + partition);
|
||||||
latch.countDown();
|
latch.countDown();
|
||||||
}
|
}
|
||||||
|
|
||||||
@KafkaListener(topicPartitions = @TopicPartition(topic = "${partitioned.topic.name}", partitions = { "0", "3" }), containerFactory = "partitionsKafkaListenerContainerFactory")
|
@KafkaListener(topicPartitions = @TopicPartition(topic = "${partitioned.topic.name}", partitions = { "0", "3" }), containerFactory = "partitionsKafkaListenerContainerFactory")
|
||||||
public void listenToParition(@Payload String message, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
|
public void listenToPartition(@Payload String message, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
|
||||||
System.out.println("Received Message: " + message + " from partition: " + partition);
|
System.out.println("Received Message: " + message + " from partition: " + partition);
|
||||||
this.partitionLatch.countDown();
|
this.partitionLatch.countDown();
|
||||||
}
|
}
|
||||||
|
|
||||||
@KafkaListener(topics = "${filtered.topic.name}", containerFactory = "filterKafkaListenerContainerFactory")
|
@KafkaListener(topics = "${filtered.topic.name}", containerFactory = "filterKafkaListenerContainerFactory")
|
||||||
public void listenWithFilter(String message) {
|
public void listenWithFilter(String message) {
|
||||||
System.out.println("Recieved Message in filtered listener: " + message);
|
System.out.println("Received Message in filtered listener: " + message);
|
||||||
this.filterLatch.countDown();
|
this.filterLatch.countDown();
|
||||||
}
|
}
|
||||||
|
|
||||||
@KafkaListener(topics = "${greeting.topic.name}", containerFactory = "greetingKafkaListenerContainerFactory")
|
@KafkaListener(topics = "${greeting.topic.name}", containerFactory = "greetingKafkaListenerContainerFactory")
|
||||||
public void greetingListener(Greeting greeting) {
|
public void greetingListener(Greeting greeting) {
|
||||||
System.out.println("Recieved greeting message: " + greeting);
|
System.out.println("Received greeting message: " + greeting);
|
||||||
this.greetingLatch.countDown();
|
this.greetingLatch.countDown();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -20,7 +20,7 @@ public class KafkaTopicConfig {
|
||||||
private String topicName;
|
private String topicName;
|
||||||
|
|
||||||
@Value(value = "${partitioned.topic.name}")
|
@Value(value = "${partitioned.topic.name}")
|
||||||
private String partionedTopicName;
|
private String partitionedTopicName;
|
||||||
|
|
||||||
@Value(value = "${filtered.topic.name}")
|
@Value(value = "${filtered.topic.name}")
|
||||||
private String filteredTopicName;
|
private String filteredTopicName;
|
||||||
|
@ -42,7 +42,7 @@ public class KafkaTopicConfig {
|
||||||
|
|
||||||
@Bean
|
@Bean
|
||||||
public NewTopic topic2() {
|
public NewTopic topic2() {
|
||||||
return new NewTopic(partionedTopicName, 6, (short) 1);
|
return new NewTopic(partitionedTopicName, 6, (short) 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Bean
|
@Bean
|
||||||
|
|
Loading…
Reference in New Issue