BAEL-7374 - Incorporated Review comments for PR

This commit is contained in:
Amol Gote 2024-02-17 09:15:50 -05:00
parent 36f3e32ff6
commit ec88f94427
7 changed files with 15 additions and 20 deletions

View File

@ -1,8 +1,8 @@
package com.baeldung.spring.kafka.start.stop.consumer; package com.baeldung.spring.kafka.start.stop.consumer;
public class Constants { public class Constants {
public static final String MULTI_PARTITION_TOPIC = "multi_partition_topic"; public static final String MULTI_PARTITION_TOPIC = "multi_partition_topic";
public static final int MULTIPLE_PARTITIONS = 5;
public static final short REPLICATION_FACTOR = 1;
public static final String LISTENER_ID = "listener-id-1"; public static final String LISTENER_ID = "listener-id-1";
} }

View File

@ -11,7 +11,6 @@ public class KafkaListenerControlService {
@Autowired @Autowired
private KafkaListenerEndpointRegistry registry; private KafkaListenerEndpointRegistry registry;
// Method to start a listener
public void startListener(String listenerId) { public void startListener(String listenerId) {
MessageListenerContainer listenerContainer = registry.getListenerContainer(listenerId); MessageListenerContainer listenerContainer = registry.getListenerContainer(listenerId);
if (listenerContainer != null && !listenerContainer.isRunning()) { if (listenerContainer != null && !listenerContainer.isRunning()) {
@ -19,7 +18,6 @@ public class KafkaListenerControlService {
} }
} }
// Method to stop a listener
public void stopListener(String listenerId) { public void stopListener(String listenerId) {
MessageListenerContainer listenerContainer = registry.getListenerContainer(listenerId); MessageListenerContainer listenerContainer = registry.getListenerContainer(listenerId);
if (listenerContainer != null && listenerContainer.isRunning()) { if (listenerContainer != null && listenerContainer.isRunning()) {

View File

@ -1,4 +1,5 @@
package com.baeldung.spring.kafka.start.stop.consumer; package com.baeldung.spring.kafka.start.stop.consumer;
import org.springframework.boot.SpringApplication; import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.SpringBootApplication;

View File

@ -1,10 +1,12 @@
package com.baeldung.spring.kafka.start.stop.consumer; package com.baeldung.spring.kafka.start.stop.consumer;
public class UserEvent { public class UserEvent {
private String userEventId;
private long eventNanoTime;
public UserEvent(){} private String userEventId;
public UserEvent() {
}
public UserEvent(String userEventId) { public UserEvent(String userEventId) {
this.userEventId = userEventId; this.userEventId = userEventId;
@ -18,11 +20,4 @@ public class UserEvent {
this.userEventId = userEventId; this.userEventId = userEventId;
} }
public long getEventNanoTime() {
return eventNanoTime;
}
public void setEventNanoTime(long eventNanoTime) {
this.eventNanoTime = eventNanoTime;
}
} }

View File

@ -1,4 +1,5 @@
package com.baeldung.spring.kafka.start.stop.consumer; package com.baeldung.spring.kafka.start.stop.consumer;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
@ -7,6 +8,7 @@ import org.springframework.stereotype.Component;
@Component @Component
public class UserEventListener { public class UserEventListener {
private static final Logger logger = LoggerFactory.getLogger(UserEventListener.class); private static final Logger logger = LoggerFactory.getLogger(UserEventListener.class);
@Autowired @Autowired
@ -15,7 +17,7 @@ public class UserEventListener {
@KafkaListener(id = Constants.LISTENER_ID, topics = Constants.MULTI_PARTITION_TOPIC, groupId = "test-group", @KafkaListener(id = Constants.LISTENER_ID, topics = Constants.MULTI_PARTITION_TOPIC, groupId = "test-group",
containerFactory = "kafkaListenerContainerFactory", autoStartup = "false") containerFactory = "kafkaListenerContainerFactory", autoStartup = "false")
public void userEventListener(UserEvent userEvent) { public void userEventListener(UserEvent userEvent) {
logger.info("Received UserEvent: " + userEvent.getUserEventId() + ", Time: " + userEvent.getEventNanoTime()); logger.info("Received UserEvent: " + userEvent.getUserEventId());
userEventStore.addUserEvent(userEvent); userEventStore.addUserEvent(userEvent);
} }
} }

View File

@ -7,17 +7,18 @@ import java.util.List;
@Component @Component
public class UserEventStore { public class UserEventStore {
private final List<UserEvent> userEvents = new ArrayList<>(); private final List<UserEvent> userEvents = new ArrayList<>();
public void addUserEvent(UserEvent userEvent){ public void addUserEvent(UserEvent userEvent) {
userEvents.add(userEvent); userEvents.add(userEvent);
} }
public List<UserEvent> getUserEvents(){ public List<UserEvent> getUserEvents() {
return userEvents; return userEvents;
} }
public void clearUserEvents(){ public void clearUserEvents() {
this.userEvents.clear(); this.userEvents.clear();
} }
} }

View File

@ -83,14 +83,12 @@ public class StartStopConsumerLiveTest {
//Verification that listener has started. //Verification that listener has started.
UserEvent startUserEventTest = new UserEvent(UUID.randomUUID().toString()); UserEvent startUserEventTest = new UserEvent(UUID.randomUUID().toString());
startUserEventTest.setEventNanoTime(System.nanoTime());
producer.send(new ProducerRecord<>(Constants.MULTI_PARTITION_TOPIC, startUserEventTest)); producer.send(new ProducerRecord<>(Constants.MULTI_PARTITION_TOPIC, startUserEventTest));
await().untilAsserted(() -> assertEquals(1, this.userEventStore.getUserEvents().size())); await().untilAsserted(() -> assertEquals(1, this.userEventStore.getUserEvents().size()));
this.userEventStore.clearUserEvents(); this.userEventStore.clearUserEvents();
for (long count = 1; count <= 10; count++) { for (long count = 1; count <= 10; count++) {
UserEvent userEvent = new UserEvent(UUID.randomUUID().toString()); UserEvent userEvent = new UserEvent(UUID.randomUUID().toString());
userEvent.setEventNanoTime(System.nanoTime());
Future<RecordMetadata> future = producer.send(new ProducerRecord<>(Constants.MULTI_PARTITION_TOPIC, userEvent)); Future<RecordMetadata> future = producer.send(new ProducerRecord<>(Constants.MULTI_PARTITION_TOPIC, userEvent));
RecordMetadata metadata = future.get(); RecordMetadata metadata = future.get();
if (count == 4) { if (count == 4) {