BAEL-3185: Fixed formatting
This commit is contained in:
parent
16b8a61b16
commit
002b937923
|
@ -25,7 +25,7 @@ public class KafkaApplication {
|
|||
public static void main(String[] args) throws Exception {
|
||||
|
||||
ConfigurableApplicationContext context = SpringApplication.run(KafkaApplication.class, args);
|
||||
|
||||
|
||||
MessageProducer producer = context.getBean(MessageProducer.class);
|
||||
MessageListener listener = context.getBean(MessageListener.class);
|
||||
/*
|
||||
|
@ -101,15 +101,17 @@ public class KafkaApplication {
|
|||
private String greetingTopicName;
|
||||
|
||||
public void sendMessage(String message) {
|
||||
|
||||
|
||||
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topicName, message);
|
||||
|
||||
|
||||
future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
|
||||
|
||||
@Override
|
||||
public void onSuccess(SendResult<String, String> result) {
|
||||
System.out.println("Sent message=[" + message + "] with offset=[" + result.getRecordMetadata().offset() + "]");
|
||||
System.out.println("Sent message=[" + message + "] with offset=[" + result.getRecordMetadata()
|
||||
.offset() + "]");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Throwable ex) {
|
||||
System.out.println("Unable to send message=[" + message + "] due to : " + ex.getMessage());
|
||||
|
|
|
@ -29,7 +29,7 @@ public class KafkaConsumerConfig {
|
|||
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
|
||||
return new DefaultKafkaConsumerFactory<>(props);
|
||||
}
|
||||
|
||||
|
||||
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(String groupId) {
|
||||
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
|
||||
factory.setConsumerFactory(consumerFactory(groupId));
|
||||
|
@ -50,12 +50,12 @@ public class KafkaConsumerConfig {
|
|||
public ConcurrentKafkaListenerContainerFactory<String, String> headersKafkaListenerContainerFactory() {
|
||||
return kafkaListenerContainerFactory("headers");
|
||||
}
|
||||
|
||||
|
||||
@Bean
|
||||
public ConcurrentKafkaListenerContainerFactory<String, String> partitionsKafkaListenerContainerFactory() {
|
||||
return kafkaListenerContainerFactory("partitions");
|
||||
}
|
||||
|
||||
|
||||
@Bean
|
||||
public ConcurrentKafkaListenerContainerFactory<String, String> filterKafkaListenerContainerFactory() {
|
||||
ConcurrentKafkaListenerContainerFactory<String, String> factory = kafkaListenerContainerFactory("filter");
|
||||
|
|
|
@ -32,7 +32,7 @@ public class KafkaProducerConfig {
|
|||
public KafkaTemplate<String, String> kafkaTemplate() {
|
||||
return new KafkaTemplate<>(producerFactory());
|
||||
}
|
||||
|
||||
|
||||
@Bean
|
||||
public ProducerFactory<String, Greeting> greetingProducerFactory() {
|
||||
Map<String, Object> configProps = new HashMap<>();
|
||||
|
@ -41,10 +41,10 @@ public class KafkaProducerConfig {
|
|||
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
|
||||
return new DefaultKafkaProducerFactory<>(configProps);
|
||||
}
|
||||
|
||||
|
||||
@Bean
|
||||
public KafkaTemplate<String, Greeting> greetingKafkaTemplate() {
|
||||
return new KafkaTemplate<>(greetingProducerFactory());
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
|
|
@ -12,10 +12,10 @@ import org.springframework.kafka.core.KafkaAdmin;
|
|||
|
||||
@Configuration
|
||||
public class KafkaTopicConfig {
|
||||
|
||||
|
||||
@Value(value = "${kafka.bootstrapAddress}")
|
||||
private String bootstrapAddress;
|
||||
|
||||
|
||||
@Value(value = "${message.topic.name}")
|
||||
private String topicName;
|
||||
|
||||
|
@ -27,31 +27,31 @@ public class KafkaTopicConfig {
|
|||
|
||||
@Value(value = "${greeting.topic.name}")
|
||||
private String greetingTopicName;
|
||||
|
||||
|
||||
@Bean
|
||||
public KafkaAdmin kafkaAdmin() {
|
||||
Map<String, Object> configs = new HashMap<>();
|
||||
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
|
||||
return new KafkaAdmin(configs);
|
||||
}
|
||||
|
||||
|
||||
@Bean
|
||||
public NewTopic topic1() {
|
||||
return new NewTopic(topicName, 1, (short) 1);
|
||||
return new NewTopic(topicName, 1, (short) 1);
|
||||
}
|
||||
|
||||
|
||||
@Bean
|
||||
public NewTopic topic2() {
|
||||
return new NewTopic(partionedTopicName, 6, (short) 1);
|
||||
return new NewTopic(partionedTopicName, 6, (short) 1);
|
||||
}
|
||||
|
||||
|
||||
@Bean
|
||||
public NewTopic topic3() {
|
||||
return new NewTopic(filteredTopicName, 1, (short) 1);
|
||||
return new NewTopic(filteredTopicName, 1, (short) 1);
|
||||
}
|
||||
|
||||
|
||||
@Bean
|
||||
public NewTopic topic4() {
|
||||
return new NewTopic(greetingTopicName, 1, (short) 1);
|
||||
return new NewTopic(greetingTopicName, 1, (short) 1);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue