Merge pull request #5432 from smmansoor/bael-2040
[BAEL-2040] Introduction to apache pulsar
This commit is contained in:
commit
6e08650b9c
8
apache-pulsar/.gitignore
vendored
Executable file
8
apache-pulsar/.gitignore
vendored
Executable file
@ -0,0 +1,8 @@
|
|||||||
|
.classpath
|
||||||
|
.project
|
||||||
|
.settings
|
||||||
|
target
|
||||||
|
.idea
|
||||||
|
*.iml
|
||||||
|
.gradle/
|
||||||
|
build/
|
21
apache-pulsar/pom.xml
Normal file
21
apache-pulsar/pom.xml
Normal file
@ -0,0 +1,21 @@
|
|||||||
|
<?xml version="1.0" encoding="UTF-8"?>
|
||||||
|
<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0"
|
||||||
|
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
|
||||||
|
<modelVersion>4.0.0</modelVersion>
|
||||||
|
<groupId>com.baeldung.pulsar</groupId>
|
||||||
|
<artifactId>pulsar-java</artifactId>
|
||||||
|
<version>0.0.1</version>
|
||||||
|
|
||||||
|
<dependencies>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.pulsar</groupId>
|
||||||
|
<artifactId>pulsar-client</artifactId>
|
||||||
|
<version>2.1.1-incubating</version>
|
||||||
|
<scope>compile</scope>
|
||||||
|
</dependency>
|
||||||
|
</dependencies>
|
||||||
|
<properties>
|
||||||
|
<maven.compiler.target>1.8</maven.compiler.target>
|
||||||
|
<maven.compiler.source>1.8</maven.compiler.source>
|
||||||
|
</properties>
|
||||||
|
</project>
|
48
apache-pulsar/src/main/java/com/baeldung/ConsumerTest.java
Executable file
48
apache-pulsar/src/main/java/com/baeldung/ConsumerTest.java
Executable file
@ -0,0 +1,48 @@
|
|||||||
|
package com.baeldung;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import org.apache.pulsar.client.api.Consumer;
|
||||||
|
import org.apache.pulsar.client.api.Message;
|
||||||
|
import org.apache.pulsar.client.api.PulsarClient;
|
||||||
|
import org.apache.pulsar.client.api.SubscriptionType;
|
||||||
|
|
||||||
|
public class ConsumerTest {
|
||||||
|
|
||||||
|
private static final String SERVICE_URL = "pulsar://localhost:6650";
|
||||||
|
private static final String TOPIC_NAME = "test-topic";
|
||||||
|
private static final String SUBSCRIPTION_NAME = "test-subscription";
|
||||||
|
|
||||||
|
public static void main(String[] args) throws IOException {
|
||||||
|
// Create a Pulsar client instance. A single instance can be shared across many
|
||||||
|
// producers and consumer within the same application
|
||||||
|
PulsarClient client = PulsarClient.builder()
|
||||||
|
.serviceUrl(SERVICE_URL)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
//Configure consumer specific settings.
|
||||||
|
Consumer<byte[]> consumer = client.newConsumer()
|
||||||
|
.topic(TOPIC_NAME)
|
||||||
|
// Allow multiple consumers to attach to the same subscription
|
||||||
|
// and get messages dispatched as a queue
|
||||||
|
.subscriptionType(SubscriptionType.Shared)
|
||||||
|
.subscriptionName(SUBSCRIPTION_NAME)
|
||||||
|
.subscribe();
|
||||||
|
|
||||||
|
|
||||||
|
// Once the consumer is created, it can be used for the entire application lifecycle
|
||||||
|
System.out.println("Created consumer for the topic "+ TOPIC_NAME);
|
||||||
|
|
||||||
|
do {
|
||||||
|
// Wait until a message is available
|
||||||
|
Message<byte[]> msg = consumer.receive();
|
||||||
|
|
||||||
|
// Extract the message as a printable string and then log
|
||||||
|
String content = new String(msg.getData());
|
||||||
|
System.out.println("Received message '"+content+"' with ID "+msg.getMessageId());
|
||||||
|
|
||||||
|
// Acknowledge processing of the message so that it can be deleted
|
||||||
|
consumer.acknowledge(msg);
|
||||||
|
} while (true);
|
||||||
|
}
|
||||||
|
}
|
58
apache-pulsar/src/main/java/com/baeldung/ProducerTest.java
Executable file
58
apache-pulsar/src/main/java/com/baeldung/ProducerTest.java
Executable file
@ -0,0 +1,58 @@
|
|||||||
|
package com.baeldung;
|
||||||
|
|
||||||
|
import org.apache.pulsar.client.api.CompressionType;
|
||||||
|
import org.apache.pulsar.client.api.Message;
|
||||||
|
import org.apache.pulsar.client.api.MessageBuilder;
|
||||||
|
import org.apache.pulsar.client.api.MessageId;
|
||||||
|
import org.apache.pulsar.client.api.Producer;
|
||||||
|
import org.apache.pulsar.client.api.PulsarClient;
|
||||||
|
import org.apache.pulsar.client.api.PulsarClientException;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.stream.IntStream;
|
||||||
|
|
||||||
|
public class ProducerTest {
|
||||||
|
|
||||||
|
private static final String SERVICE_URL = "pulsar://localhost:6650";
|
||||||
|
private static final String TOPIC_NAME = "test-topic";
|
||||||
|
|
||||||
|
public static void main(String[] args) throws IOException {
|
||||||
|
// Create a Pulsar client instance. A single instance can be shared across many
|
||||||
|
// producers and consumer within the same application
|
||||||
|
PulsarClient client = PulsarClient.builder()
|
||||||
|
.serviceUrl(SERVICE_URL)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
// Configure producer specific settings
|
||||||
|
Producer<byte[]> producer = client.newProducer()
|
||||||
|
// Set the topic
|
||||||
|
.topic(TOPIC_NAME)
|
||||||
|
// Enable compression
|
||||||
|
.compressionType(CompressionType.LZ4)
|
||||||
|
.create();
|
||||||
|
|
||||||
|
// Once the producer is created, it can be used for the entire application life-cycle
|
||||||
|
System.out.println("Created producer for the topic "+TOPIC_NAME);
|
||||||
|
|
||||||
|
// Send 5 test messages
|
||||||
|
IntStream.range(1, 5).forEach(i -> {
|
||||||
|
String content = String.format("hi-pulsar-%d", i);
|
||||||
|
|
||||||
|
// Build a message object
|
||||||
|
Message<byte[]> msg = MessageBuilder.create()
|
||||||
|
.setContent(content.getBytes())
|
||||||
|
.build();
|
||||||
|
|
||||||
|
// Send each message and log message content and ID when successfully received
|
||||||
|
try {
|
||||||
|
MessageId msgId = producer.send(msg);
|
||||||
|
|
||||||
|
System.out.println("Published message '"+content+"' with the ID "+msgId);
|
||||||
|
} catch (PulsarClientException e) {
|
||||||
|
System.out.println(e.getMessage());
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
client.close();
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,59 @@
|
|||||||
|
package com.baeldung.subscriptions;
|
||||||
|
|
||||||
|
import org.apache.pulsar.client.api.ConsumerBuilder;
|
||||||
|
import org.apache.pulsar.client.api.Message;
|
||||||
|
import org.apache.pulsar.client.api.MessageBuilder;
|
||||||
|
import org.apache.pulsar.client.api.Producer;
|
||||||
|
import org.apache.pulsar.client.api.PulsarClient;
|
||||||
|
import org.apache.pulsar.client.api.PulsarClientException;
|
||||||
|
import org.apache.pulsar.client.api.SubscriptionType;
|
||||||
|
|
||||||
|
import java.util.stream.IntStream;
|
||||||
|
|
||||||
|
public class ExclusiveSubscriptionTest {
|
||||||
|
private static final String SERVICE_URL = "pulsar://localhost:6650";
|
||||||
|
private static final String TOPIC_NAME = "test-topic";
|
||||||
|
private static final String SUBSCRIPTION_NAME = "test-subscription";
|
||||||
|
private static final SubscriptionType SUBSCRIPTION_TYPE = SubscriptionType.Exclusive;
|
||||||
|
|
||||||
|
public static void main(String[] args) throws PulsarClientException {
|
||||||
|
PulsarClient client = PulsarClient.builder()
|
||||||
|
.serviceUrl(SERVICE_URL)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
Producer<byte[]> producer = client.newProducer()
|
||||||
|
.topic(TOPIC_NAME)
|
||||||
|
.create();
|
||||||
|
|
||||||
|
ConsumerBuilder<byte[]> consumer1 = client.newConsumer()
|
||||||
|
.topic(TOPIC_NAME)
|
||||||
|
.subscriptionName(SUBSCRIPTION_NAME)
|
||||||
|
.subscriptionType(SUBSCRIPTION_TYPE);
|
||||||
|
|
||||||
|
ConsumerBuilder<byte[]> consumer2 = client.newConsumer()
|
||||||
|
.topic(TOPIC_NAME)
|
||||||
|
.subscriptionName(SUBSCRIPTION_NAME)
|
||||||
|
.subscriptionType(SUBSCRIPTION_TYPE);
|
||||||
|
|
||||||
|
IntStream.range(0, 999).forEach(i -> {
|
||||||
|
Message<byte[]> msg = MessageBuilder.create()
|
||||||
|
.setContent(String.format("message-%d", i).getBytes())
|
||||||
|
.build();
|
||||||
|
try {
|
||||||
|
producer.send(msg);
|
||||||
|
} catch (PulsarClientException e) {
|
||||||
|
System.out.println(e.getMessage());
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// Consumer 1 can subscribe to the topic
|
||||||
|
consumer1.subscribe();
|
||||||
|
|
||||||
|
// Consumer 2 cannot due to the exclusive subscription held by consumer 1
|
||||||
|
consumer2.subscribeAsync()
|
||||||
|
.handle((consumer, exception) -> {
|
||||||
|
System.out.println(exception.getMessage());
|
||||||
|
return null;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,76 @@
|
|||||||
|
package com.baeldung.subscriptions;
|
||||||
|
|
||||||
|
import org.apache.pulsar.client.api.Consumer;
|
||||||
|
import org.apache.pulsar.client.api.ConsumerBuilder;
|
||||||
|
import org.apache.pulsar.client.api.Message;
|
||||||
|
import org.apache.pulsar.client.api.MessageBuilder;
|
||||||
|
import org.apache.pulsar.client.api.Producer;
|
||||||
|
import org.apache.pulsar.client.api.PulsarClient;
|
||||||
|
import org.apache.pulsar.client.api.PulsarClientException;
|
||||||
|
import org.apache.pulsar.client.api.SubscriptionType;
|
||||||
|
|
||||||
|
import java.util.stream.IntStream;
|
||||||
|
|
||||||
|
public class FailoverSubscriptionTest {
|
||||||
|
private static final String SERVICE_URL = "pulsar://localhost:6650";
|
||||||
|
private static final String TOPIC_NAME = "failover-subscription-test-topic";
|
||||||
|
private static final String SUBSCRIPTION_NAME = "test-subscription";
|
||||||
|
private static final SubscriptionType SUBSCRIPTION_TYPE = SubscriptionType.Failover;
|
||||||
|
private static final int NUM_MSGS = 10;
|
||||||
|
|
||||||
|
public static void main(String[] args) throws PulsarClientException {
|
||||||
|
PulsarClient client = PulsarClient.builder()
|
||||||
|
.serviceUrl(SERVICE_URL)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
Producer<byte[]> producer = client.newProducer()
|
||||||
|
.topic(TOPIC_NAME)
|
||||||
|
.create();
|
||||||
|
|
||||||
|
ConsumerBuilder<byte[]> consumerBuilder = client.newConsumer()
|
||||||
|
.topic(TOPIC_NAME)
|
||||||
|
.subscriptionName(SUBSCRIPTION_NAME)
|
||||||
|
.subscriptionType(SUBSCRIPTION_TYPE);
|
||||||
|
|
||||||
|
Consumer<byte[]> mainConsumer = consumerBuilder
|
||||||
|
.consumerName("consumer-a")
|
||||||
|
.messageListener((consumer, msg) -> {
|
||||||
|
System.out.println("Message received by main consumer");
|
||||||
|
|
||||||
|
try {
|
||||||
|
consumer.acknowledge(msg);
|
||||||
|
} catch (PulsarClientException e) {
|
||||||
|
System.out.println(e.getMessage());
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.subscribe();
|
||||||
|
|
||||||
|
Consumer<byte[]> failoverConsumer = consumerBuilder
|
||||||
|
.consumerName("consumer-b")
|
||||||
|
.messageListener((consumer, msg) -> {
|
||||||
|
System.out.println("Message received by failover consumer");
|
||||||
|
|
||||||
|
try {
|
||||||
|
consumer.acknowledge(msg);
|
||||||
|
} catch (PulsarClientException e) {
|
||||||
|
System.out.println(e.getMessage());
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.subscribe();
|
||||||
|
|
||||||
|
IntStream.range(0, NUM_MSGS).forEach(i -> {
|
||||||
|
Message<byte[]> msg = MessageBuilder.create()
|
||||||
|
.setContent(String.format("message-%d", i).getBytes())
|
||||||
|
.build();
|
||||||
|
try {
|
||||||
|
producer.send(msg);
|
||||||
|
|
||||||
|
Thread.sleep(100);
|
||||||
|
|
||||||
|
if (i > 5) mainConsumer.close();
|
||||||
|
} catch (InterruptedException | PulsarClientException e) {
|
||||||
|
System.out.println(e.getMessage());
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
Loading…
x
Reference in New Issue
Block a user