diff --git a/assembly/src/release/example/build.xml b/assembly/src/release/example/build.xml index e6ea17855e..3305a029b5 100755 --- a/assembly/src/release/example/build.xml +++ b/assembly/src/release/example/build.xml @@ -166,6 +166,25 @@ + + + Running requester against server at $$url = ${url} for subject $$subject = ${subject} + + + + + + + + + + + + + + + + Running an embedded broker example diff --git a/assembly/src/release/example/src/ConsumerTool.java b/assembly/src/release/example/src/ConsumerTool.java index fcf87f9c0a..989bf1d127 100755 --- a/assembly/src/release/example/src/ConsumerTool.java +++ b/assembly/src/release/example/src/ConsumerTool.java @@ -16,11 +16,13 @@ */ import javax.jms.Connection; +import javax.jms.DeliveryMode; import javax.jms.ExceptionListener; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; +import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; @@ -41,8 +43,10 @@ public class ConsumerTool extends ToolSupport implements MessageListener, Except private boolean pauseBeforeShutdown; private boolean running; private Session session; + private long sleepTime=0; private long receiveTimeOut=0; + private MessageProducer replyProducer; public static void main(String[] args) { ConsumerTool tool = new ConsumerTool(); @@ -88,6 +92,10 @@ public class ConsumerTool extends ToolSupport implements MessageListener, Except Connection connection = createConnection(); connection.setExceptionListener(this); session = createSession(connection); + + replyProducer = session.createProducer(null); + replyProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + MessageConsumer consumer = null; if (durable && topic) { consumer = session.createDurableSubscriber((Topic) destination, consumerName); @@ -133,6 +141,14 @@ public class ConsumerTool extends ToolSupport implements MessageListener, Except if(transacted) { session.commit(); } + + if ( message.getJMSReplyTo() !=null ) { + replyProducer.send(message.getJMSReplyTo(), session.createTextMessage("Reply: "+message.getJMSMessageID())); + if(transacted) { + session.commit(); + } + } + /* if (++count % dumpCount == 0) { dumpStats(connection); diff --git a/assembly/src/release/example/src/RequesterTool.java b/assembly/src/release/example/src/RequesterTool.java new file mode 100644 index 0000000000..183130e882 --- /dev/null +++ b/assembly/src/release/example/src/RequesterTool.java @@ -0,0 +1,182 @@ +/** + * + * Copyright 2005-2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import javax.jms.Connection; +import javax.jms.DeliveryMode; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import java.util.Date; + +/** + * A simple tool for publishing messages + * + * @version $Revision: 1.2 $ + */ +public class RequesterTool extends ToolSupport { + + protected int messageCount = 10; + protected long sleepTime = 0L; + protected boolean verbose = true; + protected int messageSize = 255; + private long timeToLive; + + public static void main(String[] args) { + runTool(args, new RequesterTool()); + } + + protected static void runTool(String[] args, RequesterTool tool) { + tool.clientID = null; + if (args.length > 0) { + tool.url = args[0]; + } + if (args.length > 1) { + tool.topic = args[1].equalsIgnoreCase("true"); + } + if (args.length > 2) { + tool.subject = args[2]; + } + if (args.length > 3) { + tool.durable = args[3].equalsIgnoreCase("true"); + } + if (args.length > 4) { + tool.messageCount = Integer.parseInt(args[4]); + } + if (args.length > 5) { + tool.messageSize = Integer.parseInt(args[5]); + } + if (args.length > 6) { + if( ! "null".equals(args[6]) ) { + tool.clientID = args[6]; + } + } + if (args.length > 7) { + tool.timeToLive = Long.parseLong(args[7]); + } + if (args.length > 8) { + tool.sleepTime = Long.parseLong(args[8]); + } + if (args.length > 9) { + tool.transacted = "true".equals(args[9]); + } + tool.run(); + } + + public void run() { + try { + System.out.println("Connecting to URL: " + url); + System.out.println("Publishing a Message with size " + messageSize + " to " + (topic ? "topic" : "queue") + ": " + subject); + System.out.println("Using " + (durable ? "durable" : "non-durable") + " publishing"); + System.out.println("Sleeping between publish "+sleepTime+" ms"); + if( timeToLive!=0 ) { + System.out.println("Messages time to live "+timeToLive+" ms"); + } + Connection connection = createConnection(); + Session session = createSession(connection); + MessageProducer producer = createProducer(session); + + Destination replyDest = null; + if( this.topic ) { + replyDest = session.createTemporaryTopic(); + } else { + replyDest = session.createTemporaryQueue(); + } + + System.out.println("Reply Destination: "+replyDest); + MessageConsumer consumer = session.createConsumer(replyDest); + + requestLoop(session, producer, consumer, replyDest); + + System.out.println("Done."); + close(connection, session); + } + catch (Exception e) { + System.out.println("Caught: " + e); + e.printStackTrace(); + } + } + + protected MessageProducer createProducer(Session session) throws JMSException { + MessageProducer producer = session.createProducer(destination); + if (durable) { + producer.setDeliveryMode(DeliveryMode.PERSISTENT); + } + else { + producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + } + if( timeToLive!=0 ) + producer.setTimeToLive(timeToLive); + return producer; + } + + protected void requestLoop(Session session, MessageProducer producer, MessageConsumer consumer, Destination replyDest) throws Exception { + + for (int i = 0; i < messageCount || messageCount==0 ; i++) { + + + TextMessage message = session.createTextMessage(createMessageText(i)); + message.setJMSReplyTo(replyDest); + + if (verbose) { + String msg = message.getText(); + if (msg.length() > 50) { + msg = msg.substring(0, 50) + "..."; + } + System.out.println("Sending message: " + msg); + } + + producer.send(message); + if(transacted) { + session.commit(); + } + + System.out.println("Waiting for reponse message..."); + Message message2 = consumer.receive(); + if( message2 instanceof TextMessage ) { + System.out.println("Reponse message: "+((TextMessage)message2).getText()); + } else { + System.out.println("Reponse message: "+message2); + } + if(transacted) { + session.commit(); + } + + Thread.sleep(sleepTime); + + } + + } + + /** + * @param i + * @return + */ + private String createMessageText(int index) { + StringBuffer buffer = new StringBuffer(messageSize); + buffer.append("Message: " + index + " sent at: " + new Date()); + if (buffer.length() > messageSize) { + return buffer.substring(0, messageSize); + } + for (int i = buffer.length(); i < messageSize; i++) { + buffer.append(' '); + } + return buffer.toString(); + } +}