Spring Boot with Apache Pulsar (#14341)

* Add dead-letter configuration with code refactor

* Add String consumer

* Add customizers in message producers

* Reindent PulsarConsumer.java

* Reindent PulsarProducer.java

* Reindent pom.xml and remove readme

* Update log in PulsarConsumer

* Re-indent main class and remove test directory
This commit is contained in:
neha298 2023-08-03 00:51:15 +05:30 committed by GitHub
parent 7091862cb2
commit de5e3f519d
6 changed files with 202 additions and 0 deletions

48
spring-pulsar/pom.xml Normal file
View File

@ -0,0 +1,48 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>spring-pulsar</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>spring-pulsar</name>
<description>Intro to Apache Pulsar with Spring</description>
<parent>
<groupId>com.baeldung</groupId>
<artifactId>parent-boot-3</artifactId>
<version>0.0.1-SNAPSHOT</version>
<relativePath>../parent-boot-3</relativePath>
</parent>
<properties>
<java.version>17</java.version>
<spring-pulsar-spring-boot-starter>0.2.0</spring-pulsar-spring-boot-starter>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.pulsar</groupId>
<artifactId>spring-pulsar-spring-boot-starter</artifactId>
<version>0.2.0</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>

View File

@ -0,0 +1,59 @@
package com.baeldung.springpulsar;
import org.apache.pulsar.client.api.DeadLetterPolicy;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.schema.SchemaType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.pulsar.annotation.PulsarListener;
import org.springframework.pulsar.listener.AckMode;
import org.springframework.stereotype.Service;
@Service
public class PulsarConsumer {
private static final String STRING_TOPIC = "string-topic";
private static final String USER_TOPIC = "user-topic";
private static final String USER_DEAD_LETTER_TOPIC = "user-dead-letter-topic";
private final Logger LOGGER = LoggerFactory.getLogger(PulsarConsumer.class);
@PulsarListener(
subscriptionName = "string-topic-subscription",
topics = STRING_TOPIC,
subscriptionType = SubscriptionType.Shared
)
public void stringTopicListener(String str) {
LOGGER.info("Received String message: {}", str);
}
@Bean
DeadLetterPolicy deadLetterPolicy() {
return DeadLetterPolicy.builder()
.maxRedeliverCount(10)
.deadLetterTopic(USER_DEAD_LETTER_TOPIC)
.build();
}
@PulsarListener(
subscriptionName = "user-topic-subscription",
topics = USER_TOPIC,
subscriptionType = SubscriptionType.Shared,
schemaType = SchemaType.JSON,
ackMode = AckMode.RECORD,
deadLetterPolicy = "deadLetterPolicy",
properties = {"ackTimeout=60s"}
)
public void userTopicListener(User user) {
LOGGER.info("Received user object with email: {}", user.getEmail());
}
@PulsarListener(
subscriptionName = "dead-letter-topic-subscription",
topics = USER_DEAD_LETTER_TOPIC,
subscriptionType = SubscriptionType.Shared
)
public void userDlqTopicListener(User user) {
LOGGER.info("Received user object in user-DLQ with email: {}", user.getEmail());
}
}

View File

@ -0,0 +1,37 @@
package com.baeldung.springpulsar;
import org.apache.pulsar.client.api.ProducerAccessMode;
import org.apache.pulsar.client.api.PulsarClientException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.pulsar.core.PulsarTemplate;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;
@Component
public class PulsarProducer {
@Autowired
private PulsarTemplate<User> template;
@Autowired
private PulsarTemplate<String> stringTemplate;
private static final String USER_TOPIC = "user-topic";
private static final String USER_TOPIC_STR = "string-topic";
public void sendMessageToPulsarTopic(User user) throws PulsarClientException {
template.newMessage(user)
.withProducerCustomizer(pc -> {
pc.accessMode(ProducerAccessMode.Shared);
})
.withMessageCustomizer(mc -> {
mc.deliverAfter(10L, TimeUnit.SECONDS);
})
.withTopic(USER_TOPIC)
.send();
}
public void sendStringMessageToPulsarTopic(String str) throws PulsarClientException {
stringTemplate.send(USER_TOPIC_STR, str);
}
}

View File

@ -0,0 +1,15 @@
package com.baeldung.springpulsar;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.pulsar.annotation.EnablePulsar;
@EnablePulsar
@SpringBootApplication
public class SpringPulsarApplication {
public static void main(String[] args) {
SpringApplication.run(SpringPulsarApplication.class, args);
}
}

View File

@ -0,0 +1,31 @@
package com.baeldung.springpulsar;
public class User {
private String email;
private String firstName;
public User() {
}
public User(String email, String firstName) {
this.email = email;
this.firstName = firstName;
}
public String getEmail() {
return email;
}
public void setEmail(String email) {
this.email = email;
}
public String getFirstName() {
return firstName;
}
public void setFirstName(String firstName) {
this.firstName = firstName;
}
}

View File

@ -0,0 +1,12 @@
server:
port: 8085
spring:
pulsar:
client:
service-url: pulsar://localhost:6650
defaults:
type-mappings:
- message-type: com.baeldung.springpulsar.User
schema-info:
schema-type: JSON