BAEL-1426 Adding a message generator and updated Word Count Transactional App
This commit is contained in:
		
							parent
							
								
									5d7cc11745
								
							
						
					
					
						commit
						d9724fc153
					
				| @ -0,0 +1,56 @@ | ||||
| package com.baeldung.kafka; | ||||
| 
 | ||||
| import org.apache.kafka.clients.producer.KafkaProducer; | ||||
| import org.apache.kafka.clients.producer.ProducerRecord; | ||||
| import org.apache.kafka.common.KafkaException; | ||||
| 
 | ||||
| import java.util.Properties; | ||||
| import java.util.stream.Stream; | ||||
| 
 | ||||
| import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG; | ||||
| import static org.apache.kafka.clients.producer.ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG; | ||||
| import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG; | ||||
| import static org.apache.kafka.clients.producer.ProducerConfig.TRANSACTIONAL_ID_CONFIG; | ||||
| import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG; | ||||
| 
 | ||||
| public class TransactionalMessageProducer { | ||||
| 
 | ||||
|     private static final String DATA_MESSAGE_1 = "Put any space separated data here for count"; | ||||
|     private static final String DATA_MESSAGE_2 = "Output will contain count of every word in the message"; | ||||
| 
 | ||||
|     public static void main(String[] args) { | ||||
| 
 | ||||
|         KafkaProducer<String, String> producer = createKafkaProducer(); | ||||
| 
 | ||||
|         producer.initTransactions(); | ||||
| 
 | ||||
|         try{ | ||||
| 
 | ||||
|             producer.beginTransaction(); | ||||
| 
 | ||||
|             Stream.of(DATA_MESSAGE_1, DATA_MESSAGE_2).forEach(s -> producer.send( | ||||
|                     new ProducerRecord<String, String>("input", null, s))); | ||||
| 
 | ||||
|             producer.commitTransaction(); | ||||
| 
 | ||||
|         }catch (KafkaException e){ | ||||
| 
 | ||||
|             producer.abortTransaction(); | ||||
| 
 | ||||
|         } | ||||
| 
 | ||||
|     } | ||||
| 
 | ||||
|     private static KafkaProducer<String, String> createKafkaProducer() { | ||||
| 
 | ||||
|         Properties props = new Properties(); | ||||
|         props.put(BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); | ||||
|         props.put(ENABLE_IDEMPOTENCE_CONFIG, "true"); | ||||
|         props.put(TRANSACTIONAL_ID_CONFIG, "prod-0"); | ||||
|         props.put(KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); | ||||
|         props.put(VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); | ||||
| 
 | ||||
|         return new KafkaProducer(props); | ||||
| 
 | ||||
|     } | ||||
| } | ||||
| @ -14,6 +14,8 @@ import java.util.HashMap; | ||||
| import java.util.List; | ||||
| import java.util.Map; | ||||
| import java.util.Properties; | ||||
| import java.util.stream.Collectors; | ||||
| import java.util.stream.Stream; | ||||
| 
 | ||||
| import static java.time.Duration.ofSeconds; | ||||
| import static java.util.Collections.singleton; | ||||
| @ -21,16 +23,16 @@ import static org.apache.kafka.clients.consumer.ConsumerConfig.*; | ||||
| import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG; | ||||
| import static org.apache.kafka.clients.producer.ProducerConfig.*; | ||||
| 
 | ||||
| public class TransactionalApp { | ||||
| public class TransactionalWordCount { | ||||
| 
 | ||||
|     private static final String CONSUMER_GROUP_ID = "test"; | ||||
|     private static final String CONSUMER_GROUP_ID = "my-group-id"; | ||||
|     private static final String OUTPUT_TOPIC = "output"; | ||||
|     private static final String INPUT_TOPIC = "input"; | ||||
| 
 | ||||
|     public static void main(String[] args) { | ||||
| 
 | ||||
|         KafkaConsumer<String, String> consumer = initConsumer(); | ||||
|         KafkaProducer<String, String> producer = initProducer(); | ||||
|         KafkaConsumer<String, String> consumer = createKafkaConsumer(); | ||||
|         KafkaProducer<String, String> producer = createKafkaProducer(); | ||||
| 
 | ||||
|         producer.initTransactions(); | ||||
| 
 | ||||
| @ -38,12 +40,17 @@ public class TransactionalApp { | ||||
| 
 | ||||
|             while (true) { | ||||
| 
 | ||||
|                 ConsumerRecords<String, String> records = consumer.poll(ofSeconds(20)); | ||||
|                 ConsumerRecords<String, String> records = consumer.poll(ofSeconds(60)); | ||||
| 
 | ||||
|                 Map<String, Integer> wordCountMap = records.records(new TopicPartition(INPUT_TOPIC, 0)) | ||||
|                         .stream() | ||||
|                         .flatMap(record -> Stream.of(record.value().split(" "))) | ||||
|                         .map(word -> Tuple.of(word, 1)) | ||||
|                         .collect(Collectors.toMap(tuple -> tuple.getKey(), t1 -> t1.getValue(), (v1, v2) -> v1 + v2)); | ||||
| 
 | ||||
|                 producer.beginTransaction(); | ||||
| 
 | ||||
|                 for (ConsumerRecord record : records) | ||||
|                     producer.send(new ProducerRecord(OUTPUT_TOPIC, record)); | ||||
|                 wordCountMap.forEach((key, value) -> producer.send(new ProducerRecord<String, String>(OUTPUT_TOPIC, key, value.toString()))); | ||||
| 
 | ||||
|                 Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>(); | ||||
| 
 | ||||
| @ -51,7 +58,7 @@ public class TransactionalApp { | ||||
|                     List<ConsumerRecord<String, String>> partitionedRecords = records.records(partition); | ||||
|                     long offset = partitionedRecords.get(partitionedRecords.size() - 1).offset(); | ||||
| 
 | ||||
|                     offsetsToCommit.put(partition, new OffsetAndMetadata(offset)); | ||||
|                     offsetsToCommit.put(partition, new OffsetAndMetadata(offset + 1)); | ||||
|                 } | ||||
| 
 | ||||
|                 producer.sendOffsetsToTransaction(offsetsToCommit, CONSUMER_GROUP_ID); | ||||
| @ -68,11 +75,12 @@ public class TransactionalApp { | ||||
| 
 | ||||
|     } | ||||
| 
 | ||||
|     private static KafkaConsumer<String, String> initConsumer() { | ||||
|     private static KafkaConsumer<String, String> createKafkaConsumer() { | ||||
|         Properties props = new Properties(); | ||||
|         props.put(BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); | ||||
|         props.put(GROUP_ID_CONFIG, CONSUMER_GROUP_ID); | ||||
|         props.put(ENABLE_AUTO_COMMIT_CONFIG, "false"); | ||||
|         props.put(ISOLATION_LEVEL_CONFIG, "read_committed"); | ||||
|         props.put(KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); | ||||
|         props.put(VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); | ||||
| 
 | ||||
| @ -81,19 +89,14 @@ public class TransactionalApp { | ||||
|         return consumer; | ||||
|     } | ||||
| 
 | ||||
|     private static KafkaProducer<String, String> initProducer() { | ||||
|     private static KafkaProducer<String, String> createKafkaProducer() { | ||||
| 
 | ||||
|         Properties props = new Properties(); | ||||
|         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); | ||||
|         props.put(ACKS_CONFIG, "all"); | ||||
|         props.put(RETRIES_CONFIG, 3); | ||||
|         props.put(BATCH_SIZE_CONFIG, 16384); | ||||
|         props.put(LINGER_MS_CONFIG, 1); | ||||
|         props.put(BUFFER_MEMORY_CONFIG, 33554432); | ||||
|         props.put(BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); | ||||
|         props.put(ENABLE_IDEMPOTENCE_CONFIG, "true"); | ||||
|         props.put(TRANSACTIONAL_ID_CONFIG, "prod-1"); | ||||
|         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); | ||||
|         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); | ||||
|         props.put(KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); | ||||
|         props.put(VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); | ||||
| 
 | ||||
|         return new KafkaProducer(props); | ||||
| 
 | ||||
							
								
								
									
										24
									
								
								libraries/src/main/java/com/baeldung/kafka/Tuple.java
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										24
									
								
								libraries/src/main/java/com/baeldung/kafka/Tuple.java
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,24 @@ | ||||
| package com.baeldung.kafka; | ||||
| 
 | ||||
| public class Tuple { | ||||
| 
 | ||||
|     private String key; | ||||
|     private Integer value; | ||||
| 
 | ||||
|     private Tuple(String key, Integer value) { | ||||
|         this.key = key; | ||||
|         this.value = value; | ||||
|     } | ||||
| 
 | ||||
|     public static Tuple of(String key, Integer value){ | ||||
|         return new Tuple(key,value); | ||||
|     } | ||||
| 
 | ||||
|     public String getKey() { | ||||
|         return key; | ||||
|     } | ||||
| 
 | ||||
|     public Integer getValue() { | ||||
|         return value; | ||||
|     } | ||||
| } | ||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user