diff --git a/assembly/src/release/example/transactions/README.txt b/assembly/src/release/example/transactions/README.txt new file mode 100644 index 0000000000..2c3238aa49 --- /dev/null +++ b/assembly/src/release/example/transactions/README.txt @@ -0,0 +1,40 @@ +Transactions Demo +================= +This example is an ActiveMQ implementation of the "TransactedExample" from +Sun's JMS Tutorial (http://java.sun.com/products/jms/tutorial/index.html). + +The example simulates a simplified eCommerce application with four parts: +the retailer who places the orders, the vendor who assemples the computers, +and two suppliers--one for hard drives and another for monitors. + +The retailer sends a message to the vendor's queue and awaits a reply. +The vendor receives the message and sends a message to each of the +supplier's queues. It does this in a single transaction, and will randomly +throw an exception simulating a database error, triggering a rollback. +Each supplier receives the order, checks inventory and replies to the +message stating how many items were sent. +The vendor collects both responses and responds to the retailer, notifying +wheather it cna fulfill the complete order or not. +The retailer receives the message from the vendor. + +Running the Example +=================== +To run the complete demo in a single JVM, with ActiveMQ running on the local +computer: + ant transactions_demo + +If you are running ActiveMQ on a non-standard port, or on a different host, +you can pass a url on the commandline: + ant -Durl=tcp://localhost:61616 transactions_demo + +If your ActiveMQ instance is password-protected, you can also pass a +username and password on the command line: + ant -Duser=myusername -Dpassword=supersecret transactions_demo + +You can also run the individual components seperately, again with optional +url and/or authentication parameters: + ant retailer & + ant vendor & + ant hdsupplier & + ant monitorsupplier & + diff --git a/assembly/src/release/example/transactions/build.xml b/assembly/src/release/example/transactions/build.xml new file mode 100644 index 0000000000..839cde2940 --- /dev/null +++ b/assembly/src/release/example/transactions/build.xml @@ -0,0 +1,113 @@ + + + + + + + + + + + + For the full demo: + ant transactions_demo -Durl=tcp://hostname:1234 + For the indicual components: + ant retailer -Durl=tcp://hostname:1234 + ant vendor -Durl=tcp://hostname:1234 + ant hdsupplier -Durl=tcp://hostname:1234 + ant monitorsupplier -Durl=tcp://hostname:1234 + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/assembly/src/release/example/transactions/src/Retailer.java b/assembly/src/release/example/transactions/src/Retailer.java new file mode 100644 index 0000000000..01ecfef283 --- /dev/null +++ b/assembly/src/release/example/transactions/src/Retailer.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import org.apache.activemq.ActiveMQConnectionFactory; +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.MapMessage; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TemporaryQueue; + +/** + * The Retailer orders computers from the Vendor by sending a message via + * the VendorOrderQueue. It then syncronously receives the reponse message + * and reports if the order was successful or not. + */ +public class Retailer implements Runnable { + private String url; + private String user; + private String password; + + public Retailer(String url, String user, String password) { + this.url = url; + this.user = user; + this.password = password; + } + + public void run() { + ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url); + try { + Connection connection = connectionFactory.createConnection(); + + // The Retailer's session is non-trasacted. + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Destination vendorOrderQueue = session.createQueue("VendorOrderQueue"); + TemporaryQueue retailerConfirmQueue = session.createTemporaryQueue(); + + MessageProducer producer = session.createProducer(vendorOrderQueue); + MessageConsumer replyConsumer = session.createConsumer(retailerConfirmQueue); + + connection.start(); + + for (int i = 0; i < 5; i++) { + MapMessage message = session.createMapMessage(); + message.setString("Item", "Computer(s)"); + int quantity = (int)(Math.random() * 4) + 1; + message.setInt("Quantity", quantity); + message.setJMSReplyTo(retailerConfirmQueue); + producer.send(message); + System.out.println("Retailer: Ordered " + quantity + " computers."); + + MapMessage reply = (MapMessage) replyConsumer.receive(); + if (reply.getBoolean("OrderAccepted")) { + System.out.println("Retailer: Order Filled"); + } else { + System.out.println("Retailer: Order Not Filled"); + } + } + + // Send a non-MapMessage to signal the end + producer.send(session.createMessage()); + + replyConsumer.close(); + connection.close(); + + } catch (JMSException e) { + e.printStackTrace(); + } + } + + public static void main(String[] args) { + String url = "tcp://localhost:61616"; + String user = null; + String password = null; + + if (args.length >= 1) { + url = args[0]; + } + + if (args.length >= 2) { + user = args[1]; + } + + if (args.length >= 3) { + password = args[2]; + } + + Retailer r = new Retailer(url, user, password); + + new Thread(r, "Retailer").start(); + } +} diff --git a/assembly/src/release/example/transactions/src/Supplier.java b/assembly/src/release/example/transactions/src/Supplier.java new file mode 100644 index 0000000000..ed02cc2ac1 --- /dev/null +++ b/assembly/src/release/example/transactions/src/Supplier.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import java.util.Random; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.MapMessage; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; + +import org.apache.activemq.ActiveMQConnectionFactory; + +/** + * The Supplier synchronously receives the order from the Vendor and + * randomly responds with either the number ordered, or some lower + * quantity. + */ +public class Supplier implements Runnable { + private String url; + private String user; + private String password; + private final String ITEM; + private final String QUEUE; + + public Supplier(String item, String queue, String url, String user, String password) { + this.url = url; + this.user = user; + this.password = password; + this.ITEM = item; + this.QUEUE = queue; + } + + public void run() { + ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url); + Session session = null; + Destination orderQueue; + try { + Connection connection = connectionFactory.createConnection(); + + session = connection.createSession(true, Session.SESSION_TRANSACTED); + orderQueue = session.createQueue(QUEUE); + MessageConsumer consumer = session.createConsumer(orderQueue); + + connection.start(); + + while (true) { + Message message = consumer.receive(); + MessageProducer producer = session.createProducer(message.getJMSReplyTo()); + MapMessage orderMessage; + if (message instanceof MapMessage) { + orderMessage = (MapMessage) message; + } else { + // End of Stream + producer.send(session.createMessage()); + session.commit(); + producer.close(); + break; + } + + int quantity = orderMessage.getInt("Quantity"); + System.out.println(ITEM + " Supplier: Vendor ordered " + quantity + " " + orderMessage.getString("Item")); + + MapMessage outMessage = session.createMapMessage(); + outMessage.setInt("VendorOrderNumber", orderMessage.getInt("VendorOrderNumber")); + outMessage.setString("Item", ITEM); + + quantity = Math.min( + orderMessage.getInt("Quantity"), + new Random().nextInt(orderMessage.getInt("Quantity") * 10)); + outMessage.setInt("Quantity", quantity); + + producer.send(outMessage); + System.out.println(ITEM + " Supplier: Sent " + quantity + " " + ITEM + "(s)"); + session.commit(); + System.out.println(ITEM + " Supplier: committed transaction"); + producer.close(); + } + connection.close(); + } catch (JMSException e) { + e.printStackTrace(); + } + } + + public static void main(String[] args) { + String url = "tcp://localhost:61616"; + String user = null; + String password = null; + String item = "HardDrive"; + + if (args.length >= 1) { + item = args[0]; + } + String queue; + if ("HardDrive".equals(item)) { + queue = "StorageOrderQueue"; + } else if ("Monitor".equals(item)) { + queue = "MonitorOrderQueue"; + } else { + throw new IllegalArgumentException("Item must be either HardDrive or Monitor"); + } + + if (args.length >= 2) { + url = args[1]; + } + + if (args.length >= 3) { + user = args[2]; + } + + if (args.length >= 4) { + password = args[3]; + } + + Supplier s = new Supplier(item, queue, url, user, password); + + new Thread(s, "Supplier " + item).start(); + } +} diff --git a/assembly/src/release/example/transactions/src/TransactionsDemo.java b/assembly/src/release/example/transactions/src/TransactionsDemo.java new file mode 100644 index 0000000000..4dc1171d4d --- /dev/null +++ b/assembly/src/release/example/transactions/src/TransactionsDemo.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +public class TransactionsDemo { + + public static void main(String[] args) { + String url = "tcp://localhost:61616"; + String user = null; + String password = null; + + if (args.length >= 1) { + url = args[0]; + } + + if (args.length >= 2) { + user = args[1]; + } + + if (args.length >= 3) { + password = args[2]; + } + + Retailer r = new Retailer(url, user, password); + Vendor v = new Vendor(url, user, password); + Supplier s1 = new Supplier("HardDrive", "StorageOrderQueue", url, user, password); + Supplier s2 = new Supplier("Monitor", "MonitorOrderQueue", url, user, password); + + new Thread(r, "Retailer").start(); + new Thread(v, "Vendor").start(); + new Thread(s1, "Supplier 1").start(); + new Thread(s2, "Supplier 2").start(); + } + +} diff --git a/assembly/src/release/example/transactions/src/Vendor.java b/assembly/src/release/example/transactions/src/Vendor.java new file mode 100644 index 0000000000..0f2ce93ceb --- /dev/null +++ b/assembly/src/release/example/transactions/src/Vendor.java @@ -0,0 +1,302 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import java.util.HashMap; +import java.util.Map; +import java.util.Random; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.MapMessage; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TemporaryQueue; + +import org.apache.activemq.ActiveMQConnectionFactory; + +/** + * The Vendor synchronously, and in a single transaction, receives the + * order from VendorOrderQueue and sends messages to the two Suppliers via + * MonitorOrderQueue and StorageOrderQueue. + * The responses are received asynchronously; when both responses come + * back, the order confirmation message is sent back to the Retailer. + */ +public class Vendor implements Runnable, MessageListener { + private String url; + private String user; + private String password; + private Session asyncSession; + private int numSuppliers = 2; + private Object supplierLock = new Object(); + + public Vendor(String url, String user, String password) { + this.url = url; + this.user = user; + this.password = password; + } + + public void run() { + ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url); + Session session = null; + Destination orderQueue; + Destination monitorOrderQueue; + Destination storageOrderQueue; + TemporaryQueue vendorConfirmQueue; + MessageConsumer orderConsumer = null; + MessageProducer monitorProducer = null; + MessageProducer storageProducer = null; + + try { + Connection connection = connectionFactory.createConnection(); + + session = connection.createSession(true, Session.SESSION_TRANSACTED); + orderQueue = session.createQueue("VendorOrderQueue"); + monitorOrderQueue = session.createQueue("MonitorOrderQueue"); + storageOrderQueue = session.createQueue("StorageOrderQueue"); + + orderConsumer = session.createConsumer(orderQueue); + monitorProducer = session.createProducer(monitorOrderQueue); + storageProducer = session.createProducer(storageOrderQueue); + + Connection asyncconnection = connectionFactory.createConnection(); + asyncSession = asyncconnection.createSession(true, Session.SESSION_TRANSACTED); + + vendorConfirmQueue = asyncSession.createTemporaryQueue(); + MessageConsumer confirmConsumer = asyncSession.createConsumer(vendorConfirmQueue); + confirmConsumer.setMessageListener(this); + + asyncconnection.start(); + + connection.start(); + + + while (true) { + Order order = null; + try { + Message inMessage = orderConsumer.receive(); + MapMessage message; + if (inMessage instanceof MapMessage) { + message = (MapMessage) inMessage; + + } else { + // end of stream + Message outMessage = session.createMessage(); + outMessage.setJMSReplyTo(vendorConfirmQueue); + monitorProducer.send(outMessage); + storageProducer.send(outMessage); + session.commit(); + break; + } + + // Randomly throw an exception in here to simulate a Database error + // and trigger a rollback of the transaction + if (new Random().nextInt(3) == 0) { + throw new JMSException("Simulated Database Error."); + } + + order = new Order(message); + + MapMessage orderMessage = session.createMapMessage(); + orderMessage.setJMSReplyTo(vendorConfirmQueue); + orderMessage.setInt("VendorOrderNumber", order.getOrderNumber()); + int quantity = message.getInt("Quantity"); + System.out.println("Vendor: Retailer ordered " + quantity + " " + message.getString("Item")); + + orderMessage.setInt("Quantity", quantity); + orderMessage.setString("Item", "Monitor"); + monitorProducer.send(orderMessage); + System.out.println("Vendor: ordered " + quantity + " Monitor(s)"); + + orderMessage.setString("Item", "HardDrive"); + storageProducer.send(orderMessage); + System.out.println("Vendor: ordered " + quantity + " Hard Drive(s)"); + + session.commit(); + System.out.println("Vendor: Comitted Transaction 1"); + + } catch (JMSException e) { + System.out.println("Vendor: JMSException Occured: " + e.getMessage()); + e.printStackTrace(); + session.rollback(); + System.out.println("Vendor: Rolled Back Transaction."); + } + } + + synchronized (supplierLock) { + while (numSuppliers > 0) { + try { + supplierLock.wait(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + + connection.close(); + asyncconnection.close(); + + } catch (JMSException e) { + e.printStackTrace(); + } + + } + + public void onMessage(Message message) { + if (!(message instanceof MapMessage)) { + synchronized(supplierLock) { + numSuppliers--; + supplierLock.notifyAll(); + } + try { + asyncSession.commit(); + return; + } catch (JMSException e) { + e.printStackTrace(); + } + } + + int orderNumber = -1; + try { + MapMessage componentMessage = (MapMessage) message; + + orderNumber = componentMessage.getInt("VendorOrderNumber"); + Order order = Order.getOrder(orderNumber); + order.processSubOrder(componentMessage); + asyncSession.commit(); + + if (! "Pending".equals(order.getStatus())) { + System.out.println("Vendor: Completed processing for order " + orderNumber); + + MessageProducer replyProducer = asyncSession.createProducer(order.getMessage().getJMSReplyTo()); + MapMessage replyMessage = asyncSession.createMapMessage(); + if ("Fulfilled".equals(order.getStatus())) { + replyMessage.setBoolean("OrderAccepted", true); + System.out.println("Vendor: sent " + order.quantity + " computer(s)"); + } else { + replyMessage.setBoolean("OrderAccepted", false); + System.out.println("Vendor: unable to send " + order.quantity + " computer(s)"); + } + replyProducer.send(replyMessage); + asyncSession.commit(); + System.out.println("Vender: committed transaction 2"); + } + } catch (JMSException e) { + e.printStackTrace(); + } + } + + public static class Order { + private static Map pendingOrders = new HashMap(); + private static int nextOrderNumber = 1; + + private int orderNumber; + private int quantity; + private MapMessage monitor = null; + private MapMessage storage = null; + private MapMessage message; + private String status; + + public Order(MapMessage message) { + this.orderNumber = nextOrderNumber++; + this.message = message; + try { + this.quantity = message.getInt("Quantity"); + } catch (JMSException e) { + e.printStackTrace(); + this.quantity = 0; + } + status = "Pending"; + pendingOrders.put(orderNumber, this); + } + + public Object getStatus() { + return status; + } + + public int getOrderNumber() { + return orderNumber; + } + + public static int getOutstandingOrders() { + return pendingOrders.size(); + } + + public static Order getOrder(int number) { + return pendingOrders.get(number); + } + + public MapMessage getMessage() { + return message; + } + + public void processSubOrder(MapMessage message) { + String itemName = null; + try { + itemName = message.getString("Item"); + } catch (JMSException e) { + e.printStackTrace(); + } + + if ("Monitor".equals(itemName)) { + monitor = message; + } else if ("HardDrive".equals(itemName)) { + storage = message; + } + + if (null != monitor && null != storage) { + // Received both messages + try { + if (quantity > monitor.getInt("Quantity")) { + status = "Cancelled"; + } else if (quantity > storage.getInt("Quantity")) { + status = "Cancelled"; + } else { + status = "Fulfilled"; + } + } catch (JMSException e) { + e.printStackTrace(); + status = "Cancelled"; + } + } + } + } + + public static void main(String[] args) { + String url = "tcp://localhost:61616"; + String user = null; + String password = null; + + if (args.length >= 1) { + url = args[0]; + } + + if (args.length >= 2) { + user = args[1]; + } + + if (args.length >= 3) { + password = args[2]; + } + + Vendor v = new Vendor(url, user, password); + + new Thread(v, "Vendor").start(); + } +}