[BAEL-6138] - Receiving Push Notifications in PostreSQL with Spring Integration (#14467)

* [BAEL-4849] Article code

* [BAEL-4968] Article code

* [BAEL-4968] Article code

* [BAEL-4968] Article code

* [BAEL-4968] Remove extra comments

* [BAEL-5258] Article Code

* [BAEL-2765] PKCE Support for Secret Clients

* [BAEL-5698] Article code

* [BAEL-5698] Article code

* [BAEL-5905] Initial code

* [BAEL-5905] Article code

* [BAEL-5905] Relocate article code to new module

* [BAEL-6275] PostgreSQL NOTIFY/LISTEN

* [BAEL-6275] Minor correction

* BAEL-6138

* [BAEL-6138] WIP - LiveTest

* [BAEL-6138] Tutorial Code

* [BAEL-6138] Tutorial Code

---------

Co-authored-by: Philippe Sevestre <psevestre@gmail.com>
This commit is contained in:
psevestre 2023-07-23 17:45:57 -03:00 committed by GitHub
parent 4f24eb635e
commit 4291c56bdb
10 changed files with 531 additions and 30 deletions

View File

@ -22,7 +22,7 @@
<module>spring-amqp</module>
<module>spring-apache-camel</module>
<module>spring-jms</module>
<module>postgres-notify</module>
<module>postgres-notify</module>
</modules>
</project>

View File

@ -42,36 +42,37 @@
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
</dependencies>
<properties>
<java.version>1.8</java.version>
</properties>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
<profiles>
<profile>
<id>instance1</id>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<jvmArguments>-Dserver.port=8081</jvmArguments>
</configuration>
</plugin>
</plugins>
</build>
</profile>
</profiles>
<properties>
<java.version>1.8</java.version>
</properties>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
<profiles>
<profile>
<id>instance1</id>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<jvmArguments>-Dserver.port=8081</jvmArguments>
</configuration>
</plugin>
</plugins>
</build>
</profile>
</profiles>
</project>

View File

@ -90,6 +90,11 @@
<artifactId>jaxb-api</artifactId>
<version>${jaxb-api.version}</version>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>${postgresql.version}</version>
</dependency>
</dependencies>
<build>
@ -126,6 +131,7 @@
<javax-activation.version>1.1.1</javax-activation.version>
<maven-eclipse-plugin.version>2.10</maven-eclipse-plugin.version>
<jaxb-api.version>2.3.0</jaxb-api.version>
<postgresql.version>42.3.8</postgresql.version>
</properties>
</project>

View File

@ -0,0 +1,92 @@
package com.baeldung.domain;
import java.math.BigDecimal;
public class Order {
private Long id;
private String symbol;
private OrderType orderType;
private BigDecimal price;
private BigDecimal quantity;
public Order() {}
public Order(Long id, String symbol, OrderType orderType, BigDecimal price, BigDecimal quantity) {
this.id = id;
this.symbol = symbol;
this.orderType = orderType;
this.price = price;
this.quantity = quantity;
}
/**
* @return the id
*/
public Long getId() {
return id;
}
/**
* @param id the id to set
*/
public void setId(Long id) {
this.id = id;
}
/**
* @return the symbol
*/
public String getSymbol() {
return symbol;
}
/**
* @param symbol the symbol to set
*/
public void setSymbol(String symbol) {
this.symbol = symbol;
}
/**
* @return the orderType
*/
public OrderType getOrderType() {
return orderType;
}
/**
* @param orderType the orderType to set
*/
public void setOrderType(OrderType orderType) {
this.orderType = orderType;
}
/**
* @return the price
*/
public BigDecimal getPrice() {
return price;
}
/**
* @param price the price to set
*/
public void setPrice(BigDecimal price) {
this.price = price;
}
/**
* @return the quantity
*/
public BigDecimal getQuantity() {
return quantity;
}
/**
* @param quantity the quantity to set
*/
public void setQuantity(BigDecimal quantity) {
this.quantity = quantity;
}
}

View File

@ -0,0 +1,16 @@
package com.baeldung.domain;
public enum OrderType {
BUY('B'),
SELL('S');
private final char code;
OrderType(char code) {
this.code = code;
}
public char getCode() {
return code;
}
}

View File

@ -0,0 +1,217 @@
package com.baeldung.subflows.postgresqlnotify;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import javax.sql.DataSource;
import org.postgresql.PGConnection;
import org.postgresql.PGNotification;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.integration.channel.AbstractSubscribableChannel;
import org.springframework.integration.dispatcher.MessageDispatcher;
import org.springframework.integration.dispatcher.UnicastingDispatcher;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageDeliveryException;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.support.ErrorMessage;
import org.springframework.messaging.support.GenericMessage;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
/**
* <p>This is a simplified backport of the version available on Spring Integration 6.x. for illustration purposes only.</p>
* <p>In particular, this implementation <b>does not persist messages</b> as the full-fledged version does.</p>
*
* @see https://github.com/spring-projects/spring-integration/blob/6.0.x/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/channel/PostgresSubscribableChannel.java
*
*/
public class PostgresSubscribableChannel extends AbstractSubscribableChannel {
private static Logger log = LoggerFactory.getLogger(PostgresSubscribableChannel.class);
private static final String HEADER_FIELD = "h";
private static final String BODY_FIELD = "b";
private final Supplier<Connection> connectionProvider;
private final String channelName;
private final MessageDispatcher dispatcher = new UnicastingDispatcher();
private final DataSource ds;
private CountDownLatch startLatch;
private Executor executor;
private ObjectMapper om;
private NotifierTask notifierTask;
public PostgresSubscribableChannel(String channelName, Supplier<Connection> connectionProvider, DataSource ds, ObjectMapper om) {
this.connectionProvider = connectionProvider;
this.channelName = channelName;
this.ds = ds;
this.executor = new SimpleAsyncTaskExecutor("posgres-subscriber-" + channelName);
this.om = om;
}
@Override
protected MessageDispatcher getDispatcher() {
return this.dispatcher;
}
@Override
public boolean subscribe(MessageHandler handler) {
boolean r = super.subscribe(handler);
if (r && super.getSubscriberCount() == 1) {
log.info("subscribe: starting listener thread...");
startListenerThread();
}
return r;
}
@Override
public boolean unsubscribe(MessageHandler handle) {
boolean r = super.unsubscribe(handle);
if (r && super.getSubscriberCount() == 0) {
log.info("unsubscribe: stopping listener thread...");
stopListenerThread();
}
return r;
}
private void startListenerThread() {
startLatch = new CountDownLatch(1);
notifierTask = new NotifierTask(connectionProvider.get());
executor.execute(notifierTask);
try {
startLatch.await(5, TimeUnit.SECONDS);
} catch (InterruptedException iex) {
throw new RuntimeException(iex);
}
}
private void stopListenerThread() {
notifierTask.kill();
}
@Override
protected boolean doSend(Message<?> message, long timeout) {
try {
String msg = prepareNotifyPayload(message);
try( Connection c = ds.getConnection()) {
log.debug("doSend:sending message: channel={}", channelName);
c.createStatement().execute("NOTIFY " + channelName + ", '" + msg + "'");
}
return true;
} catch (Exception ex) {
throw new MessageDeliveryException(message, "Unable to deliver message: " + ex.getMessage(),ex);
}
}
protected String prepareNotifyPayload(Message<?> message) throws JsonProcessingException {
Map<String, Object> rawMap = new HashMap<>();
rawMap.putAll(message.getHeaders());
JsonNode headerData = om.valueToTree(rawMap);
JsonNode bodyData = om.valueToTree(message.getPayload());
ObjectNode msg = om.getNodeFactory()
.objectNode();
msg.set(HEADER_FIELD, headerData);
msg.set(BODY_FIELD, bodyData);
return om.writeValueAsString(msg);
}
// Inner class that listens for notifications and dispatches them to subscribers
class NotifierTask implements Runnable {
private final Connection conn;
private final CountDownLatch stopLatch = new CountDownLatch(1);
NotifierTask(Connection conn) {
this.conn = conn;
}
void kill() {
try {
this.conn.close();
stopLatch.await(10, TimeUnit.SECONDS);
} catch (Exception ex) {
throw new RuntimeException(ex);
}
}
@Override
public void run() {
startLatch.countDown();
try (Statement st = conn.createStatement()) {
log.debug("notifierTask: enabling notifications for channel {}", channelName);
st.execute("LISTEN " + channelName);
PGConnection pgConn = conn.unwrap(PGConnection.class);
while (!Thread.currentThread()
.isInterrupted()) {
log.debug("notifierTask: wainting for notifications. channel={}", channelName);
PGNotification[] nts = pgConn.getNotifications();
log.debug("notifierTask: processing {} notification(s)", nts.length);
for (PGNotification n : nts) {
Message<?> msg = convertNotification(n);
getDispatcher().dispatch(msg);
}
}
} catch (SQLException sex) {
// TODO: Handle exceptions
} finally {
stopLatch.countDown();
}
}
@SuppressWarnings("unchecked")
private Message<?> convertNotification(PGNotification n) {
String payload = n.getParameter();
try {
JsonNode root = om.readTree(payload);
if (!root.isObject()) {
return new ErrorMessage(new IllegalArgumentException("Message is not a JSON Object"));
}
Map<String, Object> hdr;
JsonNode headers = root.path(HEADER_FIELD);
if (headers.isObject()) {
hdr = om.treeToValue(headers, Map.class);
} else {
hdr = Collections.emptyMap();
}
JsonNode body = root.path(BODY_FIELD);
return MessageBuilder
.withPayload(body.isTextual()?body.asText():body)
.copyHeaders(hdr)
.build();
} catch (Exception ex) {
return new ErrorMessage(ex);
}
}
}
}

View File

@ -0,0 +1,126 @@
package com.baeldung.subflows.postgresqlnotify;
import java.math.BigDecimal;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.postgresql.ds.PGSimpleDataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.PropertySource;
import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.annotation.Transformer;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.PublishSubscribeChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.jdbc.datasource.SingleConnectionDataSource;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
import com.baeldung.domain.Order;
import com.baeldung.domain.OrderType;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
@EnableIntegration
@IntegrationComponentScan
@PropertySource(value = "classpath:/database.properties", ignoreResourceNotFound = false)
public class PostgresqlPubSubExample {
private static final Logger log = LoggerFactory.getLogger(PostgresqlPubSubExample.class);
private Map<String,BigDecimal> orderSummary = new HashMap<>();
private final ObjectMapper om = new ObjectMapper();
private final Semaphore orderSemaphore = new Semaphore(0);
@MessagingGateway
public interface OrdersGateway {
@Gateway(requestChannel = "orders")
void publish(Order order);
}
@Bean
static SubscribableChannel orders(@Value("${db.url}") String url,@Value("${db.username}") String username, @Value("${db.password}")String password) {
// Connection supplier
SingleConnectionDataSource ds = new SingleConnectionDataSource(url, username, password, true);
Supplier<Connection> connectionSupplier = () -> {
try {
return ds.getConnection();
}
catch(SQLException ex) {
throw new RuntimeException(ex);
}
};
// DataSource
PGSimpleDataSource pgds = new PGSimpleDataSource();
pgds.setUrl(url);
pgds.setUser(username);
pgds.setPassword(password);
return new PostgresSubscribableChannel("orders", connectionSupplier, pgds, new ObjectMapper());
}
@Transformer(inputChannel = "orders" , outputChannel = "orderProcessor" )
Order validatedOrders(Message<?> orderMessage) throws JsonProcessingException {
ObjectNode on = (ObjectNode)orderMessage.getPayload();
Order order = om.treeToValue(on, Order.class);
return order;
}
@ServiceActivator(inputChannel = "orderProcessor")
void processOrder(Order order){
log.info("Processing order: id={}, symbol={}, qty={}, price={}",
order.getId(),
order.getSymbol(),
order.getQuantity(),
order.getPrice());
BigDecimal orderTotal = order.getQuantity().multiply(order.getPrice());
if ( order.getOrderType() == OrderType.SELL) {
orderTotal = orderTotal.negate();
}
BigDecimal sum = orderSummary.get(order.getSymbol());
if ( sum == null) {
sum = orderTotal;
}
else {
sum = sum.add(orderTotal);
}
orderSummary.put(order.getSymbol(), sum);
orderSemaphore.release();
}
public BigDecimal getTotalBySymbol(String symbol) {
return orderSummary.get(symbol);
}
public boolean awaitNextMessage(long time, TimeUnit unit) throws InterruptedException {
return orderSemaphore.tryAcquire(time, unit);
}
}

View File

@ -0,0 +1,40 @@
package com.baeldung.subflows.postgresqlnotify;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.*;
import java.math.BigDecimal;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
import com.baeldung.domain.Order;
import com.baeldung.domain.OrderType;
import com.baeldung.subflows.postgresqlnotify.PostgresqlPubSubExample.OrdersGateway;
@SpringJUnitConfig(classes = {PostgresqlPubSubExample.class})
public class PostgresqlPubSubExampleLiveTest {
@Autowired
PostgresqlPubSubExample processor;
@Autowired
OrdersGateway ordersGateway;
@Test
void whenPublishOrder_thenSuccess() throws Exception{
Order o = new Order(1l,"BAEL", OrderType.BUY, BigDecimal.valueOf(2.0), BigDecimal.valueOf(5.0));
ordersGateway.publish(o);
assertThat(processor.awaitNextMessage(10, TimeUnit.SECONDS)).isTrue();
BigDecimal total = processor.getTotalBySymbol("BAEL");
assertThat(total).isEqualTo(BigDecimal.valueOf(10));
}
}

View File

@ -0,0 +1,3 @@
db.url=jdbc:postgresql://localhost:5432/baeldung
db.username=baeldung
db.password=SqD64PtsGhDXjn9f