git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@649192 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2008-04-17 17:20:34 +00:00
parent 18aa1596a2
commit 6707ac60cc
6 changed files with 744 additions and 0 deletions

View File

@ -0,0 +1,40 @@
Transactions Demo
=================
This example is an ActiveMQ implementation of the "TransactedExample" from
Sun's JMS Tutorial (http://java.sun.com/products/jms/tutorial/index.html).
The example simulates a simplified eCommerce application with four parts:
the retailer who places the orders, the vendor who assemples the computers,
and two suppliers--one for hard drives and another for monitors.
The retailer sends a message to the vendor's queue and awaits a reply.
The vendor receives the message and sends a message to each of the
supplier's queues. It does this in a single transaction, and will randomly
throw an exception simulating a database error, triggering a rollback.
Each supplier receives the order, checks inventory and replies to the
message stating how many items were sent.
The vendor collects both responses and responds to the retailer, notifying
wheather it cna fulfill the complete order or not.
The retailer receives the message from the vendor.
Running the Example
===================
To run the complete demo in a single JVM, with ActiveMQ running on the local
computer:
ant transactions_demo
If you are running ActiveMQ on a non-standard port, or on a different host,
you can pass a url on the commandline:
ant -Durl=tcp://localhost:61616 transactions_demo
If your ActiveMQ instance is password-protected, you can also pass a
username and password on the command line:
ant -Duser=myusername -Dpassword=supersecret transactions_demo
You can also run the individual components seperately, again with optional
url and/or authentication parameters:
ant retailer &
ant vendor &
ant hdsupplier &
ant monitorsupplier &

View File

@ -0,0 +1,113 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project name="transactions" default="help" basedir=".">
<property name="class.dir" value="target/classes" />
<property name="activemq.home" value="../.." />
<!-- example program defaults -->
<property name="url" value="tcp://localhost:61616" />
<target name="help">
<echo>
For the full demo:
ant transactions_demo -Durl=tcp://hostname:1234
For the indicual components:
ant retailer -Durl=tcp://hostname:1234
ant vendor -Durl=tcp://hostname:1234
ant hdsupplier -Durl=tcp://hostname:1234
ant monitorsupplier -Durl=tcp://hostname:1234
</echo>
</target>
<target name="clean">
<delete dir="target" quiet="true" />
<delete dir="${class.dir}" quiet="true" />
</target>
<target name="init">
<mkdir dir="${class.dir}" />
<path id="javac.classpath">
<pathelement path="${class.dir}" />
<pathelement path="../conf" />
<fileset dir="${activemq.home}/lib">
<include name="**/*.jar" />
</fileset>
</path>
</target>
<target name="compile" depends="init" description="Compile all Java">
<javac srcdir="src" destdir="${class.dir}" debug="true">
<classpath refid="javac.classpath" />
</javac>
</target>
<target name="transactions_demo" depends="compile" description="Runs the full demo">
<java classname="TransactionsDemo" fork="yes">
<classpath refid="javac.classpath" />
<jvmarg value="-server" />
<sysproperty key="activemq.home" value="${activemq.home}"/>
<arg value="${url}" />
<arg value="${user}" />
<arg value="${password}" />
</java>
</target>
<target name="retailer" depends="compile" description="Runs the retailer">
<java classname="Retailer" fork="yes">
<classpath refid="javac.classpath" />
<jvmarg value="-server" />
<sysproperty key="activemq.home" value="${activemq.home}"/>
<arg value="${url}" />
<arg value="${user}" />
<arg value="${password}" />
</java>
</target>
<target name="vendor" depends="compile" description="Runs the vendor">
<java classname="Vendor" fork="yes">
<classpath refid="javac.classpath" />
<jvmarg value="-server" />
<sysproperty key="activemq.home" value="${activemq.home}"/>
<arg value="${url}" />
<arg value="${user}" />
<arg value="${password}" />
</java>
</target>
<target name="hdsupplier" depends="compile" description="Runs the Hard Drive Supplier">
<java classname="Supplier" fork="yes">
<classpath refid="javac.classpath" />
<jvmarg value="-server" />
<sysproperty key="activemq.home" value="${activemq.home}"/>
<arg value="HardDrive" />
<arg value="${url}" />
<arg value="${user}" />
<arg value="${password}" />
</java>
</target>
<target name="monitorsupplier" depends="compile" description="Runs the Monitor Supplier">
<java classname="Supplier" fork="yes">
<classpath refid="javac.classpath" />
<jvmarg value="-server" />
<sysproperty key="activemq.home" value="${activemq.home}"/>
<arg value="Monitor" />
<arg value="${url}" />
<arg value="${user}" />
<arg value="${password}" />
</java>
</target>
</project>

View File

@ -0,0 +1,107 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TemporaryQueue;
/**
* The Retailer orders computers from the Vendor by sending a message via
* the VendorOrderQueue. It then syncronously receives the reponse message
* and reports if the order was successful or not.
*/
public class Retailer implements Runnable {
private String url;
private String user;
private String password;
public Retailer(String url, String user, String password) {
this.url = url;
this.user = user;
this.password = password;
}
public void run() {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);
try {
Connection connection = connectionFactory.createConnection();
// The Retailer's session is non-trasacted.
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination vendorOrderQueue = session.createQueue("VendorOrderQueue");
TemporaryQueue retailerConfirmQueue = session.createTemporaryQueue();
MessageProducer producer = session.createProducer(vendorOrderQueue);
MessageConsumer replyConsumer = session.createConsumer(retailerConfirmQueue);
connection.start();
for (int i = 0; i < 5; i++) {
MapMessage message = session.createMapMessage();
message.setString("Item", "Computer(s)");
int quantity = (int)(Math.random() * 4) + 1;
message.setInt("Quantity", quantity);
message.setJMSReplyTo(retailerConfirmQueue);
producer.send(message);
System.out.println("Retailer: Ordered " + quantity + " computers.");
MapMessage reply = (MapMessage) replyConsumer.receive();
if (reply.getBoolean("OrderAccepted")) {
System.out.println("Retailer: Order Filled");
} else {
System.out.println("Retailer: Order Not Filled");
}
}
// Send a non-MapMessage to signal the end
producer.send(session.createMessage());
replyConsumer.close();
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
String url = "tcp://localhost:61616";
String user = null;
String password = null;
if (args.length >= 1) {
url = args[0];
}
if (args.length >= 2) {
user = args[1];
}
if (args.length >= 3) {
password = args[2];
}
Retailer r = new Retailer(url, user, password);
new Thread(r, "Retailer").start();
}
}

View File

@ -0,0 +1,135 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.util.Random;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
* The Supplier synchronously receives the order from the Vendor and
* randomly responds with either the number ordered, or some lower
* quantity.
*/
public class Supplier implements Runnable {
private String url;
private String user;
private String password;
private final String ITEM;
private final String QUEUE;
public Supplier(String item, String queue, String url, String user, String password) {
this.url = url;
this.user = user;
this.password = password;
this.ITEM = item;
this.QUEUE = queue;
}
public void run() {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);
Session session = null;
Destination orderQueue;
try {
Connection connection = connectionFactory.createConnection();
session = connection.createSession(true, Session.SESSION_TRANSACTED);
orderQueue = session.createQueue(QUEUE);
MessageConsumer consumer = session.createConsumer(orderQueue);
connection.start();
while (true) {
Message message = consumer.receive();
MessageProducer producer = session.createProducer(message.getJMSReplyTo());
MapMessage orderMessage;
if (message instanceof MapMessage) {
orderMessage = (MapMessage) message;
} else {
// End of Stream
producer.send(session.createMessage());
session.commit();
producer.close();
break;
}
int quantity = orderMessage.getInt("Quantity");
System.out.println(ITEM + " Supplier: Vendor ordered " + quantity + " " + orderMessage.getString("Item"));
MapMessage outMessage = session.createMapMessage();
outMessage.setInt("VendorOrderNumber", orderMessage.getInt("VendorOrderNumber"));
outMessage.setString("Item", ITEM);
quantity = Math.min(
orderMessage.getInt("Quantity"),
new Random().nextInt(orderMessage.getInt("Quantity") * 10));
outMessage.setInt("Quantity", quantity);
producer.send(outMessage);
System.out.println(ITEM + " Supplier: Sent " + quantity + " " + ITEM + "(s)");
session.commit();
System.out.println(ITEM + " Supplier: committed transaction");
producer.close();
}
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
String url = "tcp://localhost:61616";
String user = null;
String password = null;
String item = "HardDrive";
if (args.length >= 1) {
item = args[0];
}
String queue;
if ("HardDrive".equals(item)) {
queue = "StorageOrderQueue";
} else if ("Monitor".equals(item)) {
queue = "MonitorOrderQueue";
} else {
throw new IllegalArgumentException("Item must be either HardDrive or Monitor");
}
if (args.length >= 2) {
url = args[1];
}
if (args.length >= 3) {
user = args[2];
}
if (args.length >= 4) {
password = args[3];
}
Supplier s = new Supplier(item, queue, url, user, password);
new Thread(s, "Supplier " + item).start();
}
}

View File

@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
public class TransactionsDemo {
public static void main(String[] args) {
String url = "tcp://localhost:61616";
String user = null;
String password = null;
if (args.length >= 1) {
url = args[0];
}
if (args.length >= 2) {
user = args[1];
}
if (args.length >= 3) {
password = args[2];
}
Retailer r = new Retailer(url, user, password);
Vendor v = new Vendor(url, user, password);
Supplier s1 = new Supplier("HardDrive", "StorageOrderQueue", url, user, password);
Supplier s2 = new Supplier("Monitor", "MonitorOrderQueue", url, user, password);
new Thread(r, "Retailer").start();
new Thread(v, "Vendor").start();
new Thread(s1, "Supplier 1").start();
new Thread(s2, "Supplier 2").start();
}
}

View File

@ -0,0 +1,302 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TemporaryQueue;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
* The Vendor synchronously, and in a single transaction, receives the
* order from VendorOrderQueue and sends messages to the two Suppliers via
* MonitorOrderQueue and StorageOrderQueue.
* The responses are received asynchronously; when both responses come
* back, the order confirmation message is sent back to the Retailer.
*/
public class Vendor implements Runnable, MessageListener {
private String url;
private String user;
private String password;
private Session asyncSession;
private int numSuppliers = 2;
private Object supplierLock = new Object();
public Vendor(String url, String user, String password) {
this.url = url;
this.user = user;
this.password = password;
}
public void run() {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);
Session session = null;
Destination orderQueue;
Destination monitorOrderQueue;
Destination storageOrderQueue;
TemporaryQueue vendorConfirmQueue;
MessageConsumer orderConsumer = null;
MessageProducer monitorProducer = null;
MessageProducer storageProducer = null;
try {
Connection connection = connectionFactory.createConnection();
session = connection.createSession(true, Session.SESSION_TRANSACTED);
orderQueue = session.createQueue("VendorOrderQueue");
monitorOrderQueue = session.createQueue("MonitorOrderQueue");
storageOrderQueue = session.createQueue("StorageOrderQueue");
orderConsumer = session.createConsumer(orderQueue);
monitorProducer = session.createProducer(monitorOrderQueue);
storageProducer = session.createProducer(storageOrderQueue);
Connection asyncconnection = connectionFactory.createConnection();
asyncSession = asyncconnection.createSession(true, Session.SESSION_TRANSACTED);
vendorConfirmQueue = asyncSession.createTemporaryQueue();
MessageConsumer confirmConsumer = asyncSession.createConsumer(vendorConfirmQueue);
confirmConsumer.setMessageListener(this);
asyncconnection.start();
connection.start();
while (true) {
Order order = null;
try {
Message inMessage = orderConsumer.receive();
MapMessage message;
if (inMessage instanceof MapMessage) {
message = (MapMessage) inMessage;
} else {
// end of stream
Message outMessage = session.createMessage();
outMessage.setJMSReplyTo(vendorConfirmQueue);
monitorProducer.send(outMessage);
storageProducer.send(outMessage);
session.commit();
break;
}
// Randomly throw an exception in here to simulate a Database error
// and trigger a rollback of the transaction
if (new Random().nextInt(3) == 0) {
throw new JMSException("Simulated Database Error.");
}
order = new Order(message);
MapMessage orderMessage = session.createMapMessage();
orderMessage.setJMSReplyTo(vendorConfirmQueue);
orderMessage.setInt("VendorOrderNumber", order.getOrderNumber());
int quantity = message.getInt("Quantity");
System.out.println("Vendor: Retailer ordered " + quantity + " " + message.getString("Item"));
orderMessage.setInt("Quantity", quantity);
orderMessage.setString("Item", "Monitor");
monitorProducer.send(orderMessage);
System.out.println("Vendor: ordered " + quantity + " Monitor(s)");
orderMessage.setString("Item", "HardDrive");
storageProducer.send(orderMessage);
System.out.println("Vendor: ordered " + quantity + " Hard Drive(s)");
session.commit();
System.out.println("Vendor: Comitted Transaction 1");
} catch (JMSException e) {
System.out.println("Vendor: JMSException Occured: " + e.getMessage());
e.printStackTrace();
session.rollback();
System.out.println("Vendor: Rolled Back Transaction.");
}
}
synchronized (supplierLock) {
while (numSuppliers > 0) {
try {
supplierLock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
connection.close();
asyncconnection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
public void onMessage(Message message) {
if (!(message instanceof MapMessage)) {
synchronized(supplierLock) {
numSuppliers--;
supplierLock.notifyAll();
}
try {
asyncSession.commit();
return;
} catch (JMSException e) {
e.printStackTrace();
}
}
int orderNumber = -1;
try {
MapMessage componentMessage = (MapMessage) message;
orderNumber = componentMessage.getInt("VendorOrderNumber");
Order order = Order.getOrder(orderNumber);
order.processSubOrder(componentMessage);
asyncSession.commit();
if (! "Pending".equals(order.getStatus())) {
System.out.println("Vendor: Completed processing for order " + orderNumber);
MessageProducer replyProducer = asyncSession.createProducer(order.getMessage().getJMSReplyTo());
MapMessage replyMessage = asyncSession.createMapMessage();
if ("Fulfilled".equals(order.getStatus())) {
replyMessage.setBoolean("OrderAccepted", true);
System.out.println("Vendor: sent " + order.quantity + " computer(s)");
} else {
replyMessage.setBoolean("OrderAccepted", false);
System.out.println("Vendor: unable to send " + order.quantity + " computer(s)");
}
replyProducer.send(replyMessage);
asyncSession.commit();
System.out.println("Vender: committed transaction 2");
}
} catch (JMSException e) {
e.printStackTrace();
}
}
public static class Order {
private static Map<Integer, Order> pendingOrders = new HashMap<Integer, Order>();
private static int nextOrderNumber = 1;
private int orderNumber;
private int quantity;
private MapMessage monitor = null;
private MapMessage storage = null;
private MapMessage message;
private String status;
public Order(MapMessage message) {
this.orderNumber = nextOrderNumber++;
this.message = message;
try {
this.quantity = message.getInt("Quantity");
} catch (JMSException e) {
e.printStackTrace();
this.quantity = 0;
}
status = "Pending";
pendingOrders.put(orderNumber, this);
}
public Object getStatus() {
return status;
}
public int getOrderNumber() {
return orderNumber;
}
public static int getOutstandingOrders() {
return pendingOrders.size();
}
public static Order getOrder(int number) {
return pendingOrders.get(number);
}
public MapMessage getMessage() {
return message;
}
public void processSubOrder(MapMessage message) {
String itemName = null;
try {
itemName = message.getString("Item");
} catch (JMSException e) {
e.printStackTrace();
}
if ("Monitor".equals(itemName)) {
monitor = message;
} else if ("HardDrive".equals(itemName)) {
storage = message;
}
if (null != monitor && null != storage) {
// Received both messages
try {
if (quantity > monitor.getInt("Quantity")) {
status = "Cancelled";
} else if (quantity > storage.getInt("Quantity")) {
status = "Cancelled";
} else {
status = "Fulfilled";
}
} catch (JMSException e) {
e.printStackTrace();
status = "Cancelled";
}
}
}
}
public static void main(String[] args) {
String url = "tcp://localhost:61616";
String user = null;
String password = null;
if (args.length >= 1) {
url = args[0];
}
if (args.length >= 2) {
user = args[1];
}
if (args.length >= 3) {
password = args[2];
}
Vendor v = new Vendor(url, user, password);
new Thread(v, "Vendor").start();
}
}