From d6330507ffc2e70f8777696bbfa54ff3459cc0b5 Mon Sep 17 00:00:00 2001 From: Krzysztof Majewski Date: Wed, 4 Dec 2019 16:11:36 +0100 Subject: [PATCH 1/3] BAEL-3457 Apache RocketMQ --- apache-rocketmq/README.md | 5 +++ apache-rocketmq/pom.xml | 27 +++++++++++++++ .../rocketmq/consumer/CartEventConsumer.java | 34 +++++++++++++++++++ .../rocketmq/event/CartItemEvent.java | 32 +++++++++++++++++ .../rocketmq/producer/CartEventProducer.java | 26 ++++++++++++++ .../src/main/resources/application.properties | 9 +++++ 6 files changed, 133 insertions(+) create mode 100644 apache-rocketmq/README.md create mode 100644 apache-rocketmq/pom.xml create mode 100644 apache-rocketmq/src/main/java/com/baeldung/rocketmq/consumer/CartEventConsumer.java create mode 100644 apache-rocketmq/src/main/java/com/baeldung/rocketmq/event/CartItemEvent.java create mode 100644 apache-rocketmq/src/main/java/com/baeldung/rocketmq/producer/CartEventProducer.java create mode 100644 apache-rocketmq/src/main/resources/application.properties diff --git a/apache-rocketmq/README.md b/apache-rocketmq/README.md new file mode 100644 index 0000000000..be53f95790 --- /dev/null +++ b/apache-rocketmq/README.md @@ -0,0 +1,5 @@ +## Apache RocketMQ + +This module contains articles about Apache RocketMQ + +### Relevant Articles: diff --git a/apache-rocketmq/pom.xml b/apache-rocketmq/pom.xml new file mode 100644 index 0000000000..59c204dddf --- /dev/null +++ b/apache-rocketmq/pom.xml @@ -0,0 +1,27 @@ + + + 4.0.0 + apache-rocketmq + 1.0-SNAPSHOT + apache-rocketmq + + + com.baeldung + parent-modules + 1.0.0-SNAPSHOT + + + + + org.apache.rocketmq + rocketmq-spring-boot-starter + 2.0.4 + + + + + 1.6.0 + + diff --git a/apache-rocketmq/src/main/java/com/baeldung/rocketmq/consumer/CartEventConsumer.java b/apache-rocketmq/src/main/java/com/baeldung/rocketmq/consumer/CartEventConsumer.java new file mode 100644 index 0000000000..06e88076d0 --- /dev/null +++ b/apache-rocketmq/src/main/java/com/baeldung/rocketmq/consumer/CartEventConsumer.java @@ -0,0 +1,34 @@ +package com.baeldung.rocketmq.consumer; + +import com.baeldung.rocketmq.event.CartItemEvent; +import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; +import org.apache.rocketmq.spring.core.RocketMQListener; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.stereotype.Service; + +@SpringBootApplication +public class CartEventConsumer { + + public static void main(String[] args) { + SpringApplication.run(CartEventConsumer.class, args); + } + + @Service + @RocketMQMessageListener(topic = "cart-item-add-topic", consumerGroup = "cart-consumer_cart-item-add-topic") + public class CardItemAddConsumer implements RocketMQListener { + public void onMessage(CartItemEvent addItemEvent) { + System.out.println("Adding item: " + addItemEvent); + // logic + } + } + + @Service + @RocketMQMessageListener(topic = "cart-item-removed-topic", consumerGroup = "cart-consumer_cart-item-removed-topic") + public class CardItemRemoveConsumer implements RocketMQListener { + public void onMessage(CartItemEvent removeItemEvent) { + System.out.println("Removing item: " + removeItemEvent); + // logic + } + } +} diff --git a/apache-rocketmq/src/main/java/com/baeldung/rocketmq/event/CartItemEvent.java b/apache-rocketmq/src/main/java/com/baeldung/rocketmq/event/CartItemEvent.java new file mode 100644 index 0000000000..a9d7b4a436 --- /dev/null +++ b/apache-rocketmq/src/main/java/com/baeldung/rocketmq/event/CartItemEvent.java @@ -0,0 +1,32 @@ +package com.baeldung.rocketmq.event; + +public class CartItemEvent { + private String itemId; + private int quantity; + + public CartItemEvent(String itemId, int quantity) { + this.itemId = itemId; + this.quantity = quantity; + } + + public String getItemId() { + return itemId; + } + + public void setItemId(String itemId) { + this.itemId = itemId; + } + + public int getQuantity() { + return quantity; + } + + public void setQuantity(int quantity) { + this.quantity = quantity; + } + + @Override + public String toString() { + return "CartItemEvent{" + "itemId='" + itemId + '\'' + ", quantity=" + quantity + '}'; + } +} diff --git a/apache-rocketmq/src/main/java/com/baeldung/rocketmq/producer/CartEventProducer.java b/apache-rocketmq/src/main/java/com/baeldung/rocketmq/producer/CartEventProducer.java new file mode 100644 index 0000000000..dba6ee7a46 --- /dev/null +++ b/apache-rocketmq/src/main/java/com/baeldung/rocketmq/producer/CartEventProducer.java @@ -0,0 +1,26 @@ +package com.baeldung.rocketmq.producer; + + +import com.baeldung.rocketmq.event.CartItemEvent; +import org.apache.rocketmq.spring.core.RocketMQTemplate; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.CommandLineRunner; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +@SpringBootApplication +public class CartEventProducer implements CommandLineRunner { + + @Autowired + private RocketMQTemplate rocketMQTemplate; + + public static void main(String[] args) { + SpringApplication.run(CartEventProducer.class, args); + } + + public void run(String... args) throws Exception { + rocketMQTemplate.convertAndSend("cart-item-add-topic", new CartItemEvent("bike", 1)); + rocketMQTemplate.convertAndSend("cart-item-add-topic", new CartItemEvent("computer", 2)); + rocketMQTemplate.convertAndSend("cart-item-removed-topic", new CartItemEvent("bike", 1)); + } +} diff --git a/apache-rocketmq/src/main/resources/application.properties b/apache-rocketmq/src/main/resources/application.properties new file mode 100644 index 0000000000..68d4ceaacd --- /dev/null +++ b/apache-rocketmq/src/main/resources/application.properties @@ -0,0 +1,9 @@ +rocketmq.name-server=127.0.0.1:9876 +rocketmq.producer.group=my-group +rocketmq.producer.send-message-timeout=300000 +rocketmq.producer.compress-message-body-threshold=4096 +rocketmq.producer.max-message-size=4194304 +rocketmq.producer.retry-times-when-send-async-failed=0 +rocketmq.producer.retry-next-server=true +rocketmq.producer.retry-times-when-send-failed=2 + From ba4ca46e2ab03392e8e66af17e8315ce2f5bc967 Mon Sep 17 00:00:00 2001 From: Krzysztof Majewski Date: Wed, 4 Dec 2019 16:15:49 +0100 Subject: [PATCH 2/3] BAEL-3457 Apache RocketMq --- .../transaction/TransactionListenerImpl.java | 21 +++++++++++++++++++ 1 file changed, 21 insertions(+) create mode 100644 apache-rocketmq/src/main/java/com/baeldung/rocketmq/transaction/TransactionListenerImpl.java diff --git a/apache-rocketmq/src/main/java/com/baeldung/rocketmq/transaction/TransactionListenerImpl.java b/apache-rocketmq/src/main/java/com/baeldung/rocketmq/transaction/TransactionListenerImpl.java new file mode 100644 index 0000000000..e5fa6e361a --- /dev/null +++ b/apache-rocketmq/src/main/java/com/baeldung/rocketmq/transaction/TransactionListenerImpl.java @@ -0,0 +1,21 @@ +package com.baeldung.rocketmq.transaction; + +import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener; +import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener; +import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState; +import org.springframework.messaging.Message; + +@RocketMQTransactionListener(txProducerGroup = "test-transaction") +class TransactionListenerImpl implements RocketMQLocalTransactionListener { + @Override + public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) { + // ... local transaction process, return ROLLBACK, COMMIT or UNKNOWN + return RocketMQLocalTransactionState.UNKNOWN; + } + + @Override + public RocketMQLocalTransactionState checkLocalTransaction(Message msg) { + // ... check transaction status and return ROLLBACK, COMMIT or UNKNOWN + return RocketMQLocalTransactionState.COMMIT; + } +} From 5d4b3705c3c8bee4447bced194709062efc62a07 Mon Sep 17 00:00:00 2001 From: Krzysztof Majewski Date: Wed, 4 Dec 2019 16:19:48 +0100 Subject: [PATCH 3/3] BAEL-3457 Apache RocketMQ --- pom.xml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pom.xml b/pom.xml index 7c46f29c74..ff02405d88 100644 --- a/pom.xml +++ b/pom.xml @@ -356,6 +356,7 @@ apache-opennlp apache-poi apache-pulsar + apache-rocketmq apache-shiro apache-solrj apache-spark @@ -1003,6 +1004,7 @@ apache-opennlp apache-poi apache-pulsar + apache-rocketmq apache-shiro apache-solrj apache-spark