Hiram R. Chirino 2006-07-07 00:54:47 +00:00
parent 27b58d6f61
commit 251b25de92
3 changed files with 217 additions and 0 deletions

View File

@ -166,6 +166,25 @@
</java> </java>
</target> </target>
<target name="requester" depends="compile" description="Runs a simple requester">
<echo>Running requester against server at $$url = ${url} for subject $$subject = ${subject}</echo>
<java classname="RequesterTool" fork="yes" maxmemory="100M">
<classpath refid="javac.classpath" />
<jvmarg value="-server" />
<arg value="${url}" />
<arg value="${topic}" />
<arg value="${subject}" />
<arg value="${durable}" />
<arg value="${max}" />
<arg value="${messageSize}" />
<arg value="${producerClientId}" />
<arg value="${timeToLive}" />
<arg value="${sleepTime}" />
<arg value="${transacted}" />
</java>
</target>
<target name="embedBroker" depends="compile" description="Runs a simple producer"> <target name="embedBroker" depends="compile" description="Runs a simple producer">
<echo>Running an embedded broker example</echo> <echo>Running an embedded broker example</echo>

View File

@ -16,11 +16,13 @@
*/ */
import javax.jms.Connection; import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.ExceptionListener; import javax.jms.ExceptionListener;
import javax.jms.JMSException; import javax.jms.JMSException;
import javax.jms.Message; import javax.jms.Message;
import javax.jms.MessageConsumer; import javax.jms.MessageConsumer;
import javax.jms.MessageListener; import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session; import javax.jms.Session;
import javax.jms.TextMessage; import javax.jms.TextMessage;
import javax.jms.Topic; import javax.jms.Topic;
@ -41,8 +43,10 @@ public class ConsumerTool extends ToolSupport implements MessageListener, Except
private boolean pauseBeforeShutdown; private boolean pauseBeforeShutdown;
private boolean running; private boolean running;
private Session session; private Session session;
private long sleepTime=0; private long sleepTime=0;
private long receiveTimeOut=0; private long receiveTimeOut=0;
private MessageProducer replyProducer;
public static void main(String[] args) { public static void main(String[] args) {
ConsumerTool tool = new ConsumerTool(); ConsumerTool tool = new ConsumerTool();
@ -88,6 +92,10 @@ public class ConsumerTool extends ToolSupport implements MessageListener, Except
Connection connection = createConnection(); Connection connection = createConnection();
connection.setExceptionListener(this); connection.setExceptionListener(this);
session = createSession(connection); session = createSession(connection);
replyProducer = session.createProducer(null);
replyProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
MessageConsumer consumer = null; MessageConsumer consumer = null;
if (durable && topic) { if (durable && topic) {
consumer = session.createDurableSubscriber((Topic) destination, consumerName); consumer = session.createDurableSubscriber((Topic) destination, consumerName);
@ -133,6 +141,14 @@ public class ConsumerTool extends ToolSupport implements MessageListener, Except
if(transacted) { if(transacted) {
session.commit(); session.commit();
} }
if ( message.getJMSReplyTo() !=null ) {
replyProducer.send(message.getJMSReplyTo(), session.createTextMessage("Reply: "+message.getJMSMessageID()));
if(transacted) {
session.commit();
}
}
/* /*
if (++count % dumpCount == 0) { if (++count % dumpCount == 0) {
dumpStats(connection); dumpStats(connection);

View File

@ -0,0 +1,182 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.util.Date;
/**
* A simple tool for publishing messages
*
* @version $Revision: 1.2 $
*/
public class RequesterTool extends ToolSupport {
protected int messageCount = 10;
protected long sleepTime = 0L;
protected boolean verbose = true;
protected int messageSize = 255;
private long timeToLive;
public static void main(String[] args) {
runTool(args, new RequesterTool());
}
protected static void runTool(String[] args, RequesterTool tool) {
tool.clientID = null;
if (args.length > 0) {
tool.url = args[0];
}
if (args.length > 1) {
tool.topic = args[1].equalsIgnoreCase("true");
}
if (args.length > 2) {
tool.subject = args[2];
}
if (args.length > 3) {
tool.durable = args[3].equalsIgnoreCase("true");
}
if (args.length > 4) {
tool.messageCount = Integer.parseInt(args[4]);
}
if (args.length > 5) {
tool.messageSize = Integer.parseInt(args[5]);
}
if (args.length > 6) {
if( ! "null".equals(args[6]) ) {
tool.clientID = args[6];
}
}
if (args.length > 7) {
tool.timeToLive = Long.parseLong(args[7]);
}
if (args.length > 8) {
tool.sleepTime = Long.parseLong(args[8]);
}
if (args.length > 9) {
tool.transacted = "true".equals(args[9]);
}
tool.run();
}
public void run() {
try {
System.out.println("Connecting to URL: " + url);
System.out.println("Publishing a Message with size " + messageSize + " to " + (topic ? "topic" : "queue") + ": " + subject);
System.out.println("Using " + (durable ? "durable" : "non-durable") + " publishing");
System.out.println("Sleeping between publish "+sleepTime+" ms");
if( timeToLive!=0 ) {
System.out.println("Messages time to live "+timeToLive+" ms");
}
Connection connection = createConnection();
Session session = createSession(connection);
MessageProducer producer = createProducer(session);
Destination replyDest = null;
if( this.topic ) {
replyDest = session.createTemporaryTopic();
} else {
replyDest = session.createTemporaryQueue();
}
System.out.println("Reply Destination: "+replyDest);
MessageConsumer consumer = session.createConsumer(replyDest);
requestLoop(session, producer, consumer, replyDest);
System.out.println("Done.");
close(connection, session);
}
catch (Exception e) {
System.out.println("Caught: " + e);
e.printStackTrace();
}
}
protected MessageProducer createProducer(Session session) throws JMSException {
MessageProducer producer = session.createProducer(destination);
if (durable) {
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
}
else {
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
}
if( timeToLive!=0 )
producer.setTimeToLive(timeToLive);
return producer;
}
protected void requestLoop(Session session, MessageProducer producer, MessageConsumer consumer, Destination replyDest) throws Exception {
for (int i = 0; i < messageCount || messageCount==0 ; i++) {
TextMessage message = session.createTextMessage(createMessageText(i));
message.setJMSReplyTo(replyDest);
if (verbose) {
String msg = message.getText();
if (msg.length() > 50) {
msg = msg.substring(0, 50) + "...";
}
System.out.println("Sending message: " + msg);
}
producer.send(message);
if(transacted) {
session.commit();
}
System.out.println("Waiting for reponse message...");
Message message2 = consumer.receive();
if( message2 instanceof TextMessage ) {
System.out.println("Reponse message: "+((TextMessage)message2).getText());
} else {
System.out.println("Reponse message: "+message2);
}
if(transacted) {
session.commit();
}
Thread.sleep(sleepTime);
}
}
/**
* @param i
* @return
*/
private String createMessageText(int index) {
StringBuffer buffer = new StringBuffer(messageSize);
buffer.append("Message: " + index + " sent at: " + new Date());
if (buffer.length() > messageSize) {
return buffer.substring(0, messageSize);
}
for (int i = buffer.length(); i < messageSize; i++) {
buffer.append(' ');
}
return buffer.toString();
}
}