Merge pull request #16158 from RizaFarheen/saga-pattern

saga-pattern
This commit is contained in:
Andrea Cerasoni 2024-03-24 06:45:11 +00:00 committed by GitHub
commit afef19ac9e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
34 changed files with 1329 additions and 0 deletions

View File

@ -0,0 +1,13 @@
# Saga Pattern Using Orkes Conductor
This is an example project showing how to build event driven applications using [Conductor](https://github.com/conductor-oss/conductor)
# Pre-requisites
1. Docker
2. Running conductor server
**Start the conductor server**
```shell
docker run --init -p 8080:8080 -p 1234:5000 conductoross/conductor-standalone:3.15.0
```

View File

@ -0,0 +1,34 @@
buildscript {
dependencies {
classpath "org.springframework.boot:spring-boot-gradle-plugin:3.2.3"
}
}
plugins {
id 'java'
id 'org.springframework.boot' version '3.2.3'
id 'io.freefair.lombok' version '8.6'
}
apply plugin: 'io.spring.dependency-management'
group = 'io.orkes.example'
version = '1.0-SNAPSHOT'
repositories {
mavenCentral()
}
dependencies {
testImplementation platform('org.junit:junit-bom:5.9.1')
testImplementation 'org.junit.jupiter:junit-jupiter'
implementation 'org.springframework.boot:spring-boot-starter-web'
implementation 'io.orkes.conductor:orkes-conductor-client:2.1.0'
implementation 'org.xerial:sqlite-jdbc:3.32.3.3'
}
test {
useJUnitPlatform()
}

View File

@ -0,0 +1,2 @@
rootProject.name = 'conductor-examples-food-delivery'

View File

@ -0,0 +1,28 @@
package io.orkes.example.saga;
import io.orkes.example.saga.dao.BaseDAO;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;
import lombok.AllArgsConstructor;
@AllArgsConstructor
@SpringBootApplication
@ComponentScan(basePackages = {"io.orkes"})
public class SagaApplication {
private static final BaseDAO db = new BaseDAO("jdbc:sqlite:food_delivery.db");
public static void main(String[] args) {
SpringApplication.run(SagaApplication.class, args);
initDB();
}
public static void initDB() {
db.createTables("orders");
db.createTables("inventory");
db.createTables("payments");
db.createTables("shipments");
}
}

View File

@ -0,0 +1,27 @@
package io.orkes.example.saga.controller;
import io.orkes.example.saga.pojos.FoodDeliveryRequest;
import io.orkes.example.saga.pojos.OrderRequest;
import io.orkes.example.saga.service.WorkflowService;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
import java.util.Map;
@Slf4j
@AllArgsConstructor
@RestController
public class OrderServiceController {
private final WorkflowService workflowService;
@PostMapping(value = "/triggerFoodDeliveryFlow", produces = "application/json")
public ResponseEntity<Map<String, Object>> triggerFoodDeliveryFlow(@RequestBody FoodDeliveryRequest foodDeliveryRequest) {
return ResponseEntity.ok(workflowService.startFoodDeliveryWorkflow(foodDeliveryRequest));
}
}

View File

@ -0,0 +1,221 @@
package io.orkes.example.saga.dao;
import java.sql.*;
public class BaseDAO {
private String url;
public BaseDAO(String url) {
this.url = url;
}
protected Connection connect() {
Connection conn = null;
try {
conn = DriverManager.getConnection(this.url);
} catch (SQLException e) {
System.out.println(e.getMessage());
}
return conn;
}
protected Boolean execute(String sql) {
try (Connection conn = DriverManager.getConnection(this.url); Statement stmt = conn.createStatement()) {
stmt.execute(sql);
} catch (SQLException e) {
System.out.println(e.getMessage());
return false;
}
return true;
}
public void createTables(String service) {
switch (service) {
case "orders":
createOrdersTable();
createOrderDetailsTable();
createCustomerTable();
break;
case "inventory":
createRestaurantsTable();
break;
case "shipments":
createDriversTable();
createShipmentTable();
break;
case "payments":
createPaymentsTable();
break;
default:
System.out.println("Service name not recognized");
}
}
private void createOrdersTable() {
if (!tableExists("orders")) {
String sql = "CREATE TABLE orders (\n"
+ " orderId text PRIMARY KEY,\n"
+ " customerId integer NOT NULL,\n"
+ " restaurantId integer NOT NULL,\n"
+ " deliveryAddress text NOT NULL,\n"
+ " createdAt TIMESTAMP NOT NULL,\n"
+ " status text NOT NULL\n"
+ ");";
execute(sql);
}
}
private void createOrderDetailsTable() {
if (!tableExists("orders_details")) {
String sql = "CREATE TABLE orders_details (\n"
+ " orderId text PRIMARY KEY,\n"
+ " items text NOT NULL,\n"
+ " notes text\n"
+ ");";
execute(sql);
}
}
private void createCustomerTable() {
if (tableExists("customers")) {
return;
}
String sql = "CREATE TABLE customers (\n"
+ " id integer PRIMARY KEY AUTOINCREMENT,\n"
+ " email text NOT NULL,\n"
+ " name text NOT NULL,\n"
+ " contact text\n"
+ ");";
if(execute(sql)) {
seedCustomers();
}
}
private void createRestaurantsTable() {
if (!tableExists("restaurants")) {
String sql = "CREATE TABLE restaurants (\n"
+ " id integer PRIMARY KEY AUTOINCREMENT,\n"
+ " name text NOT NULL,\n"
+ " address text NOT NULL,\n"
+ " contact text NOT NULL\n"
+ ");";
if (execute(sql)) {
seedRestaurants();
}
}
}
private void createPaymentsTable() {
if (tableExists("payments")) {
return;
}
String sql = "CREATE TABLE payments (\n"
+ " paymentId text PRIMARY KEY,\n"
+ " orderId text NOT NULL,\n"
+ " amount number NOT NULL,\n"
+ " method text,\n"
+ " status text,\n"
+ " createdAt TIMESTAMP NOT NULL\n"
+ ");";
execute(sql);
}
private void createDriversTable() {
if (tableExists("drivers")) {
return;
}
String sql = "CREATE TABLE drivers (\n"
+ " id integer PRIMARY KEY AUTOINCREMENT,\n"
+ " name text NOT NULL,\n"
+ " contact text\n"
+ ");";
if(execute(sql)) {
seedDrivers();
}
}
private void createShipmentTable() {
if (tableExists("shipments")) {
return;
}
String sql = "CREATE TABLE shipments (\n"
+ " id integer PRIMARY KEY AUTOINCREMENT,\n"
+ " orderId text NOT NULL,\n"
+ " driverId number NOT NULL,\n"
+ " address text NOT NULL,\n"
+ " instructions text,\n"
+ " status text NOT NULL,\n"
+ " createdAt TIMESTAMP NOT NULL\n"
+ ");";
execute(sql);
}
private void seedCustomers() {
String[] queries = {
"INSERT INTO customers(email, name, contact) VALUES('John Smith','john.smith@example.com','+12126781345');",
"INSERT INTO customers(email, name, contact) VALUES('Mike Ross','mike.ross@example.com','+15466711147');",
"INSERT INTO customers(email, name, contact) VALUES('Martha Williams','martha.williams@example.com','+12790581941');"
};
for (String query : queries) {
execute(query);
}
}
private void seedRestaurants() {
String[] add = {
"5331 Redford Court, Montgomery AL 36116",
"43 West 4th Street, New York NY 10024",
"1693 Alice Court, Annapolis MD 21401"
};
String[] queries = {
"INSERT INTO restaurants(name, address, contact) VALUES('Mikes','+12121231345','" + add[0] + "');",
"INSERT INTO restaurants(name, address, contact) VALUES('Tamarind','+12412311147','" + add[1] + "');",
"INSERT INTO restaurants(name, address, contact) VALUES('Thai Place','+14790981941','" + add[2] + "');",
};
for (String query : queries) {
execute(query);
}
}
private void seedDrivers() {
String[] queries = {
"INSERT INTO drivers(name,contact) VALUES('Wayne Stevens','+12520711467');",
"INSERT INTO drivers(name,contact) VALUES('Jim Willis','+16466281981');",
"INSERT INTO drivers(name,contact) VALUES('Steven Carroll','+12612590430');",
"INSERT INTO drivers(name,contact) VALUES('Tom Cruise','+18659581430');"
};
for (String query : queries) {
execute(query);
}
}
boolean tableExists(String tableName) {
try {
Connection conn = DriverManager.getConnection(this.url);
DatabaseMetaData meta = conn.getMetaData();
ResultSet resultSet = meta.getTables(null, null, tableName, new String[] {"TABLE"});
boolean exists = resultSet.next();
conn.close();
return exists;
} catch (SQLException e) {
System.out.println(e.getMessage());
}
return false;
}
}

View File

@ -0,0 +1,33 @@
package io.orkes.example.saga.dao;
import io.orkes.example.saga.pojos.Order;
import io.orkes.example.saga.pojos.Restaurant;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
public class InventoryDAO extends BaseDAO {
public InventoryDAO(String url) {
super(url);
}
public void readRestaurant(int restaurantId, Restaurant restaurant) {
String sql = "SELECT name, address, contact FROM restaurants WHERE id = ?";
try (Connection conn = this.connect(); PreparedStatement pstmt = conn.prepareStatement(sql)) {
pstmt.setInt(1, restaurantId);
ResultSet rs = pstmt.executeQuery();
while (rs.next()) {
restaurant.setName(rs.getString("name"));
restaurant.setAddress(rs.getString("address"));
restaurant.setContact(rs.getString("contact"));
}
} catch (SQLException e) {
System.out.println(e.getMessage());
}
}
}

View File

@ -0,0 +1,135 @@
package io.orkes.example.saga.dao;
import io.orkes.example.saga.pojos.Customer;
import io.orkes.example.saga.pojos.Order;
import java.sql.*;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.TimeZone;
import java.util.Date;
public class OrdersDAO extends BaseDAO {
public OrdersDAO(String url) {
super(url);
}
public String insertOrder(Order order) {
Date date = new Date();
Timestamp nowAsTS = new Timestamp(date.getTime());
String itemsStr = String.join("", order.getOrderDetails().getItems().toString());
String notesStr = null;
if(!order.getOrderDetails().getNotes().isEmpty()) {
notesStr = String.join("", order.getOrderDetails().getNotes().toString());
} else {
notesStr = "";
}
String sql = "INSERT INTO orders(orderId,customerId,restaurantId,deliveryAddress,createdAt,status) VALUES(?,?,?,?,?,?)";
try (Connection conn = this.connect(); PreparedStatement pstmt = conn.prepareStatement(sql)) {
pstmt.setString(1, order.getOrderId());
pstmt.setInt(2, order.getCustomer().getId());
pstmt.setInt(3, order.getRestaurantId());
pstmt.setString(4, order.getDeliveryAddress());
pstmt.setTimestamp(5, nowAsTS);
pstmt.setString(6, order.getStatus().name());
pstmt.executeUpdate();
} catch (SQLException e) {
System.out.println(e.getMessage());
return e.getMessage();
}
sql = "INSERT INTO orders_details(orderId,items,notes) VALUES(?,?,?)";
try (Connection conn = this.connect(); PreparedStatement pstmt = conn.prepareStatement(sql)) {
pstmt.setString(1, order.getOrderId());
pstmt.setString(2, itemsStr);
pstmt.setString(3, notesStr);
pstmt.executeUpdate();
} catch (SQLException e) {
System.out.println(e.getMessage());
return e.getMessage();
}
return "";
}
public void updateOrder(Order order) {
String sql = "UPDATE orders SET restaurantId=?,deliveryAddress=?,status=? WHERE orderId=?";
try (Connection conn = this.connect(); PreparedStatement pstmt = conn.prepareStatement(sql)) {
pstmt.setInt(1, order.getRestaurantId());
pstmt.setString(2, order.getDeliveryAddress());
pstmt.setString(3, order.getStatus().name());
pstmt.setString(4, order.getOrderId());
pstmt.executeUpdate();
} catch (SQLException e) {
System.out.println(e.getMessage());
}
}
public void readOrder(String orderId, Order order) {
String sql = "SELECT orderId, customerId, restaurantId, deliveryAddress, createdAt, status FROM orders WHERE orderId = ?";
try (Connection conn = this.connect(); PreparedStatement pstmt = conn.prepareStatement(sql)) {
pstmt.setString(1, orderId);
ResultSet rs = pstmt.executeQuery();
while (rs.next()) {
order.setOrderId(rs.getString("orderId"));
Customer customer = new Customer();
customer.setId(rs.getInt("customerId"));
order.setCustomer(customer);
order.setRestaurantId(rs.getInt("restaurantId"));
order.setDeliveryAddress(rs.getString("deliveryAddress"));
order.setCreatedAt(rs.getLong("createdAt"));
order.setStatus(Order.Status.valueOf(rs.getString("status")));
}
} catch (SQLException e) {
System.out.println(e.getMessage());
}
}
public int insertCustomer(Customer customer) {
int id = 0;
try (Connection conn = this.connect(); PreparedStatement pstmt = conn.prepareStatement("SELECT id FROM customers WHERE email = ?")) {
pstmt.setString(1, customer.getEmail());
ResultSet rs = pstmt.executeQuery();
if (rs.next()) {
id = rs.getInt("id");
}
} catch (SQLException e) {
System.out.println(e.getMessage());
}
if (id > 0) {
return id;
}
String sql = "INSERT INTO customers(email,name,contact) VALUES(?,?,?)";
try (Connection conn = this.connect(); PreparedStatement pstmt = conn.prepareStatement(sql)) {
pstmt.setString(1, customer.getEmail());
pstmt.setString(2, customer.getName());
pstmt.setString(3, customer.getContact());
pstmt.executeUpdate();
} catch (SQLException e) {
System.out.println(e.getMessage());
}
try (Connection conn = this.connect(); PreparedStatement pstmt = conn.prepareStatement("SELECT id FROM customers WHERE email = ?")) {
pstmt.setString(1, customer.getEmail());
ResultSet rs = pstmt.executeQuery();
id = rs.getInt("id");
} catch (SQLException e) {
System.out.println(e.getMessage());
}
return id;
}
}

View File

@ -0,0 +1,65 @@
package io.orkes.example.saga.dao;
import io.orkes.example.saga.pojos.Payment;
import java.sql.*;
import java.util.Date;
public class PaymentsDAO extends BaseDAO {
public PaymentsDAO(String url) {
super(url);
}
public String insertPayment(Payment payment) {
Date date = new Date();
Timestamp nowAsTS = new Timestamp(date.getTime());
String sql = "INSERT INTO payments(paymentId, orderId, amount, method, createdAt, status) VALUES(?,?,?,?,?,?);";
try (Connection conn = this.connect(); PreparedStatement pstmt = conn.prepareStatement(sql)) {
pstmt.setString(1, payment.getPaymentId());
pstmt.setString(2, payment.getOrderId());
pstmt.setDouble(3, payment.getAmount());
pstmt.setString(4, payment.getPaymentMethod().toString());
pstmt.setTimestamp(5, nowAsTS);
pstmt.setString(6, payment.getStatus().name());
pstmt.executeUpdate();
} catch (SQLException e) {
System.out.println(e.getMessage());
return e.getMessage();
}
return "";
}
public void updatePaymentStatus(Payment payment) {
String sql = "UPDATE payments SET status=? WHERE paymentId=?;";
try (Connection conn = this.connect(); PreparedStatement pstmt = conn.prepareStatement(sql)) {
pstmt.setString(1, payment.getStatus().name());
pstmt.setString(2, payment.getPaymentId());
pstmt.executeUpdate();
} catch (SQLException e) {
System.out.println(e.getMessage());
}
}
public void readPayment(String orderId, Payment payment) {
String sql = "SELECT paymentId, orderId, amount, method, createdAt, status FROM payments WHERE orderId = ?";
try (Connection conn = this.connect(); PreparedStatement pstmt = conn.prepareStatement(sql)) {
pstmt.setString(1, orderId);
ResultSet rs = pstmt.executeQuery();
while (rs.next()) {
payment.setPaymentId(rs.getString("paymentId"));
payment.setOrderId(rs.getString("orderId"));
payment.setAmount(rs.getDouble("amount"));
payment.setCreatedAt(rs.getLong("createdAt"));
payment.setStatus(Payment.Status.valueOf(rs.getString("status")));
}
} catch (SQLException e) {
System.out.println(e.getMessage());
}
}
}

View File

@ -0,0 +1,75 @@
package io.orkes.example.saga.dao;
import io.orkes.example.saga.pojos.Driver;
import io.orkes.example.saga.pojos.Shipment;
import java.sql.*;
import java.util.Date;
public class ShipmentDAO extends BaseDAO {
public ShipmentDAO(String url) {
super(url);
}
public boolean insertShipment(Shipment shipment) {
Date date = new Date();
Timestamp nowAsTS = new Timestamp(date.getTime());
String sql = "INSERT INTO shipments(orderId,driverId,address,instructions,createdAt,status) VALUES(?,?,?,?,?,?)";
try (Connection conn = this.connect(); PreparedStatement pstmt = conn.prepareStatement(sql)) {
pstmt.setString(1, shipment.getOrderId());
pstmt.setInt(2, shipment.getDriverId());
pstmt.setString(3, shipment.getDeliveryAddress());
pstmt.setString(4, shipment.getDeliveryInstructions());
pstmt.setTimestamp(5, nowAsTS);
pstmt.setString(6, Shipment.Status.SCHEDULED.name());
pstmt.executeUpdate();
} catch (SQLException e) {
System.out.println(e.getMessage());
return false;
}
return true;
}
public void cancelShipment(String orderId) {
String sql = "UPDATE shipments SET status=? WHERE orderId=?;";
try (Connection conn = this.connect(); PreparedStatement pstmt = conn.prepareStatement(sql)) {
pstmt.setString(1, Shipment.Status.CANCELED.name());
pstmt.setString(2, orderId);
pstmt.executeUpdate();
} catch (SQLException e) {
System.out.println(e.getMessage());
}
}
public void confirmShipment(String orderId) {
String sql = "UPDATE shipments SET status=? WHERE orderId=?;";
try (Connection conn = this.connect(); PreparedStatement pstmt = conn.prepareStatement(sql)) {
pstmt.setString(1, Shipment.Status.CONFIRMED.name());
pstmt.setString(2, orderId);
pstmt.executeUpdate();
} catch (SQLException e) {
System.out.println(e.getMessage());
}
}
public void readDriver(int driverId, Driver driver) {
String sql = "SELECT name, contact FROM drivers WHERE id = ?";
try (Connection conn = this.connect(); PreparedStatement pstmt = conn.prepareStatement(sql)) {
pstmt.setInt(1, driverId);
ResultSet rs = pstmt.executeQuery();
while (rs.next()) {
driver.setId(driverId);
driver.setName(rs.getString("name"));
driver.setContact(rs.getString("contact"));
}
} catch (SQLException e) {
System.out.println(e.getMessage());
}
}
}

View File

@ -0,0 +1,8 @@
package io.orkes.example.saga.pojos;
import lombok.Data;
@Data
public class CancelRequest {
private String orderId;
}

View File

@ -0,0 +1,11 @@
package io.orkes.example.saga.pojos;
import lombok.Data;
import java.util.ArrayList;
@Data
public class CheckInventoryRequest {
private int restaurantId;
private ArrayList<FoodItem> items;
}

View File

@ -0,0 +1,11 @@
package io.orkes.example.saga.pojos;
import lombok.Data;
@Data
public class Customer {
private int id;
private String email;
private String name;
private String contact;
}

View File

@ -0,0 +1,10 @@
package io.orkes.example.saga.pojos;
import lombok.Data;
@Data
public class Driver {
int id;
String name;
String contact;
}

View File

@ -0,0 +1,11 @@
package io.orkes.example.saga.pojos;
import lombok.Data;
@Data
public class DriverNotificationRequest {
int driverId;
String dropOff;
String pickUp;
String orderId;
}

View File

@ -0,0 +1,19 @@
package io.orkes.example.saga.pojos;
import lombok.Data;
import java.util.ArrayList;
@Data
public class FoodDeliveryRequest {
private String customerEmail;
private String customerName;
private String customerContact;
private int restaurantId;
private ArrayList<Object> foodItems;
private ArrayList<String> additionalNotes;
private String address;
private String deliveryInstructions;
private double paymentAmount;
private Object paymentMethod;
}

View File

@ -0,0 +1,11 @@
package io.orkes.example.saga.pojos;
import lombok.Data;
import lombok.ToString;
@Data
@ToString
public class FoodItem {
private String item;
private int quantity;
}

View File

@ -0,0 +1,24 @@
package io.orkes.example.saga.pojos;
import lombok.Data;
import java.util.EnumMap;
@Data
public class Order {
public enum Status {
PENDING,
ASSIGNED,
CONFIRMED,
CANCELLED
}
private String orderId;
private Customer customer;
private int restaurantId;
private String deliveryAddress;
private long createdAt;
private Status status;
private OrderDetails orderDetails;
}

View File

@ -0,0 +1,11 @@
package io.orkes.example.saga.pojos;
import lombok.Data;
import java.util.ArrayList;
@Data
public class OrderDetails {
private String orderId;
private ArrayList<FoodItem> items;
private ArrayList<String> notes;
}

View File

@ -0,0 +1,18 @@
package io.orkes.example.saga.pojos;
import lombok.Data;
import java.util.ArrayList;
@Data
public class OrderRequest {
private String OrderRequestId;
private String customerEmail;
private String customerName;
private String customerContact;
private int restaurantId;
private ArrayList<FoodItem> items;
private ArrayList<String> notes;
private String deliveryAddress;
private String deliveryInstructions;
}

View File

@ -0,0 +1,20 @@
package io.orkes.example.saga.pojos;
import lombok.Data;
@Data
public class Payment {
public enum Status {
PENDING,
FAILED,
SUCCESSFUL,
CANCELED
}
private String paymentId;
private String orderId;
private double amount;
private PaymentMethod paymentMethod;
private Status status;
private long createdAt;
private String errorMsg;
}

View File

@ -0,0 +1,12 @@
package io.orkes.example.saga.pojos;
import lombok.Data;
import lombok.ToString;
@Data
@ToString
public class PaymentDetails {
private String number;
private String expiry;
private int cvv;
}

View File

@ -0,0 +1,11 @@
package io.orkes.example.saga.pojos;
import lombok.Data;
import lombok.ToString;
@Data
@ToString
public class PaymentMethod {
private String type;
private PaymentDetails details;
}

View File

@ -0,0 +1,11 @@
package io.orkes.example.saga.pojos;
import lombok.Data;
@Data
public class PaymentRequest {
private String orderId;
private int customerId;
private float amount;
private PaymentMethod method;
}

View File

@ -0,0 +1,11 @@
package io.orkes.example.saga.pojos;
import lombok.Data;
@Data
public class Restaurant {
int id;
String name;
String address;
String contact;
}

View File

@ -0,0 +1,20 @@
package io.orkes.example.saga.pojos;
import lombok.Data;
@Data
public class Shipment {
public enum Status {
SCHEDULED,
CONFIRMED,
DELIVERED,
CANCELED
}
private int id;
private String orderId;
private int driverId;
private String deliveryAddress;
private String deliveryInstructions;
private long createdAt;
private String status;
}

View File

@ -0,0 +1,10 @@
package io.orkes.example.saga.pojos;
import lombok.Data;
@Data
public class ShippingRequest {
private String orderId;
private String deliveryAddress;
private String deliveryInstructions;
}

View File

@ -0,0 +1,23 @@
package io.orkes.example.saga.service;
import io.orkes.example.saga.dao.InventoryDAO;
import io.orkes.example.saga.pojos.FoodItem;
import io.orkes.example.saga.pojos.Restaurant;
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
import java.util.Objects;
@Slf4j
public class InventoryService {
private static final InventoryDAO inventoryDAO = new InventoryDAO("jdbc:sqlite:food_delivery.db");
public static boolean checkAvailability(int restaurantId, ArrayList<FoodItem> items) {
Restaurant restaurant = new Restaurant();
restaurant.setId(restaurantId);
restaurant.setName("");
inventoryDAO.readRestaurant(restaurantId, restaurant);
return !Objects.equals(restaurant.getName(), "");
}
}

View File

@ -0,0 +1,72 @@
package io.orkes.example.saga.service;
import io.orkes.example.saga.dao.OrdersDAO;
import io.orkes.example.saga.pojos.Customer;
import io.orkes.example.saga.pojos.Order;
import io.orkes.example.saga.pojos.OrderDetails;
import io.orkes.example.saga.pojos.OrderRequest;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import java.util.UUID;
@Slf4j
@AllArgsConstructor
@Service
public class OrderService {
private static final OrdersDAO ORDERS_DAO = new OrdersDAO("jdbc:sqlite:food_delivery.db");
public static String createOrder(OrderRequest orderRequest) {
UUID uuid = UUID.randomUUID();
String uuidAsString = uuid.toString();
Order order = new Order();
order.setOrderId(uuidAsString);
Customer customer = new Customer();
customer.setEmail(orderRequest.getCustomerEmail());
customer.setName(orderRequest.getCustomerName());
customer.setContact(orderRequest.getCustomerContact());
customer.setId(ORDERS_DAO.insertCustomer(customer));
log.info("Upsert customer record in DB with id: {}", customer.getId());
order.setCustomer(customer);
order.setRestaurantId(orderRequest.getRestaurantId());
order.setDeliveryAddress(orderRequest.getDeliveryAddress());
order.setStatus(Order.Status.PENDING);
OrderDetails orderDetails = new OrderDetails();
orderDetails.setOrderId(uuidAsString);
orderDetails.setItems(orderRequest.getItems());
orderDetails.setNotes(orderRequest.getNotes());
order.setOrderDetails(orderDetails);
String error = ORDERS_DAO.insertOrder(order);
if (error.isEmpty()) {
log.info("Created order with id: {}", order.getOrderId());
}
else {
log.error("Order creation failure: {}", error);
return null;
}
return uuidAsString;
}
public static Order getOrder(String orderId) {
Order order = new Order();
ORDERS_DAO.readOrder(orderId, order);
return order;
}
public static void cancelOrder(Order order) {
order.setStatus(Order.Status.CANCELLED);
log.info("Cancelling order {}", order.getOrderId());
ORDERS_DAO.updateOrder(order);
}
}

View File

@ -0,0 +1,85 @@
package io.orkes.example.saga.service;
import io.orkes.example.saga.dao.PaymentsDAO;
import io.orkes.example.saga.pojos.Payment;
import io.orkes.example.saga.pojos.PaymentDetails;
import io.orkes.example.saga.pojos.PaymentMethod;
import io.orkes.example.saga.pojos.PaymentRequest;
import lombok.extern.slf4j.Slf4j;
import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Objects;
import java.util.UUID;
@Slf4j
public class PaymentService {
private static final PaymentsDAO paymentsDAO = new PaymentsDAO("jdbc:sqlite:food_delivery.db");
public static Payment createPayment(PaymentRequest paymentRequest) {
UUID uuid = UUID.randomUUID();
String uuidAsString = uuid.toString();
Payment payment = new Payment();
payment.setPaymentId(uuidAsString);
payment.setOrderId(paymentRequest.getOrderId());
payment.setAmount(paymentRequest.getAmount());
payment.setPaymentMethod(paymentRequest.getMethod());
payment.setStatus(Payment.Status.PENDING);
// Check if returned error is non-empty, i.e failure
if (!paymentsDAO.insertPayment(payment).isEmpty()) {
log.error("Failed to process payment for order {}", paymentRequest.getOrderId());
payment.setErrorMsg("Payment creation failure");
payment.setStatus(Payment.Status.FAILED);
}
else {
if(makePayment(payment)) {
payment.setStatus(Payment.Status.SUCCESSFUL);
} else {
payment.setStatus(Payment.Status.FAILED);
}
}
// Record final status
paymentsDAO.updatePaymentStatus(payment);
return payment;
}
public static void cancelPayment(String orderId) {
// Cancel Payment in DB
Payment payment = new Payment();
paymentsDAO.readPayment(orderId, payment);
payment.setStatus(Payment.Status.CANCELED);
paymentsDAO.updatePaymentStatus(payment);
}
private static boolean makePayment(Payment payment) {
if (Objects.equals(payment.getPaymentMethod().getType(), "Credit Card")) {
PaymentDetails details = payment.getPaymentMethod().getDetails();
DateFormat dateFormat= new SimpleDateFormat("MM/yyyy");
Date expiry = new Date();
try {
expiry = dateFormat.parse(details.getExpiry());
} catch (ParseException e) {
payment.setErrorMsg("Invalid expiry date:" + details.getExpiry());
return false;
}
Date today = new Date();
if (today.getTime() > expiry.getTime()) {
payment.setErrorMsg("Expired payment method:" + details.getExpiry());
return false;
}
}
// Ideally an async call would be made with a callback
// But, we're skipping that and assuming payment went through
return true;
}
}

View File

@ -0,0 +1,64 @@
package io.orkes.example.saga.service;
import io.orkes.example.saga.dao.ShipmentDAO;
import io.orkes.example.saga.pojos.Driver;
import io.orkes.example.saga.pojos.Shipment;
import io.orkes.example.saga.pojos.ShippingRequest;
import lombok.extern.slf4j.Slf4j;
import java.util.Random;
@Slf4j
public class ShipmentService {
private static final ShipmentDAO shipmentDAO = new ShipmentDAO("jdbc:sqlite:food_delivery.db");
public static int createShipment(ShippingRequest shippingRequest) {
String orderId = shippingRequest.getOrderId();
Shipment shipment = new Shipment();
shipment.setOrderId(orderId);
shipment.setDeliveryAddress(shippingRequest.getDeliveryAddress());
shipment.setDeliveryInstructions(shippingRequest.getDeliveryInstructions());
int driverId = findDriver();
shipment.setDriverId(driverId);
if (!shipmentDAO.insertShipment(shipment)) {
log.error("Shipment creation for order {} failed.", orderId);
return 0;
}
Driver driver = new Driver();
driver.setName("");
shipmentDAO.readDriver(driverId, driver);
if (driver.getName().isBlank()) {
log.error("Shipment creation for order {} failed as driver in the area is not available.", orderId);
shipmentDAO.cancelShipment(orderId);
return 0;
}
else {
log.info("Assigned driver {} to order with id: {}", driverId, orderId);
shipmentDAO.confirmShipment(orderId);
}
return driverId;
}
public static void cancelDelivery(String orderId) {
shipmentDAO.cancelShipment(orderId);
}
private static int findDriver() {
Random random = new Random();
int driverId = 0;
int counter = 0;
while (counter < 10) {
driverId = random.nextInt(4);
if(driverId !=0) break;
counter += 1;
}
return driverId;
}
}

View File

@ -0,0 +1,61 @@
package io.orkes.example.saga.service;
import com.netflix.conductor.common.metadata.workflow.StartWorkflowRequest;
import io.orkes.conductor.client.WorkflowClient;
import io.orkes.example.saga.pojos.FoodDeliveryRequest;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Service;
import java.util.HashMap;
import java.util.Map;
@Slf4j
@AllArgsConstructor
@Service
public class WorkflowService {
private final WorkflowClient workflowClient;
private final Environment environment;
public Map<String, Object> startFoodDeliveryWorkflow(FoodDeliveryRequest foodDeliveryRequest) {
StartWorkflowRequest request = new StartWorkflowRequest();
request.setName("FoodDeliveryWorkflow");
request.setVersion(1);
request.setCorrelationId("api-triggered");
String TASK_DOMAIN_PROPERTY = "conductor.worker.all.domain";
String domain = environment.getProperty(TASK_DOMAIN_PROPERTY, String.class, "");
if (!domain.isEmpty()) {
Map<String, String> taskToDomain = new HashMap<>();
taskToDomain.put("*", domain);
request.setTaskToDomain(taskToDomain);
}
Map<String, Object> inputData = new HashMap<>();
inputData.put("customerEmail", foodDeliveryRequest.getCustomerEmail());
inputData.put("customerName", foodDeliveryRequest.getCustomerName());
inputData.put("customerContact", foodDeliveryRequest.getCustomerContact());
inputData.put("restaurantId", foodDeliveryRequest.getRestaurantId());
inputData.put("foodItems", foodDeliveryRequest.getFoodItems());
inputData.put("additionalNotes", foodDeliveryRequest.getAdditionalNotes());
inputData.put("address", foodDeliveryRequest.getAddress());
inputData.put("deliveryInstructions", foodDeliveryRequest.getDeliveryInstructions());
inputData.put("paymentAmount", foodDeliveryRequest.getPaymentAmount());
inputData.put("paymentMethod", foodDeliveryRequest.getPaymentMethod());
request.setInput(inputData);
String workflowId = "";
try {
workflowId = workflowClient.startWorkflow(request);
log.info("Workflow id: {}", workflowId);
} catch (Exception ex) {
ex.printStackTrace(System.out);
return Map.of("error", "Order creation failure", "detail", ex.toString());
}
return Map.of("workflowId", workflowId);
}
}

View File

@ -0,0 +1,130 @@
package io.orkes.example.saga.workers;
import com.netflix.conductor.common.metadata.tasks.TaskResult;
import com.netflix.conductor.sdk.workflow.task.WorkerTask;
import io.orkes.example.saga.pojos.*;
import io.orkes.example.saga.service.InventoryService;
import io.orkes.example.saga.service.OrderService;
import io.orkes.example.saga.service.PaymentService;
import io.orkes.example.saga.service.ShipmentService;
import lombok.AllArgsConstructor;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
@AllArgsConstructor
@Component
@ComponentScan(basePackages = {"io.orkes"})
public class ConductorWorkers {
/**
* Note: Using this setting, up to 5 tasks will run in parallel, with tasks being polled every 200ms
*/
@WorkerTask(value = "order_food", threadCount = 3, pollingInterval = 300)
public TaskResult orderFoodTask(OrderRequest orderRequest) {
String orderId = OrderService.createOrder(orderRequest);
TaskResult result = new TaskResult();
Map<String, Object> output = new HashMap<>();
if(orderId != null) {
output.put("orderId", orderId);
result.setOutputData(output);
result.setStatus(TaskResult.Status.COMPLETED);
} else {
output.put("orderId", null);
result.setStatus(TaskResult.Status.FAILED);
}
return result;
}
@WorkerTask(value = "check_inventory", threadCount = 2, pollingInterval = 300)
public TaskResult checkInventoryTask(CheckInventoryRequest checkInventoryRequest) {
int restaurantId = checkInventoryRequest.getRestaurantId();
ArrayList<FoodItem> items = checkInventoryRequest.getItems();
boolean availability = InventoryService.checkAvailability(restaurantId, items);
TaskResult result = new TaskResult();
if (availability) {
result.setStatus(TaskResult.Status.COMPLETED);
} else {
result.setReasonForIncompletion("Restaurant is closed");
result.setStatus(TaskResult.Status.FAILED_WITH_TERMINAL_ERROR);
}
return result;
}
@WorkerTask(value = "make_payment", threadCount = 2, pollingInterval = 300)
public TaskResult makePaymentTask(PaymentRequest paymentRequest) {
TaskResult result = new TaskResult();
Payment payment = PaymentService.createPayment(paymentRequest);
Map<String, Object> output = new HashMap<>();
output.put("orderId", payment.getOrderId());
output.put("paymentId", payment.getPaymentId());
output.put("paymentStatus", payment.getStatus().name());
if(payment.getStatus() == Payment.Status.SUCCESSFUL) {
result.setStatus(TaskResult.Status.COMPLETED);
} else {
output.put("error", payment.getErrorMsg());
result.setStatus(TaskResult.Status.FAILED_WITH_TERMINAL_ERROR);
}
result.setOutputData(output);
return result;
}
@WorkerTask(value = "ship_food", threadCount = 2, pollingInterval = 300)
public TaskResult shipFoodTask(ShippingRequest shippingRequest) {
TaskResult result = new TaskResult();
Map<String, Object> output = new HashMap<>();
int driverId = ShipmentService.createShipment(shippingRequest);
if (driverId != 0) {
result.setStatus(TaskResult.Status.COMPLETED);
} else {
result.setStatus(TaskResult.Status.FAILED);
}
return result;
}
@WorkerTask(value = "notify_driver", threadCount = 2, pollingInterval = 300)
public Map<String, Object> checkForDriverNotifications(DriverNotificationRequest driverNotificationRequest) {
Map<String, Object> result = new HashMap<>();
return result;
}
@WorkerTask(value = "notify_customer", threadCount = 2, pollingInterval = 300)
public Map<String, Object> checkForCustomerNotifications(Order order) {
Map<String, Object> result = new HashMap<>();
return result;
}
//
@WorkerTask(value = "cancel_payment", threadCount = 2, pollingInterval = 300)
public Map<String, Object> cancelPaymentTask(CancelRequest cancelRequest) {
Map<String, Object> result = new HashMap<>();
PaymentService.cancelPayment(cancelRequest.getOrderId());
return result;
}
@WorkerTask(value = "cancel_delivery", threadCount = 2, pollingInterval = 300)
public Map<String, Object> cancelDeliveryTask(CancelRequest cancelRequest) {
Map<String, Object> result = new HashMap<>();
ShipmentService.cancelDelivery(cancelRequest.getOrderId());
return result;
}
@WorkerTask(value = "cancel_order", threadCount = 2, pollingInterval = 300)
public Map<String, Object> cancelOrderTask(CancelRequest cancelRequest) {
Map<String, Object> result = new HashMap<>();
Order order = OrderService.getOrder(cancelRequest.getOrderId());
OrderService.cancelOrder(order);
return result;
}
}

View File

@ -0,0 +1,32 @@
spring.mvc.pathmatch.matching-strategy=ant_path_matcher
springdoc.swagger-ui.path=/swagger-ui.html
management.endpoints.enabled-by-default=false
management.endpoint.info.enabled=false
# Local app server port
server.port=8081
# Playground
# Obtain key and secret by logging into https://play.orkes.io/
# and navigating to applications menu, create an application and generate key/secret
conductor.server.url=https://play.orkes.io/api
conductor.security.client.key-id=<key>
conductor.security.client.secret=<secret>
# Task Domain
conductor.worker.all.domain=saga
conductor.worker.all.pollingInterval=22
# DB Setup
spring.jpa.database-platform=com.springboot.sqlite.SQLDialect
spring.jpa.hibernate.ddl-auto=update
spring.datasource.url=jdbc:sqlite:food_delivery.db
spring.datasource.driver-class-name = org.sqlite.JDBC
spring.datasource.username=admin
spring.datasource.password=admin