diff --git a/apache-rocketmq/README.md b/apache-rocketmq/README.md new file mode 100644 index 0000000000..be53f95790 --- /dev/null +++ b/apache-rocketmq/README.md @@ -0,0 +1,5 @@ +## Apache RocketMQ + +This module contains articles about Apache RocketMQ + +### Relevant Articles: diff --git a/apache-rocketmq/pom.xml b/apache-rocketmq/pom.xml new file mode 100644 index 0000000000..59c204dddf --- /dev/null +++ b/apache-rocketmq/pom.xml @@ -0,0 +1,27 @@ + + + 4.0.0 + apache-rocketmq + 1.0-SNAPSHOT + apache-rocketmq + + + com.baeldung + parent-modules + 1.0.0-SNAPSHOT + + + + + org.apache.rocketmq + rocketmq-spring-boot-starter + 2.0.4 + + + + + 1.6.0 + + diff --git a/apache-rocketmq/src/main/java/com/baeldung/rocketmq/consumer/CartEventConsumer.java b/apache-rocketmq/src/main/java/com/baeldung/rocketmq/consumer/CartEventConsumer.java new file mode 100644 index 0000000000..06e88076d0 --- /dev/null +++ b/apache-rocketmq/src/main/java/com/baeldung/rocketmq/consumer/CartEventConsumer.java @@ -0,0 +1,34 @@ +package com.baeldung.rocketmq.consumer; + +import com.baeldung.rocketmq.event.CartItemEvent; +import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; +import org.apache.rocketmq.spring.core.RocketMQListener; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.stereotype.Service; + +@SpringBootApplication +public class CartEventConsumer { + + public static void main(String[] args) { + SpringApplication.run(CartEventConsumer.class, args); + } + + @Service + @RocketMQMessageListener(topic = "cart-item-add-topic", consumerGroup = "cart-consumer_cart-item-add-topic") + public class CardItemAddConsumer implements RocketMQListener { + public void onMessage(CartItemEvent addItemEvent) { + System.out.println("Adding item: " + addItemEvent); + // logic + } + } + + @Service + @RocketMQMessageListener(topic = "cart-item-removed-topic", consumerGroup = "cart-consumer_cart-item-removed-topic") + public class CardItemRemoveConsumer implements RocketMQListener { + public void onMessage(CartItemEvent removeItemEvent) { + System.out.println("Removing item: " + removeItemEvent); + // logic + } + } +} diff --git a/apache-rocketmq/src/main/java/com/baeldung/rocketmq/event/CartItemEvent.java b/apache-rocketmq/src/main/java/com/baeldung/rocketmq/event/CartItemEvent.java new file mode 100644 index 0000000000..a9d7b4a436 --- /dev/null +++ b/apache-rocketmq/src/main/java/com/baeldung/rocketmq/event/CartItemEvent.java @@ -0,0 +1,32 @@ +package com.baeldung.rocketmq.event; + +public class CartItemEvent { + private String itemId; + private int quantity; + + public CartItemEvent(String itemId, int quantity) { + this.itemId = itemId; + this.quantity = quantity; + } + + public String getItemId() { + return itemId; + } + + public void setItemId(String itemId) { + this.itemId = itemId; + } + + public int getQuantity() { + return quantity; + } + + public void setQuantity(int quantity) { + this.quantity = quantity; + } + + @Override + public String toString() { + return "CartItemEvent{" + "itemId='" + itemId + '\'' + ", quantity=" + quantity + '}'; + } +} diff --git a/apache-rocketmq/src/main/java/com/baeldung/rocketmq/producer/CartEventProducer.java b/apache-rocketmq/src/main/java/com/baeldung/rocketmq/producer/CartEventProducer.java new file mode 100644 index 0000000000..dba6ee7a46 --- /dev/null +++ b/apache-rocketmq/src/main/java/com/baeldung/rocketmq/producer/CartEventProducer.java @@ -0,0 +1,26 @@ +package com.baeldung.rocketmq.producer; + + +import com.baeldung.rocketmq.event.CartItemEvent; +import org.apache.rocketmq.spring.core.RocketMQTemplate; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.CommandLineRunner; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +@SpringBootApplication +public class CartEventProducer implements CommandLineRunner { + + @Autowired + private RocketMQTemplate rocketMQTemplate; + + public static void main(String[] args) { + SpringApplication.run(CartEventProducer.class, args); + } + + public void run(String... args) throws Exception { + rocketMQTemplate.convertAndSend("cart-item-add-topic", new CartItemEvent("bike", 1)); + rocketMQTemplate.convertAndSend("cart-item-add-topic", new CartItemEvent("computer", 2)); + rocketMQTemplate.convertAndSend("cart-item-removed-topic", new CartItemEvent("bike", 1)); + } +} diff --git a/apache-rocketmq/src/main/java/com/baeldung/rocketmq/transaction/TransactionListenerImpl.java b/apache-rocketmq/src/main/java/com/baeldung/rocketmq/transaction/TransactionListenerImpl.java new file mode 100644 index 0000000000..e5fa6e361a --- /dev/null +++ b/apache-rocketmq/src/main/java/com/baeldung/rocketmq/transaction/TransactionListenerImpl.java @@ -0,0 +1,21 @@ +package com.baeldung.rocketmq.transaction; + +import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener; +import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener; +import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState; +import org.springframework.messaging.Message; + +@RocketMQTransactionListener(txProducerGroup = "test-transaction") +class TransactionListenerImpl implements RocketMQLocalTransactionListener { + @Override + public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) { + // ... local transaction process, return ROLLBACK, COMMIT or UNKNOWN + return RocketMQLocalTransactionState.UNKNOWN; + } + + @Override + public RocketMQLocalTransactionState checkLocalTransaction(Message msg) { + // ... check transaction status and return ROLLBACK, COMMIT or UNKNOWN + return RocketMQLocalTransactionState.COMMIT; + } +} diff --git a/apache-rocketmq/src/main/resources/application.properties b/apache-rocketmq/src/main/resources/application.properties new file mode 100644 index 0000000000..68d4ceaacd --- /dev/null +++ b/apache-rocketmq/src/main/resources/application.properties @@ -0,0 +1,9 @@ +rocketmq.name-server=127.0.0.1:9876 +rocketmq.producer.group=my-group +rocketmq.producer.send-message-timeout=300000 +rocketmq.producer.compress-message-body-threshold=4096 +rocketmq.producer.max-message-size=4194304 +rocketmq.producer.retry-times-when-send-async-failed=0 +rocketmq.producer.retry-next-server=true +rocketmq.producer.retry-times-when-send-failed=2 + diff --git a/pom.xml b/pom.xml index cc0c68d22f..9c95ba261c 100644 --- a/pom.xml +++ b/pom.xml @@ -356,6 +356,7 @@ apache-opennlp apache-poi apache-pulsar + apache-rocketmq apache-shiro apache-solrj apache-spark @@ -997,6 +998,7 @@ apache-opennlp apache-poi apache-pulsar + apache-rocketmq apache-shiro apache-solrj apache-spark