JAVA-1848: Moved apache-pulsar to apache-libraries
This commit is contained in:
parent
d06218cc14
commit
17efcce194
|
@ -1,8 +0,0 @@
|
||||||
.classpath
|
|
||||||
.project
|
|
||||||
.settings
|
|
||||||
target
|
|
||||||
.idea
|
|
||||||
*.iml
|
|
||||||
.gradle/
|
|
||||||
build/
|
|
|
@ -1,7 +0,0 @@
|
||||||
## Apache Pulsar
|
|
||||||
|
|
||||||
This module contains articles about Apache Pulsar
|
|
||||||
|
|
||||||
### Relevant Articles:
|
|
||||||
|
|
||||||
- [Introduction to Apache Pulsar](https://www.baeldung.com/apache-pulsar)
|
|
|
@ -1,32 +0,0 @@
|
||||||
<?xml version="1.0" encoding="UTF-8"?>
|
|
||||||
<project
|
|
||||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
|
|
||||||
xmlns="http://maven.apache.org/POM/4.0.0"
|
|
||||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
|
|
||||||
<modelVersion>4.0.0</modelVersion>
|
|
||||||
<groupId>com.baeldung.pulsar</groupId>
|
|
||||||
<artifactId>apache-pulsar</artifactId>
|
|
||||||
<version>0.0.1</version>
|
|
||||||
<name>apache-pulsar</name>
|
|
||||||
|
|
||||||
<parent>
|
|
||||||
<groupId>com.baeldung</groupId>
|
|
||||||
<artifactId>parent-modules</artifactId>
|
|
||||||
<version>1.0.0-SNAPSHOT</version>
|
|
||||||
<relativePath>..</relativePath>
|
|
||||||
</parent>
|
|
||||||
|
|
||||||
<dependencies>
|
|
||||||
<dependency>
|
|
||||||
<groupId>org.apache.pulsar</groupId>
|
|
||||||
<artifactId>pulsar-client</artifactId>
|
|
||||||
<version>${pulsar-client.version}</version>
|
|
||||||
<scope>compile</scope>
|
|
||||||
</dependency>
|
|
||||||
</dependencies>
|
|
||||||
|
|
||||||
<properties>
|
|
||||||
<pulsar-client.version>2.1.1-incubating</pulsar-client.version>
|
|
||||||
</properties>
|
|
||||||
|
|
||||||
</project>
|
|
|
@ -1,48 +0,0 @@
|
||||||
package com.baeldung;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
|
|
||||||
import org.apache.pulsar.client.api.Consumer;
|
|
||||||
import org.apache.pulsar.client.api.Message;
|
|
||||||
import org.apache.pulsar.client.api.PulsarClient;
|
|
||||||
import org.apache.pulsar.client.api.SubscriptionType;
|
|
||||||
|
|
||||||
public class ConsumerUnitTest {
|
|
||||||
|
|
||||||
private static final String SERVICE_URL = "pulsar://localhost:6650";
|
|
||||||
private static final String TOPIC_NAME = "test-topic";
|
|
||||||
private static final String SUBSCRIPTION_NAME = "test-subscription";
|
|
||||||
|
|
||||||
public static void main(String[] args) throws IOException {
|
|
||||||
// Create a Pulsar client instance. A single instance can be shared across many
|
|
||||||
// producers and consumer within the same application
|
|
||||||
PulsarClient client = PulsarClient.builder()
|
|
||||||
.serviceUrl(SERVICE_URL)
|
|
||||||
.build();
|
|
||||||
|
|
||||||
//Configure consumer specific settings.
|
|
||||||
Consumer<byte[]> consumer = client.newConsumer()
|
|
||||||
.topic(TOPIC_NAME)
|
|
||||||
// Allow multiple consumers to attach to the same subscription
|
|
||||||
// and get messages dispatched as a queue
|
|
||||||
.subscriptionType(SubscriptionType.Shared)
|
|
||||||
.subscriptionName(SUBSCRIPTION_NAME)
|
|
||||||
.subscribe();
|
|
||||||
|
|
||||||
|
|
||||||
// Once the consumer is created, it can be used for the entire application lifecycle
|
|
||||||
System.out.println("Created consumer for the topic "+ TOPIC_NAME);
|
|
||||||
|
|
||||||
do {
|
|
||||||
// Wait until a message is available
|
|
||||||
Message<byte[]> msg = consumer.receive();
|
|
||||||
|
|
||||||
// Extract the message as a printable string and then log
|
|
||||||
String content = new String(msg.getData());
|
|
||||||
System.out.println("Received message '"+content+"' with ID "+msg.getMessageId());
|
|
||||||
|
|
||||||
// Acknowledge processing of the message so that it can be deleted
|
|
||||||
consumer.acknowledge(msg);
|
|
||||||
} while (true);
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,58 +0,0 @@
|
||||||
package com.baeldung;
|
|
||||||
|
|
||||||
import org.apache.pulsar.client.api.CompressionType;
|
|
||||||
import org.apache.pulsar.client.api.Message;
|
|
||||||
import org.apache.pulsar.client.api.MessageBuilder;
|
|
||||||
import org.apache.pulsar.client.api.MessageId;
|
|
||||||
import org.apache.pulsar.client.api.Producer;
|
|
||||||
import org.apache.pulsar.client.api.PulsarClient;
|
|
||||||
import org.apache.pulsar.client.api.PulsarClientException;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.util.stream.IntStream;
|
|
||||||
|
|
||||||
public class ProducerUnitTest {
|
|
||||||
|
|
||||||
private static final String SERVICE_URL = "pulsar://localhost:6650";
|
|
||||||
private static final String TOPIC_NAME = "test-topic";
|
|
||||||
|
|
||||||
public static void main(String[] args) throws IOException {
|
|
||||||
// Create a Pulsar client instance. A single instance can be shared across many
|
|
||||||
// producers and consumer within the same application
|
|
||||||
PulsarClient client = PulsarClient.builder()
|
|
||||||
.serviceUrl(SERVICE_URL)
|
|
||||||
.build();
|
|
||||||
|
|
||||||
// Configure producer specific settings
|
|
||||||
Producer<byte[]> producer = client.newProducer()
|
|
||||||
// Set the topic
|
|
||||||
.topic(TOPIC_NAME)
|
|
||||||
// Enable compression
|
|
||||||
.compressionType(CompressionType.LZ4)
|
|
||||||
.create();
|
|
||||||
|
|
||||||
// Once the producer is created, it can be used for the entire application life-cycle
|
|
||||||
System.out.println("Created producer for the topic "+TOPIC_NAME);
|
|
||||||
|
|
||||||
// Send 5 test messages
|
|
||||||
IntStream.range(1, 5).forEach(i -> {
|
|
||||||
String content = String.format("hi-pulsar-%d", i);
|
|
||||||
|
|
||||||
// Build a message object
|
|
||||||
Message<byte[]> msg = MessageBuilder.create()
|
|
||||||
.setContent(content.getBytes())
|
|
||||||
.build();
|
|
||||||
|
|
||||||
// Send each message and log message content and ID when successfully received
|
|
||||||
try {
|
|
||||||
MessageId msgId = producer.send(msg);
|
|
||||||
|
|
||||||
System.out.println("Published message '"+content+"' with the ID "+msgId);
|
|
||||||
} catch (PulsarClientException e) {
|
|
||||||
System.out.println(e.getMessage());
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
client.close();
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,59 +0,0 @@
|
||||||
package com.baeldung.subscriptions;
|
|
||||||
|
|
||||||
import org.apache.pulsar.client.api.ConsumerBuilder;
|
|
||||||
import org.apache.pulsar.client.api.Message;
|
|
||||||
import org.apache.pulsar.client.api.MessageBuilder;
|
|
||||||
import org.apache.pulsar.client.api.Producer;
|
|
||||||
import org.apache.pulsar.client.api.PulsarClient;
|
|
||||||
import org.apache.pulsar.client.api.PulsarClientException;
|
|
||||||
import org.apache.pulsar.client.api.SubscriptionType;
|
|
||||||
|
|
||||||
import java.util.stream.IntStream;
|
|
||||||
|
|
||||||
public class ExclusiveSubscriptionUnitTest {
|
|
||||||
private static final String SERVICE_URL = "pulsar://localhost:6650";
|
|
||||||
private static final String TOPIC_NAME = "test-topic";
|
|
||||||
private static final String SUBSCRIPTION_NAME = "test-subscription";
|
|
||||||
private static final SubscriptionType SUBSCRIPTION_TYPE = SubscriptionType.Exclusive;
|
|
||||||
|
|
||||||
public static void main(String[] args) throws PulsarClientException {
|
|
||||||
PulsarClient client = PulsarClient.builder()
|
|
||||||
.serviceUrl(SERVICE_URL)
|
|
||||||
.build();
|
|
||||||
|
|
||||||
Producer<byte[]> producer = client.newProducer()
|
|
||||||
.topic(TOPIC_NAME)
|
|
||||||
.create();
|
|
||||||
|
|
||||||
ConsumerBuilder<byte[]> consumer1 = client.newConsumer()
|
|
||||||
.topic(TOPIC_NAME)
|
|
||||||
.subscriptionName(SUBSCRIPTION_NAME)
|
|
||||||
.subscriptionType(SUBSCRIPTION_TYPE);
|
|
||||||
|
|
||||||
ConsumerBuilder<byte[]> consumer2 = client.newConsumer()
|
|
||||||
.topic(TOPIC_NAME)
|
|
||||||
.subscriptionName(SUBSCRIPTION_NAME)
|
|
||||||
.subscriptionType(SUBSCRIPTION_TYPE);
|
|
||||||
|
|
||||||
IntStream.range(0, 999).forEach(i -> {
|
|
||||||
Message<byte[]> msg = MessageBuilder.create()
|
|
||||||
.setContent(String.format("message-%d", i).getBytes())
|
|
||||||
.build();
|
|
||||||
try {
|
|
||||||
producer.send(msg);
|
|
||||||
} catch (PulsarClientException e) {
|
|
||||||
System.out.println(e.getMessage());
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
// Consumer 1 can subscribe to the topic
|
|
||||||
consumer1.subscribe();
|
|
||||||
|
|
||||||
// Consumer 2 cannot due to the exclusive subscription held by consumer 1
|
|
||||||
consumer2.subscribeAsync()
|
|
||||||
.handle((consumer, exception) -> {
|
|
||||||
System.out.println(exception.getMessage());
|
|
||||||
return null;
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,76 +0,0 @@
|
||||||
package com.baeldung.subscriptions;
|
|
||||||
|
|
||||||
import org.apache.pulsar.client.api.Consumer;
|
|
||||||
import org.apache.pulsar.client.api.ConsumerBuilder;
|
|
||||||
import org.apache.pulsar.client.api.Message;
|
|
||||||
import org.apache.pulsar.client.api.MessageBuilder;
|
|
||||||
import org.apache.pulsar.client.api.Producer;
|
|
||||||
import org.apache.pulsar.client.api.PulsarClient;
|
|
||||||
import org.apache.pulsar.client.api.PulsarClientException;
|
|
||||||
import org.apache.pulsar.client.api.SubscriptionType;
|
|
||||||
|
|
||||||
import java.util.stream.IntStream;
|
|
||||||
|
|
||||||
public class FailoverSubscriptionUnitTest {
|
|
||||||
private static final String SERVICE_URL = "pulsar://localhost:6650";
|
|
||||||
private static final String TOPIC_NAME = "failover-subscription-test-topic";
|
|
||||||
private static final String SUBSCRIPTION_NAME = "test-subscription";
|
|
||||||
private static final SubscriptionType SUBSCRIPTION_TYPE = SubscriptionType.Failover;
|
|
||||||
private static final int NUM_MSGS = 10;
|
|
||||||
|
|
||||||
public static void main(String[] args) throws PulsarClientException {
|
|
||||||
PulsarClient client = PulsarClient.builder()
|
|
||||||
.serviceUrl(SERVICE_URL)
|
|
||||||
.build();
|
|
||||||
|
|
||||||
Producer<byte[]> producer = client.newProducer()
|
|
||||||
.topic(TOPIC_NAME)
|
|
||||||
.create();
|
|
||||||
|
|
||||||
ConsumerBuilder<byte[]> consumerBuilder = client.newConsumer()
|
|
||||||
.topic(TOPIC_NAME)
|
|
||||||
.subscriptionName(SUBSCRIPTION_NAME)
|
|
||||||
.subscriptionType(SUBSCRIPTION_TYPE);
|
|
||||||
|
|
||||||
Consumer<byte[]> mainConsumer = consumerBuilder
|
|
||||||
.consumerName("consumer-a")
|
|
||||||
.messageListener((consumer, msg) -> {
|
|
||||||
System.out.println("Message received by main consumer");
|
|
||||||
|
|
||||||
try {
|
|
||||||
consumer.acknowledge(msg);
|
|
||||||
} catch (PulsarClientException e) {
|
|
||||||
System.out.println(e.getMessage());
|
|
||||||
}
|
|
||||||
})
|
|
||||||
.subscribe();
|
|
||||||
|
|
||||||
Consumer<byte[]> failoverConsumer = consumerBuilder
|
|
||||||
.consumerName("consumer-b")
|
|
||||||
.messageListener((consumer, msg) -> {
|
|
||||||
System.out.println("Message received by failover consumer");
|
|
||||||
|
|
||||||
try {
|
|
||||||
consumer.acknowledge(msg);
|
|
||||||
} catch (PulsarClientException e) {
|
|
||||||
System.out.println(e.getMessage());
|
|
||||||
}
|
|
||||||
})
|
|
||||||
.subscribe();
|
|
||||||
|
|
||||||
IntStream.range(0, NUM_MSGS).forEach(i -> {
|
|
||||||
Message<byte[]> msg = MessageBuilder.create()
|
|
||||||
.setContent(String.format("message-%d", i).getBytes())
|
|
||||||
.build();
|
|
||||||
try {
|
|
||||||
producer.send(msg);
|
|
||||||
|
|
||||||
Thread.sleep(100);
|
|
||||||
|
|
||||||
if (i > 5) mainConsumer.close();
|
|
||||||
} catch (InterruptedException | PulsarClientException e) {
|
|
||||||
System.out.println(e.getMessage());
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
|
Loading…
Reference in New Issue