diff --git a/apache-pulsar/.gitignore b/apache-pulsar/.gitignore deleted file mode 100755 index 1c53e03007..0000000000 --- a/apache-pulsar/.gitignore +++ /dev/null @@ -1,8 +0,0 @@ -.classpath -.project -.settings -target -.idea -*.iml -.gradle/ -build/ diff --git a/apache-pulsar/README.md b/apache-pulsar/README.md deleted file mode 100644 index c44849a490..0000000000 --- a/apache-pulsar/README.md +++ /dev/null @@ -1,7 +0,0 @@ -## Apache Pulsar - -This module contains articles about Apache Pulsar - -### Relevant Articles: - -- [Introduction to Apache Pulsar](https://www.baeldung.com/apache-pulsar) diff --git a/apache-pulsar/pom.xml b/apache-pulsar/pom.xml deleted file mode 100644 index 568389f9f5..0000000000 --- a/apache-pulsar/pom.xml +++ /dev/null @@ -1,32 +0,0 @@ - - - 4.0.0 - com.baeldung.pulsar - apache-pulsar - 0.0.1 - apache-pulsar - - - com.baeldung - parent-modules - 1.0.0-SNAPSHOT - .. - - - - - org.apache.pulsar - pulsar-client - ${pulsar-client.version} - compile - - - - - 2.1.1-incubating - - - diff --git a/apache-pulsar/src/main/java/com/baeldung/ConsumerUnitTest.java b/apache-pulsar/src/main/java/com/baeldung/ConsumerUnitTest.java deleted file mode 100644 index 82a0028837..0000000000 --- a/apache-pulsar/src/main/java/com/baeldung/ConsumerUnitTest.java +++ /dev/null @@ -1,48 +0,0 @@ -package com.baeldung; - -import java.io.IOException; - -import org.apache.pulsar.client.api.Consumer; -import org.apache.pulsar.client.api.Message; -import org.apache.pulsar.client.api.PulsarClient; -import org.apache.pulsar.client.api.SubscriptionType; - -public class ConsumerUnitTest { - - private static final String SERVICE_URL = "pulsar://localhost:6650"; - private static final String TOPIC_NAME = "test-topic"; - private static final String SUBSCRIPTION_NAME = "test-subscription"; - - public static void main(String[] args) throws IOException { - // Create a Pulsar client instance. A single instance can be shared across many - // producers and consumer within the same application - PulsarClient client = PulsarClient.builder() - .serviceUrl(SERVICE_URL) - .build(); - - //Configure consumer specific settings. - Consumer consumer = client.newConsumer() - .topic(TOPIC_NAME) - // Allow multiple consumers to attach to the same subscription - // and get messages dispatched as a queue - .subscriptionType(SubscriptionType.Shared) - .subscriptionName(SUBSCRIPTION_NAME) - .subscribe(); - - - // Once the consumer is created, it can be used for the entire application lifecycle - System.out.println("Created consumer for the topic "+ TOPIC_NAME); - - do { - // Wait until a message is available - Message msg = consumer.receive(); - - // Extract the message as a printable string and then log - String content = new String(msg.getData()); - System.out.println("Received message '"+content+"' with ID "+msg.getMessageId()); - - // Acknowledge processing of the message so that it can be deleted - consumer.acknowledge(msg); - } while (true); - } -} diff --git a/apache-pulsar/src/main/java/com/baeldung/ProducerUnitTest.java b/apache-pulsar/src/main/java/com/baeldung/ProducerUnitTest.java deleted file mode 100644 index 10a4b46c4d..0000000000 --- a/apache-pulsar/src/main/java/com/baeldung/ProducerUnitTest.java +++ /dev/null @@ -1,58 +0,0 @@ -package com.baeldung; - -import org.apache.pulsar.client.api.CompressionType; -import org.apache.pulsar.client.api.Message; -import org.apache.pulsar.client.api.MessageBuilder; -import org.apache.pulsar.client.api.MessageId; -import org.apache.pulsar.client.api.Producer; -import org.apache.pulsar.client.api.PulsarClient; -import org.apache.pulsar.client.api.PulsarClientException; - -import java.io.IOException; -import java.util.stream.IntStream; - -public class ProducerUnitTest { - - private static final String SERVICE_URL = "pulsar://localhost:6650"; - private static final String TOPIC_NAME = "test-topic"; - - public static void main(String[] args) throws IOException { - // Create a Pulsar client instance. A single instance can be shared across many - // producers and consumer within the same application - PulsarClient client = PulsarClient.builder() - .serviceUrl(SERVICE_URL) - .build(); - - // Configure producer specific settings - Producer producer = client.newProducer() - // Set the topic - .topic(TOPIC_NAME) - // Enable compression - .compressionType(CompressionType.LZ4) - .create(); - - // Once the producer is created, it can be used for the entire application life-cycle - System.out.println("Created producer for the topic "+TOPIC_NAME); - - // Send 5 test messages - IntStream.range(1, 5).forEach(i -> { - String content = String.format("hi-pulsar-%d", i); - - // Build a message object - Message msg = MessageBuilder.create() - .setContent(content.getBytes()) - .build(); - - // Send each message and log message content and ID when successfully received - try { - MessageId msgId = producer.send(msg); - - System.out.println("Published message '"+content+"' with the ID "+msgId); - } catch (PulsarClientException e) { - System.out.println(e.getMessage()); - } - }); - - client.close(); - } -} diff --git a/apache-pulsar/src/main/java/com/baeldung/subscriptions/ExclusiveSubscriptionUnitTest.java b/apache-pulsar/src/main/java/com/baeldung/subscriptions/ExclusiveSubscriptionUnitTest.java deleted file mode 100644 index 79121347e7..0000000000 --- a/apache-pulsar/src/main/java/com/baeldung/subscriptions/ExclusiveSubscriptionUnitTest.java +++ /dev/null @@ -1,59 +0,0 @@ -package com.baeldung.subscriptions; - -import org.apache.pulsar.client.api.ConsumerBuilder; -import org.apache.pulsar.client.api.Message; -import org.apache.pulsar.client.api.MessageBuilder; -import org.apache.pulsar.client.api.Producer; -import org.apache.pulsar.client.api.PulsarClient; -import org.apache.pulsar.client.api.PulsarClientException; -import org.apache.pulsar.client.api.SubscriptionType; - -import java.util.stream.IntStream; - -public class ExclusiveSubscriptionUnitTest { - private static final String SERVICE_URL = "pulsar://localhost:6650"; - private static final String TOPIC_NAME = "test-topic"; - private static final String SUBSCRIPTION_NAME = "test-subscription"; - private static final SubscriptionType SUBSCRIPTION_TYPE = SubscriptionType.Exclusive; - - public static void main(String[] args) throws PulsarClientException { - PulsarClient client = PulsarClient.builder() - .serviceUrl(SERVICE_URL) - .build(); - - Producer producer = client.newProducer() - .topic(TOPIC_NAME) - .create(); - - ConsumerBuilder consumer1 = client.newConsumer() - .topic(TOPIC_NAME) - .subscriptionName(SUBSCRIPTION_NAME) - .subscriptionType(SUBSCRIPTION_TYPE); - - ConsumerBuilder consumer2 = client.newConsumer() - .topic(TOPIC_NAME) - .subscriptionName(SUBSCRIPTION_NAME) - .subscriptionType(SUBSCRIPTION_TYPE); - - IntStream.range(0, 999).forEach(i -> { - Message msg = MessageBuilder.create() - .setContent(String.format("message-%d", i).getBytes()) - .build(); - try { - producer.send(msg); - } catch (PulsarClientException e) { - System.out.println(e.getMessage()); - } - }); - - // Consumer 1 can subscribe to the topic - consumer1.subscribe(); - - // Consumer 2 cannot due to the exclusive subscription held by consumer 1 - consumer2.subscribeAsync() - .handle((consumer, exception) -> { - System.out.println(exception.getMessage()); - return null; - }); - } -} diff --git a/apache-pulsar/src/main/java/com/baeldung/subscriptions/FailoverSubscriptionUnitTest.java b/apache-pulsar/src/main/java/com/baeldung/subscriptions/FailoverSubscriptionUnitTest.java deleted file mode 100644 index 1d13b4b83a..0000000000 --- a/apache-pulsar/src/main/java/com/baeldung/subscriptions/FailoverSubscriptionUnitTest.java +++ /dev/null @@ -1,76 +0,0 @@ -package com.baeldung.subscriptions; - -import org.apache.pulsar.client.api.Consumer; -import org.apache.pulsar.client.api.ConsumerBuilder; -import org.apache.pulsar.client.api.Message; -import org.apache.pulsar.client.api.MessageBuilder; -import org.apache.pulsar.client.api.Producer; -import org.apache.pulsar.client.api.PulsarClient; -import org.apache.pulsar.client.api.PulsarClientException; -import org.apache.pulsar.client.api.SubscriptionType; - -import java.util.stream.IntStream; - -public class FailoverSubscriptionUnitTest { - private static final String SERVICE_URL = "pulsar://localhost:6650"; - private static final String TOPIC_NAME = "failover-subscription-test-topic"; - private static final String SUBSCRIPTION_NAME = "test-subscription"; - private static final SubscriptionType SUBSCRIPTION_TYPE = SubscriptionType.Failover; - private static final int NUM_MSGS = 10; - - public static void main(String[] args) throws PulsarClientException { - PulsarClient client = PulsarClient.builder() - .serviceUrl(SERVICE_URL) - .build(); - - Producer producer = client.newProducer() - .topic(TOPIC_NAME) - .create(); - - ConsumerBuilder consumerBuilder = client.newConsumer() - .topic(TOPIC_NAME) - .subscriptionName(SUBSCRIPTION_NAME) - .subscriptionType(SUBSCRIPTION_TYPE); - - Consumer mainConsumer = consumerBuilder - .consumerName("consumer-a") - .messageListener((consumer, msg) -> { - System.out.println("Message received by main consumer"); - - try { - consumer.acknowledge(msg); - } catch (PulsarClientException e) { - System.out.println(e.getMessage()); - } - }) - .subscribe(); - - Consumer failoverConsumer = consumerBuilder - .consumerName("consumer-b") - .messageListener((consumer, msg) -> { - System.out.println("Message received by failover consumer"); - - try { - consumer.acknowledge(msg); - } catch (PulsarClientException e) { - System.out.println(e.getMessage()); - } - }) - .subscribe(); - - IntStream.range(0, NUM_MSGS).forEach(i -> { - Message msg = MessageBuilder.create() - .setContent(String.format("message-%d", i).getBytes()) - .build(); - try { - producer.send(msg); - - Thread.sleep(100); - - if (i > 5) mainConsumer.close(); - } catch (InterruptedException | PulsarClientException e) { - System.out.println(e.getMessage()); - } - }); - } -}