From 4291c56bdb1f953e65999fc8c783ab480c9239f8 Mon Sep 17 00:00:00 2001 From: psevestre Date: Sun, 23 Jul 2023 17:45:57 -0300 Subject: [PATCH] [BAEL-6138] - Receiving Push Notifications in PostreSQL with Spring Integration (#14467) * [BAEL-4849] Article code * [BAEL-4968] Article code * [BAEL-4968] Article code * [BAEL-4968] Article code * [BAEL-4968] Remove extra comments * [BAEL-5258] Article Code * [BAEL-2765] PKCE Support for Secret Clients * [BAEL-5698] Article code * [BAEL-5698] Article code * [BAEL-5905] Initial code * [BAEL-5905] Article code * [BAEL-5905] Relocate article code to new module * [BAEL-6275] PostgreSQL NOTIFY/LISTEN * [BAEL-6275] Minor correction * BAEL-6138 * [BAEL-6138] WIP - LiveTest * [BAEL-6138] Tutorial Code * [BAEL-6138] Tutorial Code --------- Co-authored-by: Philippe Sevestre --- messaging-modules/pom.xml | 2 +- messaging-modules/postgres-notify/pom.xml | 59 ++--- spring-integration/pom.xml | 6 + .../main/java/com/baeldung/domain/Order.java | 92 ++++++++ .../java/com/baeldung/domain/OrderType.java | 16 ++ .../PostgresSubscribableChannel.java | 217 ++++++++++++++++++ .../PostgresqlPubSubExample.java | 126 ++++++++++ .../RouteToRecipientsExample.java | 0 .../PostgresqlPubSubExampleLiveTest.java | 40 ++++ .../src/test/resources/database.properties | 3 + 10 files changed, 531 insertions(+), 30 deletions(-) create mode 100644 spring-integration/src/main/java/com/baeldung/domain/Order.java create mode 100644 spring-integration/src/main/java/com/baeldung/domain/OrderType.java create mode 100644 spring-integration/src/main/java/com/baeldung/subflows/postgresqlnotify/PostgresSubscribableChannel.java create mode 100644 spring-integration/src/main/java/com/baeldung/subflows/postgresqlnotify/PostgresqlPubSubExample.java rename spring-integration/src/main/java/com/baeldung/subflows/{routeToRecipients => routetorecipients}/RouteToRecipientsExample.java (100%) create mode 100644 spring-integration/src/test/java/com/baeldung/subflows/postgresqlnotify/PostgresqlPubSubExampleLiveTest.java create mode 100644 spring-integration/src/test/resources/database.properties diff --git a/messaging-modules/pom.xml b/messaging-modules/pom.xml index 27524637ab..6fd14f7c64 100644 --- a/messaging-modules/pom.xml +++ b/messaging-modules/pom.xml @@ -22,7 +22,7 @@ spring-amqp spring-apache-camel spring-jms - postgres-notify + postgres-notify \ No newline at end of file diff --git a/messaging-modules/postgres-notify/pom.xml b/messaging-modules/postgres-notify/pom.xml index 876519f40c..174d66b7f5 100644 --- a/messaging-modules/postgres-notify/pom.xml +++ b/messaging-modules/postgres-notify/pom.xml @@ -42,36 +42,37 @@ lombok true - + - - 1.8 - - - - - org.springframework.boot - spring-boot-maven-plugin - - - - - - instance1 - - - - org.springframework.boot - spring-boot-maven-plugin - - -Dserver.port=8081 - - - - - - - + + 1.8 + + + + + org.springframework.boot + spring-boot-maven-plugin + + + + + + + instance1 + + + + org.springframework.boot + spring-boot-maven-plugin + + -Dserver.port=8081 + + + + + + + \ No newline at end of file diff --git a/spring-integration/pom.xml b/spring-integration/pom.xml index 9882e02c57..abf5cfb3d6 100644 --- a/spring-integration/pom.xml +++ b/spring-integration/pom.xml @@ -90,6 +90,11 @@ jaxb-api ${jaxb-api.version} + + org.postgresql + postgresql + ${postgresql.version} + @@ -126,6 +131,7 @@ 1.1.1 2.10 2.3.0 + 42.3.8 \ No newline at end of file diff --git a/spring-integration/src/main/java/com/baeldung/domain/Order.java b/spring-integration/src/main/java/com/baeldung/domain/Order.java new file mode 100644 index 0000000000..3e26c00f4a --- /dev/null +++ b/spring-integration/src/main/java/com/baeldung/domain/Order.java @@ -0,0 +1,92 @@ +package com.baeldung.domain; + +import java.math.BigDecimal; + +public class Order { + + private Long id; + private String symbol; + private OrderType orderType; + private BigDecimal price; + private BigDecimal quantity; + + public Order() {} + + public Order(Long id, String symbol, OrderType orderType, BigDecimal price, BigDecimal quantity) { + this.id = id; + this.symbol = symbol; + this.orderType = orderType; + this.price = price; + this.quantity = quantity; + } + + /** + * @return the id + */ + public Long getId() { + return id; + } + + /** + * @param id the id to set + */ + public void setId(Long id) { + this.id = id; + } + + /** + * @return the symbol + */ + public String getSymbol() { + return symbol; + } + + /** + * @param symbol the symbol to set + */ + public void setSymbol(String symbol) { + this.symbol = symbol; + } + + /** + * @return the orderType + */ + public OrderType getOrderType() { + return orderType; + } + + /** + * @param orderType the orderType to set + */ + public void setOrderType(OrderType orderType) { + this.orderType = orderType; + } + + /** + * @return the price + */ + public BigDecimal getPrice() { + return price; + } + + /** + * @param price the price to set + */ + public void setPrice(BigDecimal price) { + this.price = price; + } + + /** + * @return the quantity + */ + public BigDecimal getQuantity() { + return quantity; + } + + /** + * @param quantity the quantity to set + */ + public void setQuantity(BigDecimal quantity) { + this.quantity = quantity; + } +} diff --git a/spring-integration/src/main/java/com/baeldung/domain/OrderType.java b/spring-integration/src/main/java/com/baeldung/domain/OrderType.java new file mode 100644 index 0000000000..bb1e5b6abc --- /dev/null +++ b/spring-integration/src/main/java/com/baeldung/domain/OrderType.java @@ -0,0 +1,16 @@ +package com.baeldung.domain; + + +public enum OrderType { + BUY('B'), + SELL('S'); + private final char code; + + OrderType(char code) { + this.code = code; + } + + public char getCode() { + return code; + } +} diff --git a/spring-integration/src/main/java/com/baeldung/subflows/postgresqlnotify/PostgresSubscribableChannel.java b/spring-integration/src/main/java/com/baeldung/subflows/postgresqlnotify/PostgresSubscribableChannel.java new file mode 100644 index 0000000000..3ee77f69dc --- /dev/null +++ b/spring-integration/src/main/java/com/baeldung/subflows/postgresqlnotify/PostgresSubscribableChannel.java @@ -0,0 +1,217 @@ +package com.baeldung.subflows.postgresqlnotify; + +import java.sql.Connection; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; + +import javax.sql.DataSource; + +import org.postgresql.PGConnection; +import org.postgresql.PGNotification; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.core.task.SimpleAsyncTaskExecutor; +import org.springframework.integration.channel.AbstractSubscribableChannel; +import org.springframework.integration.dispatcher.MessageDispatcher; +import org.springframework.integration.dispatcher.UnicastingDispatcher; +import org.springframework.integration.support.MessageBuilder; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageDeliveryException; +import org.springframework.messaging.MessageHandler; +import org.springframework.messaging.support.ErrorMessage; +import org.springframework.messaging.support.GenericMessage; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; + +/** + *

This is a simplified backport of the version available on Spring Integration 6.x. for illustration purposes only.

+ *

In particular, this implementation does not persist messages as the full-fledged version does.

+ * + * @see https://github.com/spring-projects/spring-integration/blob/6.0.x/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/channel/PostgresSubscribableChannel.java + * + */ +public class PostgresSubscribableChannel extends AbstractSubscribableChannel { + + private static Logger log = LoggerFactory.getLogger(PostgresSubscribableChannel.class); + + private static final String HEADER_FIELD = "h"; + private static final String BODY_FIELD = "b"; + + private final Supplier connectionProvider; + private final String channelName; + private final MessageDispatcher dispatcher = new UnicastingDispatcher(); + private final DataSource ds; + private CountDownLatch startLatch; + private Executor executor; + private ObjectMapper om; + private NotifierTask notifierTask; + + public PostgresSubscribableChannel(String channelName, Supplier connectionProvider, DataSource ds, ObjectMapper om) { + + this.connectionProvider = connectionProvider; + this.channelName = channelName; + this.ds = ds; + this.executor = new SimpleAsyncTaskExecutor("posgres-subscriber-" + channelName); + this.om = om; + + } + + @Override + protected MessageDispatcher getDispatcher() { + return this.dispatcher; + } + + @Override + public boolean subscribe(MessageHandler handler) { + boolean r = super.subscribe(handler); + if (r && super.getSubscriberCount() == 1) { + log.info("subscribe: starting listener thread..."); + startListenerThread(); + } + return r; + } + + @Override + public boolean unsubscribe(MessageHandler handle) { + boolean r = super.unsubscribe(handle); + if (r && super.getSubscriberCount() == 0) { + log.info("unsubscribe: stopping listener thread..."); + stopListenerThread(); + } + + return r; + } + + private void startListenerThread() { + + startLatch = new CountDownLatch(1); + notifierTask = new NotifierTask(connectionProvider.get()); + executor.execute(notifierTask); + try { + startLatch.await(5, TimeUnit.SECONDS); + } catch (InterruptedException iex) { + throw new RuntimeException(iex); + } + } + + private void stopListenerThread() { + notifierTask.kill(); + } + + @Override + protected boolean doSend(Message message, long timeout) { + try { + String msg = prepareNotifyPayload(message); + + try( Connection c = ds.getConnection()) { + log.debug("doSend:sending message: channel={}", channelName); + c.createStatement().execute("NOTIFY " + channelName + ", '" + msg + "'"); + } + + return true; + } catch (Exception ex) { + throw new MessageDeliveryException(message, "Unable to deliver message: " + ex.getMessage(),ex); + } + } + + protected String prepareNotifyPayload(Message message) throws JsonProcessingException { + Map rawMap = new HashMap<>(); + rawMap.putAll(message.getHeaders()); + JsonNode headerData = om.valueToTree(rawMap); + JsonNode bodyData = om.valueToTree(message.getPayload()); + + ObjectNode msg = om.getNodeFactory() + .objectNode(); + msg.set(HEADER_FIELD, headerData); + msg.set(BODY_FIELD, bodyData); + return om.writeValueAsString(msg); + } + + // Inner class that listens for notifications and dispatches them to subscribers + class NotifierTask implements Runnable { + + private final Connection conn; + private final CountDownLatch stopLatch = new CountDownLatch(1); + + NotifierTask(Connection conn) { + this.conn = conn; + } + + void kill() { + try { + this.conn.close(); + stopLatch.await(10, TimeUnit.SECONDS); + } catch (Exception ex) { + throw new RuntimeException(ex); + } + } + + @Override + public void run() { + + startLatch.countDown(); + + try (Statement st = conn.createStatement()) { + log.debug("notifierTask: enabling notifications for channel {}", channelName); + st.execute("LISTEN " + channelName); + + PGConnection pgConn = conn.unwrap(PGConnection.class); + + while (!Thread.currentThread() + .isInterrupted()) { + log.debug("notifierTask: wainting for notifications. channel={}", channelName); + PGNotification[] nts = pgConn.getNotifications(); + log.debug("notifierTask: processing {} notification(s)", nts.length); + + for (PGNotification n : nts) { + Message msg = convertNotification(n); + getDispatcher().dispatch(msg); + } + } + } catch (SQLException sex) { + // TODO: Handle exceptions + } finally { + stopLatch.countDown(); + } + } + + @SuppressWarnings("unchecked") + private Message convertNotification(PGNotification n) { + String payload = n.getParameter(); + try { + JsonNode root = om.readTree(payload); + + if (!root.isObject()) { + return new ErrorMessage(new IllegalArgumentException("Message is not a JSON Object")); + } + + Map hdr; + JsonNode headers = root.path(HEADER_FIELD); + + if (headers.isObject()) { + hdr = om.treeToValue(headers, Map.class); + } else { + hdr = Collections.emptyMap(); + } + + JsonNode body = root.path(BODY_FIELD); + return MessageBuilder + .withPayload(body.isTextual()?body.asText():body) + .copyHeaders(hdr) + .build(); + } catch (Exception ex) { + return new ErrorMessage(ex); + } + } + } +} diff --git a/spring-integration/src/main/java/com/baeldung/subflows/postgresqlnotify/PostgresqlPubSubExample.java b/spring-integration/src/main/java/com/baeldung/subflows/postgresqlnotify/PostgresqlPubSubExample.java new file mode 100644 index 0000000000..06da8864bf --- /dev/null +++ b/spring-integration/src/main/java/com/baeldung/subflows/postgresqlnotify/PostgresqlPubSubExample.java @@ -0,0 +1,126 @@ +package com.baeldung.subflows.postgresqlnotify; + +import java.math.BigDecimal; +import java.sql.Connection; +import java.sql.SQLException; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; + +import org.postgresql.ds.PGSimpleDataSource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.PropertySource; +import org.springframework.integration.annotation.Gateway; +import org.springframework.integration.annotation.IntegrationComponentScan; +import org.springframework.integration.annotation.MessagingGateway; +import org.springframework.integration.annotation.ServiceActivator; +import org.springframework.integration.annotation.Transformer; +import org.springframework.integration.channel.DirectChannel; +import org.springframework.integration.channel.PublishSubscribeChannel; +import org.springframework.integration.channel.QueueChannel; +import org.springframework.integration.config.EnableIntegration; +import org.springframework.jdbc.datasource.SingleConnectionDataSource; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.SubscribableChannel; + +import com.baeldung.domain.Order; +import com.baeldung.domain.OrderType; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; + +@EnableIntegration +@IntegrationComponentScan +@PropertySource(value = "classpath:/database.properties", ignoreResourceNotFound = false) +public class PostgresqlPubSubExample { + + private static final Logger log = LoggerFactory.getLogger(PostgresqlPubSubExample.class); + + private Map orderSummary = new HashMap<>(); + + private final ObjectMapper om = new ObjectMapper(); + private final Semaphore orderSemaphore = new Semaphore(0); + + + @MessagingGateway + public interface OrdersGateway { + + @Gateway(requestChannel = "orders") + void publish(Order order); + } + + + @Bean + static SubscribableChannel orders(@Value("${db.url}") String url,@Value("${db.username}") String username, @Value("${db.password}")String password) { + + // Connection supplier + SingleConnectionDataSource ds = new SingleConnectionDataSource(url, username, password, true); + Supplier connectionSupplier = () -> { + try { + return ds.getConnection(); + } + catch(SQLException ex) { + throw new RuntimeException(ex); + } + }; + + // DataSource + PGSimpleDataSource pgds = new PGSimpleDataSource(); + pgds.setUrl(url); + pgds.setUser(username); + pgds.setPassword(password); + + return new PostgresSubscribableChannel("orders", connectionSupplier, pgds, new ObjectMapper()); + } + + @Transformer(inputChannel = "orders" , outputChannel = "orderProcessor" ) + Order validatedOrders(Message orderMessage) throws JsonProcessingException { + ObjectNode on = (ObjectNode)orderMessage.getPayload(); + Order order = om.treeToValue(on, Order.class); + return order; + } + + + @ServiceActivator(inputChannel = "orderProcessor") + void processOrder(Order order){ + + log.info("Processing order: id={}, symbol={}, qty={}, price={}", + order.getId(), + order.getSymbol(), + order.getQuantity(), + order.getPrice()); + + BigDecimal orderTotal = order.getQuantity().multiply(order.getPrice()); + if ( order.getOrderType() == OrderType.SELL) { + orderTotal = orderTotal.negate(); + } + + BigDecimal sum = orderSummary.get(order.getSymbol()); + if ( sum == null) { + sum = orderTotal; + } + else { + sum = sum.add(orderTotal); + } + + orderSummary.put(order.getSymbol(), sum); + orderSemaphore.release(); + + } + + + public BigDecimal getTotalBySymbol(String symbol) { + return orderSummary.get(symbol); + } + + public boolean awaitNextMessage(long time, TimeUnit unit) throws InterruptedException { + return orderSemaphore.tryAcquire(time, unit); + } + +} diff --git a/spring-integration/src/main/java/com/baeldung/subflows/routeToRecipients/RouteToRecipientsExample.java b/spring-integration/src/main/java/com/baeldung/subflows/routetorecipients/RouteToRecipientsExample.java similarity index 100% rename from spring-integration/src/main/java/com/baeldung/subflows/routeToRecipients/RouteToRecipientsExample.java rename to spring-integration/src/main/java/com/baeldung/subflows/routetorecipients/RouteToRecipientsExample.java diff --git a/spring-integration/src/test/java/com/baeldung/subflows/postgresqlnotify/PostgresqlPubSubExampleLiveTest.java b/spring-integration/src/test/java/com/baeldung/subflows/postgresqlnotify/PostgresqlPubSubExampleLiveTest.java new file mode 100644 index 0000000000..a70c4b358d --- /dev/null +++ b/spring-integration/src/test/java/com/baeldung/subflows/postgresqlnotify/PostgresqlPubSubExampleLiveTest.java @@ -0,0 +1,40 @@ +package com.baeldung.subflows.postgresqlnotify; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.*; + +import java.math.BigDecimal; +import java.util.concurrent.TimeUnit; + +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.PropertySource; +import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; + +import com.baeldung.domain.Order; +import com.baeldung.domain.OrderType; +import com.baeldung.subflows.postgresqlnotify.PostgresqlPubSubExample.OrdersGateway; + +@SpringJUnitConfig(classes = {PostgresqlPubSubExample.class}) +public class PostgresqlPubSubExampleLiveTest { + + @Autowired + PostgresqlPubSubExample processor; + + @Autowired + OrdersGateway ordersGateway; + + @Test + void whenPublishOrder_thenSuccess() throws Exception{ + + Order o = new Order(1l,"BAEL", OrderType.BUY, BigDecimal.valueOf(2.0), BigDecimal.valueOf(5.0)); + ordersGateway.publish(o); + + assertThat(processor.awaitNextMessage(10, TimeUnit.SECONDS)).isTrue(); + + BigDecimal total = processor.getTotalBySymbol("BAEL"); + assertThat(total).isEqualTo(BigDecimal.valueOf(10)); + } + +} diff --git a/spring-integration/src/test/resources/database.properties b/spring-integration/src/test/resources/database.properties new file mode 100644 index 0000000000..d03b3688b5 --- /dev/null +++ b/spring-integration/src/test/resources/database.properties @@ -0,0 +1,3 @@ +db.url=jdbc:postgresql://localhost:5432/baeldung +db.username=baeldung +db.password=SqD64PtsGhDXjn9f