Updates to performance module.

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@411631 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Adrian T. Co 2006-06-05 01:05:41 +00:00
parent c5ce043ca7
commit 5f5bff613e
17 changed files with 1064 additions and 1137 deletions

View File

@ -1,140 +0,0 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.tool;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import java.io.IOException;
/**
* A simple tool for consuming messages
*
* @version $Revision$
*/
public class ConsumerTool extends ToolSupport implements MessageListener {
protected int count = 0;
protected int dumpCount = 10;
protected boolean verbose = true;
protected int maxiumMessages = 0;
private boolean pauseBeforeShutdown;
public static void main(String[] args) {
ConsumerTool tool = new ConsumerTool();
if (args.length > 0) {
tool.url = args[0];
}
if (args.length > 1) {
tool.topic = args[1].equalsIgnoreCase("true");
}
if (args.length > 2) {
tool.subject = args[2];
}
if (args.length > 3) {
tool.durable = args[3].equalsIgnoreCase("true");
}
if (args.length > 4) {
tool.maxiumMessages = Integer.parseInt(args[4]);
}
tool.run();
}
public void run() {
try {
System.out.println("Connecting to URL: " + url);
System.out.println("Consuming " + (topic ? "topic" : "queue") + ": " + subject);
System.out.println("Using " + (durable ? "durable" : "non-durable") + " subscription");
Connection connection = createConnection();
Session session = createSession(connection);
MessageConsumer consumer = null;
if (durable && topic) {
consumer = session.createDurableSubscriber((Topic) destination, consumerName);
}
else {
consumer = session.createConsumer(destination);
}
if (maxiumMessages <= 0) {
consumer.setMessageListener(this);
}
connection.start();
if (maxiumMessages > 0) {
consumeMessagesAndClose(connection, session, consumer);
}
}
catch (Exception e) {
System.out.println("Caught: " + e);
e.printStackTrace();
}
}
public void onMessage(Message message) {
try {
if (message instanceof TextMessage) {
TextMessage txtMsg = (TextMessage) message;
if (verbose) {
String msg = txtMsg.getText();
if( msg.length() > 50 )
msg = msg.substring(0, 50)+"...";
System.out.println("Received: " + msg);
}
}
else {
if (verbose) {
System.out.println("Received: " + message);
}
}
/*
if (++count % dumpCount == 0) {
dumpStats(connection);
}
*/
}
catch (JMSException e) {
System.out.println("Caught: " + e);
e.printStackTrace();
}
}
protected void consumeMessagesAndClose(Connection connection, Session session, MessageConsumer consumer) throws JMSException, IOException {
System.out.println("We are about to wait until we consume: " + maxiumMessages + " message(s) then we will shutdown");
for (int i = 0; i < maxiumMessages; i++) {
Message message = consumer.receive();
onMessage(message);
}
System.out.println("Closing connection");
consumer.close();
session.close();
connection.close();
if (pauseBeforeShutdown) {
System.out.println("Press return to shut down");
System.in.read();
}
}
}

View File

@ -1,67 +0,0 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.tool;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import javax.jms.ConnectionFactory;
import java.util.Map;
import java.lang.reflect.Constructor;
public class JmsBasicClientSupport {
private static final Log log = LogFactory.getLog(JmsBasicClientSupport.class);
public static final String DEFAULT_CONNECTION_FACTORY_CLASS = "org.apache.activemq.ActiveMQConnectionFactory";
public ConnectionFactory createConnectionFactory(String url) {
return createConnectionFactory(DEFAULT_CONNECTION_FACTORY_CLASS, url, null);
}
public ConnectionFactory createConnectionFactory(String url, Map props) {
return createConnectionFactory(DEFAULT_CONNECTION_FACTORY_CLASS, url, props);
}
public ConnectionFactory createConnectionFactory(String clazz, String url) {
return createConnectionFactory(clazz, url, null);
}
public ConnectionFactory createConnectionFactory(String clazz, String url, Map props) {
if (clazz == null || clazz == "") {
throw new RuntimeException("No class definition specified to create connection factory.");
}
ConnectionFactory f = instantiateConnectionFactory(clazz, url);
if (props != null) {
ReflectionUtil.configureClass(f, props);
}
return f;
}
protected ConnectionFactory instantiateConnectionFactory(String clazz, String url) {
try {
Class factoryClass = Class.forName(clazz);
Constructor c = factoryClass.getConstructor(new Class[] {String.class});
ConnectionFactory factoryObj = (ConnectionFactory)c.newInstance(new Object[] {url});
return factoryObj;
} catch (Exception e) {
throw new RuntimeException (e);
}
}
}

View File

@ -0,0 +1,179 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.tool;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.ConnectionFactory;
import javax.jms.Session;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import java.util.Properties;
public class JmsClientSupport extends JmsFactorySupport {
private static final Log log = LogFactory.getLog(JmsClientSupport.class);
private static final String PREFIX_CONFIG_CLIENT = "client.";
public static final String SESSION_AUTO_ACKNOWLEDGE = "autoAck";
public static final String SESSION_CLIENT_ACKNOWLEDGE = "clientAck";
public static final String SESSION_DUPS_OK_ACKNOWLEDGE = "dupsAck";
public static final String SESSION_TRANSACTED = "transacted";
protected Properties clientSettings = new Properties();
protected Connection jmsConnection;
protected Session jmsSession;
// Client settings
protected String spiClass;
protected boolean sessTransacted = false;
protected String sessAckMode = SESSION_AUTO_ACKNOWLEDGE;
protected String destName = "TEST.FOO";
protected int destCount = 1;
protected boolean destComposite = false;
public ConnectionFactory createConnectionFactory() throws JMSException {
return super.createConnectionFactory(getSpiClass());
}
public Connection getConnection() throws JMSException {
if (jmsConnection == null) {
jmsConnection = createConnectionFactory().createConnection();
}
return jmsConnection;
}
public Session getSession() throws JMSException {
if (jmsSession == null) {
int ackMode;
if (getSessAckMode().equalsIgnoreCase(SESSION_AUTO_ACKNOWLEDGE)) {
ackMode = Session.AUTO_ACKNOWLEDGE;
} else if (getSessAckMode().equalsIgnoreCase(SESSION_CLIENT_ACKNOWLEDGE)) {
ackMode = Session.CLIENT_ACKNOWLEDGE;
} else if (getSessAckMode().equalsIgnoreCase(SESSION_DUPS_OK_ACKNOWLEDGE)) {
ackMode = Session.DUPS_OK_ACKNOWLEDGE;
} else if (getSessAckMode().equalsIgnoreCase(SESSION_TRANSACTED)) {
ackMode = Session.SESSION_TRANSACTED;
} else {
ackMode = Session.AUTO_ACKNOWLEDGE;
}
jmsSession = getConnection().createSession(isSessTransacted(), ackMode);
}
return jmsSession;
}
public Destination[] createDestination() throws JMSException {
Destination[] dest = new Destination[getDestCount()];
for (int i=0; i<getDestCount(); i++) {
dest[i] = createDestination(getDestName() + "." + i);
}
if (isDestComposite()) {
return new Destination[] {createDestination(getDestName() + ".>")};
} else {
return dest;
}
}
public Destination createDestination(String name) throws JMSException {
if (name.startsWith("queue://")) {
return getSession().createQueue(name.substring("queue://".length()));
} else if (name.startsWith("topic://")) {
return getSession().createTopic(name.substring("topic://".length()));
} else {
return getSession().createTopic(name);
}
}
public String getSpiClass() {
return spiClass;
}
public void setSpiClass(String spiClass) {
this.spiClass = spiClass;
}
public boolean isSessTransacted() {
return sessTransacted;
}
public void setSessTransacted(boolean sessTransacted) {
this.sessTransacted = sessTransacted;
}
public String getSessAckMode() {
return sessAckMode;
}
public void setSessAckMode(String sessAckMode) {
this.sessAckMode = sessAckMode;
}
public String getDestName() {
return destName;
}
public void setDestName(String destName) {
this.destName = destName;
}
public int getDestCount() {
return destCount;
}
public void setDestCount(int destCount) {
this.destCount = destCount;
}
public boolean isDestComposite() {
return destComposite;
}
public void setDestComposite(boolean destComposite) {
this.destComposite = destComposite;
}
public Properties getClientSettings() {
return clientSettings;
}
public void setClientSettings(Properties clientSettings) {
this.clientSettings = clientSettings;
ReflectionUtil.configureClass(this, clientSettings);
}
public Properties getSettings() {
Properties allSettings = new Properties(clientSettings);
allSettings.putAll(super.getSettings());
return allSettings;
}
public void setSettings(Properties settings) {
super.setSettings(settings);
ReflectionUtil.configureClass(this, clientSettings);
}
public void setProperty(String key, String value) {
if (key.startsWith(PREFIX_CONFIG_CLIENT)) {
clientSettings.setProperty(key, value);
} else {
super.setProperty(key, value);
}
}
}

View File

@ -1,343 +0,0 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.tool;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import javax.jms.*;
import java.util.Map;
import java.util.HashMap;
import java.util.Iterator;
public class JmsConfigurableClientSupport extends JmsBasicClientSupport {
private static final Log log = LogFactory.getLog(JmsConfigurableClientSupport.class);
public static final String AMQ_SERVER = "amq";
public static final String AMQ_CONNECTION_FACTORY_CLASS = "org.apache.activemq.ActiveMQConnectionFactory";
private String serverType = "";
private String factoryClass = "";
private String clientID = "";
private Map factorySettings = new HashMap();
private Map connectionSettings = new HashMap();
private Map sessionSettings = new HashMap();
private Map queueSettings = new HashMap();
private Map topicSettings = new HashMap();
private Map consumerSettings = new HashMap();
private Map producerSettings = new HashMap();
private Map messageSettings = new HashMap();
protected ConnectionFactory jmsFactory = null;
protected Connection jmsConnection = null;
protected Session jmsSession = null;
protected MessageProducer jmsMessageProducer = null;
protected MessageConsumer jmsMessageConsumer = null;
public ConnectionFactory createConnectionFactory(ConnectionFactory factory) {
jmsFactory = factory;
configureJmsObject(jmsFactory, factorySettings);
return jmsFactory;
}
public ConnectionFactory createConnectionFactory(String url) {
jmsFactory = super.createConnectionFactory(factoryClass, url, factorySettings);
return jmsFactory;
}
public ConnectionFactory createConnectionFactory(String url, Map props) {
// Add previous settings to current settings
props.putAll(factorySettings);
jmsFactory = super.createConnectionFactory(factoryClass, url, props);
return jmsFactory;
}
public ConnectionFactory createConnectionFactory(String clazz, String url) {
factoryClass = clazz;
jmsFactory = super.createConnectionFactory(clazz, url, factorySettings);
return jmsFactory;
}
public ConnectionFactory createConnectionFactory(String clazz, String url, Map props) {
// Add previous settings to current settings
factoryClass = clazz;
props.putAll(factorySettings);
jmsFactory = super.createConnectionFactory(clazz, url, props);
return jmsFactory;
}
public ConnectionFactory getConnectionFactory() {
return jmsFactory;
}
public Connection getConnection() throws JMSException {
if (jmsConnection == null) {
// Retrieve username and password parameter is they exist
String username = (String)connectionSettings.get("username");
String password = (String)connectionSettings.get("password");
if (username == null) {
username = "";
}
if (password == null) {
password = "";
}
jmsConnection = getConnectionFactory().createConnection(username, password);
configureJmsObject(jmsConnection, connectionSettings);
}
return jmsConnection;
}
public Session getSession() throws JMSException {
if (jmsSession == null) {
boolean transacted;
// Check if session is transacted
if (sessionSettings.get("transacted") != null && ((String)sessionSettings.get("transacted")).equals("true")) {
transacted = true;
} else {
transacted = false;
}
// Check acknowledge type - default is AUTO_ACKNOWLEDGE
String ackModeStr = (String)sessionSettings.get("ackMode");
int ackMode = Session.AUTO_ACKNOWLEDGE;
if (ackModeStr != null) {
if (ackModeStr.equalsIgnoreCase("CLIENT_ACKNOWLEDGE")) {
ackMode = Session.CLIENT_ACKNOWLEDGE;
} else if (ackModeStr.equalsIgnoreCase("DUPS_OK_ACKNOWLEDGE")) {
ackMode = Session.DUPS_OK_ACKNOWLEDGE;
} else if (ackModeStr.equalsIgnoreCase("SESSION_TRANSACTED")) {
ackMode = Session.SESSION_TRANSACTED;
}
}
jmsSession = getConnection().createSession(transacted, ackMode);
configureJmsObject(jmsSession, sessionSettings);
}
return jmsSession;
}
public MessageProducer createMessageProducer(Destination dest) throws JMSException {
jmsMessageProducer = getSession().createProducer(dest);
configureJmsObject(jmsMessageProducer, producerSettings);
return jmsMessageProducer;
}
public MessageProducer getMessageProducer() {
return jmsMessageProducer;
}
public MessageConsumer createMessageConsumer(Destination dest) throws JMSException {
jmsMessageConsumer = getSession().createConsumer(dest);
configureJmsObject(jmsMessageConsumer, consumerSettings);
return jmsMessageConsumer;
}
public MessageConsumer createMessageConsumer(Destination dest, String selector) throws JMSException {
jmsMessageConsumer = getSession().createConsumer(dest, selector);
configureJmsObject(jmsMessageConsumer, consumerSettings);
return jmsMessageConsumer;
}
public MessageConsumer createMessageConsumer(Destination dest, String selector, boolean noLocal) throws JMSException {
jmsMessageConsumer = getSession().createConsumer(dest, selector, noLocal);
configureJmsObject(jmsMessageConsumer, consumerSettings);
return jmsMessageConsumer;
}
public MessageConsumer getMessageConsumer() {
return jmsMessageConsumer;
}
public String getClientID() {
try {
return getConnection().getClientID();
} catch (Exception e) {
return "";
}
}
public TopicSubscriber createDurableSubscriber(Topic dest, String name) throws JMSException {
jmsMessageConsumer = getSession().createDurableSubscriber(dest, name);
configureJmsObject(jmsMessageConsumer, consumerSettings);
return (TopicSubscriber)jmsMessageConsumer;
}
public TopicSubscriber createDurableSubscriber(Topic dest, String name, String selector, boolean noLocal) throws JMSException {
jmsMessageConsumer = getSession().createDurableSubscriber(dest, name, selector, noLocal);
configureJmsObject(jmsMessageConsumer, consumerSettings);
return (TopicSubscriber)jmsMessageConsumer;
}
public TopicSubscriber getDurableSubscriber() {
return (TopicSubscriber)jmsMessageConsumer;
}
public TextMessage createTextMessage(String text) throws JMSException {
TextMessage msg = getSession().createTextMessage(text);
configureJmsObject(msg, messageSettings);
return msg;
}
public Queue createQueue(String name) throws JMSException {
Queue queue = getSession().createQueue(name);
configureJmsObject(queue, queueSettings);
return queue;
}
public Topic createTopic(String name) throws JMSException {
Topic topic = getSession().createTopic(name);
configureJmsObject(topic, topicSettings);
return topic;
}
public void addConfigParam(Map props) {
for (Iterator i=props.keySet().iterator(); i.hasNext();) {
String key = (String)i.next();
Object val = props.get(key);
addConfigParam(key, val);
}
}
public void addConfigParam(String key, Object value) {
// Simple mapping of JMS Server to connection factory class
if (key.equalsIgnoreCase("client.server")) {
serverType = value.toString();
if (serverType.equalsIgnoreCase(AMQ_SERVER)) {
factoryClass = AMQ_CONNECTION_FACTORY_CLASS;
}
// Manually specify the connection factory class to use
} else if (key.equalsIgnoreCase("client.factoryClass")) {
factoryClass = value.toString();
// Connection factory specific settings
} else if (key.startsWith("factory.")) {
factorySettings.put(key.substring("factory.".length()), value);
// Connection specific settings
} else if (key.startsWith("connection.")) {
connectionSettings.put(key.substring("session.".length()), value);
// Session specific settings
} else if (key.startsWith("session.")) {
sessionSettings.put(key.substring("session.".length()), value);
// Destination specific settings
} else if (key.startsWith("dest.")) {
queueSettings.put(key.substring("dest.".length()), value);
topicSettings.put(key.substring("dest.".length()), value);
// Queue specific settings
} else if (key.startsWith("queue.")) {
queueSettings.put(key.substring("queue.".length()), value);
// Topic specific settings
} else if (key.startsWith("topic.")) {
topicSettings.put(key.substring("topic.".length()), value);
// Consumer specific settings
} else if (key.startsWith("consumer.")) {
consumerSettings.put(key.substring("consumer.".length()), value);
// Producer specific settings
} else if (key.startsWith("producer.")) {
producerSettings.put(key.substring("producer.".length()), value);
// Message specific settings
} else if (key.startsWith("message.")) {
messageSettings.put(key.substring("message.".length()), value);
// Unknown settings
} else {
log.warn("Unknown setting: " + key + " = " + value);
}
}
public String getServerType() {
return serverType;
}
public String getFactoryClass() {
return factoryClass;
}
public Map getFactorySettings() {
// Create a new HashMap to make the previous one read-only
return new HashMap(factorySettings);
}
public Map getConnectionSettings() {
// Create a new HashMap to make the previous one read-only
return new HashMap(connectionSettings);
}
public Map getSessionSettings() {
// Create a new HashMap to make the previous one read-only
return new HashMap(sessionSettings);
}
public Map getDestinationSettings() {
// Create a new HashMap to make the previous one read-only
Map temp = new HashMap(queueSettings);
temp.putAll(topicSettings);
return connectionSettings;
}
public Map getQueueSettings() {
// Create a new HashMap to make the previous one read-only
return new HashMap(queueSettings);
}
public Map getTopicSettings() {
// Create a new HashMap to make the previous one read-only
return new HashMap(topicSettings);
}
public Map getProducerSettings() {
// Create a new HashMap to make the previous one read-only
return new HashMap(producerSettings);
}
public Map getConsumerSettings() {
// Create a new HashMap to make the previous one read-only
return new HashMap(consumerSettings);
}
public Map getMessageSettings() {
// Create a new HashMap to make the previous one read-only
return new HashMap(messageSettings);
}
public void configureJmsObject(Object jmsObject, Map props) {
if (props == null || props.isEmpty()) {
return;
}
ReflectionUtil.configureClass(jmsObject, props);
}
public void configureJmsObject(Object jmsObject, String key, Object val) {
if (key == null || key == "" || val == null) {
return;
}
ReflectionUtil.configureClass(jmsObject, key, val);
}
}

View File

@ -17,153 +17,298 @@
package org.apache.activemq.tool; package org.apache.activemq.tool;
import javax.jms.*; import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.Log;
public class JmsConsumerClient extends JmsPerfClientSupport implements MessageListener { import javax.jms.MessageListener;
import javax.jms.MessageConsumer;
import javax.jms.JMSException;
import javax.jms.Destination;
import javax.jms.Topic;
import javax.jms.Message;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;
private ConnectionFactory factory = null; public class JmsConsumerClient extends JmsPerformanceSupport {
private static final Log log = LogFactory.getLog(JmsConsumerClient.class);
private String factoryClass = null; private static final String PREFIX_CONFIG_CONSUMER = "consumer.";
private String brokerUrl = null; public static final String TIME_BASED_RECEIVING = "time";
private String destinationName = null; public static final String COUNT_BASED_RECEIVING = "count";
private Destination destination = null;
private boolean isDurable = false; protected Properties jmsConsumerSettings = new Properties();
private boolean isAsync = true; protected MessageConsumer jmsConsumer;
public JmsConsumerClient(ConnectionFactory factory) { protected boolean durable = false;
this.factory = factory; protected boolean asyncRecv = true;
} protected String consumerName = "TestConsumerClient";
public JmsConsumerClient(ConnectionFactory factory, String destinationName) { protected long recvCount = 1000000; // Receive a million messages by default
this.factory = factory; protected long recvDuration = 5 * 60 * 1000; // Receive for 5 mins by default
this.setDestinationName(destinationName); protected String recvType = TIME_BASED_RECEIVING;
}
public JmsConsumerClient(String factoryClass, String brokerUrl, String destinationName) { public void receiveMessages() throws JMSException {
this.factoryClass = factoryClass; if (listener != null) {
this.brokerUrl = brokerUrl;
this.destinationName = destinationName;
}
public JmsConsumerClient(String brokerUrl, String destinationName) {
this.brokerUrl = brokerUrl;
this.destinationName = destinationName;
}
public void createConsumer(long duration) throws JMSException {
listener.onConfigEnd(this); listener.onConfigEnd(this);
}
// Create connection factory if (isAsyncRecv()) {
if (factory != null) { receiveAsyncMessages();
createConnectionFactory(factory);
} else if (factoryClass != null) {
createConnectionFactory(factoryClass, brokerUrl);
} else { } else {
createConnectionFactory(brokerUrl); receiveSyncMessages();
}
} }
if (getDestination() == null) { public void receiveSyncMessages() throws JMSException {
setDestination(getDestinationName()); if (getJmsConsumer() == null) {
createJmsConsumer();
} }
if (isDurable) {
createDurableSubscriber((Topic) getDestination(), getClass().getName());
} else {
createMessageConsumer(getDestination());
}
if (isAsync) {
getMessageConsumer().setMessageListener(this);
getConnection().start();
try { try {
Thread.sleep(duration); getConnection().start();
} catch (InterruptedException e) { if (listener != null) {
throw new JMSException("Error while consumer is sleeping " + e.getMessage()); listener.onConsumeStart(this);
}
if (getRecvType().equalsIgnoreCase(TIME_BASED_RECEIVING)) {
long endTime = System.currentTimeMillis() + getRecvDuration();
while (System.currentTimeMillis() < endTime) {
getJmsConsumer().receive();
incThroughput();
} }
} else { } else {
getConnection().start(); int count = 0;
consumeMessages(getMessageConsumer(), duration); while (count < getRecvCount()) {
} getJmsConsumer().receive();
incThroughput();
close(); //close consumer, session, and connection. count++;
listener.onConfigEnd(this);
}
//Increments throughput
public void onMessage(Message message) {
System.out.println(message.toString());
this.incThroughput();
}
protected void consumeMessages(MessageConsumer consumer, long duration) throws JMSException {
long currentTime = System.currentTimeMillis();
long endTime = currentTime + duration;
while (System.currentTimeMillis() <= endTime) {
Message message = consumer.receive();
onMessage(message);
} }
} }
} finally {
protected void close() throws JMSException { if (listener != null) {
getMessageConsumer().close(); listener.onConsumeEnd(this);
getSession().close(); }
getConnection().close(); getConnection().close();
} }
public static void main(String[] args) throws Exception {
JmsConsumerClient cons = new JmsConsumerClient("org.apache.activemq.ActiveMQConnectionFactory", "tcp://localhost:61616", "topic://TEST.FOO");
cons.setPerfEventListener(new PerfEventAdapter());
cons.createConsumer(20000);
} }
// Helper Methods public void receiveAsyncMessages() throws JMSException {
if (getJmsConsumer() == null) {
createJmsConsumer();
}
if (getRecvType().equalsIgnoreCase(TIME_BASED_RECEIVING)) {
getJmsConsumer().setMessageListener(new MessageListener() {
public void onMessage(Message msg) {
incThroughput();
}
});
try {
getConnection().start();
if (listener != null) {
listener.onConsumeStart(this);
}
try {
Thread.sleep(getRecvDuration());
} catch (InterruptedException e) {
throw new JMSException("JMS consumer thread sleep has been interrupted. Message: " + e.getMessage());
}
} finally {
if (listener != null) {
listener.onConsumeEnd(this);
}
getConnection().close();
}
} else {
final AtomicInteger count = new AtomicInteger(0);
getJmsConsumer().setMessageListener(new MessageListener() {
public void onMessage(Message msg) {
incThroughput();
count.incrementAndGet();
count.notify();
}
});
try {
getConnection().start();
if (listener != null) {
listener.onConsumeStart(this);
}
try {
while (count.get() < getRecvCount()) {
count.wait();
}
} catch (InterruptedException e) {
throw new JMSException("JMS consumer thread wait has been interrupted. Message: " + e.getMessage());
}
} finally {
if (listener != null) {
listener.onConsumeEnd(this);
}
getConnection().close();
}
}
}
public MessageConsumer createJmsConsumer() throws JMSException {
Destination[] dest = createDestination();
return createJmsConsumer(dest[0]);
}
public MessageConsumer createJmsConsumer(Destination dest) throws JMSException {
if (isDurable()) {
jmsConsumer = getSession().createDurableSubscriber((Topic)dest, getConsumerName());
} else {
jmsConsumer = getSession().createConsumer(dest);
}
return jmsConsumer;
}
public MessageConsumer createJmsConsumer(Destination dest, String selector, boolean noLocal) throws JMSException {
if (isDurable()) {
jmsConsumer = getSession().createDurableSubscriber((Topic)dest, getConsumerName(), selector, noLocal);
} else {
jmsConsumer = getSession().createConsumer(dest, selector, noLocal);
}
return jmsConsumer;
}
public MessageConsumer getJmsConsumer() {
return jmsConsumer;
}
public Properties getJmsConsumerSettings() {
return jmsConsumerSettings;
}
public void setJmsConsumerSettings(Properties jmsConsumerSettings) {
this.jmsConsumerSettings = jmsConsumerSettings;
ReflectionUtil.configureClass(this, jmsConsumerSettings);
}
public boolean isDurable() { public boolean isDurable() {
return isDurable; return durable;
} }
public void setDurable(boolean durable) { public void setDurable(boolean durable) {
isDurable = durable; this.durable = durable;
} }
public boolean isAsync() { public boolean isAsyncRecv() {
return isAsync; return asyncRecv;
} }
public void setAsync(boolean async) { public void setAsyncRecv(boolean asyncRecv) {
isAsync = async; this.asyncRecv = asyncRecv;
} }
public String getDestinationName() { public String getConsumerName() {
return this.destinationName; return consumerName;
} }
public void setDestinationName(String destinationName) { public void setConsumerName(String consumerName) {
this.destinationName = destinationName; this.consumerName = consumerName;
} }
public Destination getDestination() { public long getRecvCount() {
return this.destination; return recvCount;
} }
public void setDestination(Destination dest) { public void setRecvCount(long recvCount) {
this.destination = dest; this.recvCount = recvCount;
} }
public void setDestination(String destinationName) throws JMSException { public long getRecvDuration() {
return recvDuration;
}
this.setDestinationName(destinationName); public void setRecvDuration(long recvDuration) {
// Create destinations this.recvDuration = recvDuration;
if (this.getDestinationName().startsWith("topic://")) { }
setDestination(createTopic(getDestinationName().substring("topic://".length())));
} else if (getDestinationName().startsWith("queue://")) { public String getRecvType() {
setDestination(createQueue(getDestinationName().substring("queue://".length()))); return recvType;
}
public void setRecvType(String recvType) {
this.recvType = recvType;
}
public Properties getSettings() {
Properties allSettings = new Properties(jmsConsumerSettings);
allSettings.putAll(super.getSettings());
return allSettings;
}
public void setSettings(Properties settings) {
super.setSettings(settings);
ReflectionUtil.configureClass(this, jmsConsumerSettings);
}
public void setProperty(String key, String value) {
if (key.startsWith(PREFIX_CONFIG_CONSUMER)) {
jmsConsumerSettings.setProperty(key, value);
} else { } else {
setDestination(createQueue(getDestinationName())); super.setProperty(key, value);
} }
} }
public static void main(String[] args) throws JMSException {
String[] options = new String[22];
options[0] = "-Dsampler.duration=60000"; // 1 min
options[1] = "-Dsampler.interval=5000"; // 5 secs
options[2] = "-Dsampler.rampUpTime=10000"; // 10 secs
options[3] = "-Dsampler.rampDownTime=10000"; // 10 secs
options[4] = "-Dclient.spiClass=org.apache.activemq.tool.spi.ActiveMQPojoSPI";
options[5] = "-Dclient.sessTransacted=false";
options[6] = "-Dclient.sessAckMode=autoAck";
options[7] = "-Dclient.destName=topic://FOO.BAR.TEST";
options[8] = "-Dclient.destCount=1";
options[9] = "-Dclient.destComposite=false";
options[10] = "-Dconsumer.durable=false";
options[11] = "-Dconsumer.asyncRecv=true";
options[12] = "-Dconsumer.recvCount=1000"; // 1000 messages
options[13] = "-Dconsumer.recvDuration=60000"; // 1 min
options[14] = "-Dconsumer.recvType=time";
options[15] = "-Dfactory.brokerUrl=tcp://localhost:61616";
options[16] = "-Dfactory.clientID=consumerSampleClient";
options[17] = "-Dfactory.optimAck=true";
options[18] = "-Dfactory.optimDispatch=true";
options[19] = "-Dfactory.prefetchQueue=100";
options[20] = "-Dfactory.prefetchTopic=32767";
options[21] = "-Dfactory.useRetroactive=false";
args = options;
Properties samplerSettings = new Properties();
Properties consumerSettings = new Properties();
for (int i=0; i<args.length; i++) {
// Get property define options only
if (args[i].startsWith("-D")) {
String propDefine = args[i].substring("-D".length());
int index = propDefine.indexOf("=");
String key = propDefine.substring(0, index);
String val = propDefine.substring(index+1);
if (key.startsWith("sampler.")) {
samplerSettings.setProperty(key, val);
} else {
consumerSettings.setProperty(key, val);
}
}
}
JmsConsumerClient client = new JmsConsumerClient();
client.setSettings(consumerSettings);
PerfMeasurementTool sampler = new PerfMeasurementTool();
sampler.setSamplerSettings(samplerSettings);
sampler.registerClient(client);
sampler.startSampler();
client.setPerfEventListener(sampler);
client.receiveMessages();
}
} }

View File

@ -0,0 +1,95 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.tool;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.activemq.tool.spi.SPIConnectionFactory;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import java.util.Properties;
import java.util.Iterator;
public class JmsFactorySupport {
private static final Log log = LogFactory.getLog(JmsFactorySupport.class);
private static final String PREFIX_CONFIG_FACTORY = "factory.";
private SPIConnectionFactory spiFactory;
protected ConnectionFactory jmsFactory;
protected Properties jmsFactorySettings = new Properties();
public ConnectionFactory createConnectionFactory(String spiClass) throws JMSException {
jmsFactory = loadJmsFactory(spiClass, jmsFactorySettings);
return jmsFactory;
}
protected ConnectionFactory loadJmsFactory(String spiClass, Properties factorySettings) throws JMSException {
try {
Class spi = Class.forName(spiClass);
spiFactory = (SPIConnectionFactory)spi.newInstance();
ConnectionFactory jmsFactory = spiFactory.createConnectionFactory(factorySettings);
log.debug("Created: " + jmsFactory.getClass().getName() + " using SPIConnectionFactory: " + spiFactory.getClass().getName());
return jmsFactory;
} catch (Exception e) {
throw new JMSException(e.getMessage());
}
}
public ConnectionFactory getJmsFactory() {
return jmsFactory;
}
public Properties getJmsFactorySettings() {
return jmsFactorySettings;
}
public void setJmsFactorySettings(Properties jmsFactorySettings) {
this.jmsFactorySettings = jmsFactorySettings;
try {
spiFactory.configureConnectionFactory(jmsFactory, jmsFactorySettings);
} catch (Exception e) {
log.warn(e);
}
}
public Properties getSettings() {
return jmsFactorySettings;
}
public void setSettings(Properties settings) {
for (Iterator i=settings.keySet().iterator(); i.hasNext();) {
String key = (String)i.next();
String val = settings.getProperty(key);
setProperty(key, val);
}
try {
spiFactory.configureConnectionFactory(jmsFactory, jmsFactorySettings);
} catch (Exception e) {
log.warn(e);
}
}
public void setProperty(String key, String value) {
if (key.startsWith(PREFIX_CONFIG_FACTORY)) {
jmsFactorySettings.setProperty(key, value);
} else {
log.warn("Unknown setting: " + key + "=" + value);
}
}
}

View File

@ -18,10 +18,9 @@ package org.apache.activemq.tool;
import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicLong; import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicLong;
import java.util.Map; import javax.jms.JMSException;
import java.util.HashMap;
public class JmsPerfClientSupport extends JmsConfigurableClientSupport implements PerfMeasurable { public class JmsPerformanceSupport extends JmsClientSupport implements PerfMeasurable {
protected AtomicLong throughput = new AtomicLong(0); protected AtomicLong throughput = new AtomicLong(0);
@ -31,6 +30,14 @@ public class JmsPerfClientSupport extends JmsConfigurableClientSupport implement
setThroughput(0); setThroughput(0);
} }
public String getClientName() {
try {
return getConnection().getClientID();
} catch (JMSException e) {
return "";
}
}
public long getThroughput() { public long getThroughput() {
return throughput.get(); return throughput.get();
} }
@ -54,25 +61,4 @@ public class JmsPerfClientSupport extends JmsConfigurableClientSupport implement
public PerfEventListener getPerfEventListener() { public PerfEventListener getPerfEventListener() {
return listener; return listener;
} }
public Map getClientSettings() {
Map settings = new HashMap();
settings.put("client.server", getServerType());
settings.put("client.factoryClass", getFactoryClass());
settings.put("client.clientID", getClientID());
settings.putAll(getFactorySettings());
settings.putAll(getConnectionSettings());
settings.putAll(getSessionSettings());
settings.putAll(getQueueSettings());
settings.putAll(getTopicSettings());
settings.putAll(getProducerSettings());
settings.putAll(getConsumerSettings());
settings.putAll(getMessageSettings());
return settings;
}
public String getClientName() {
return getClientID();
}
} }

View File

@ -16,138 +16,80 @@
*/ */
package org.apache.activemq.tool; package org.apache.activemq.tool;
import javax.jms.ConnectionFactory; import org.apache.commons.logging.Log;
import javax.jms.TextMessage; import org.apache.commons.logging.LogFactory;
import javax.jms.MessageProducer;
import javax.jms.Destination; import javax.jms.Destination;
import javax.jms.TextMessage;
import javax.jms.JMSException; import javax.jms.JMSException;
import java.util.Map; import java.util.Properties;
import java.util.Arrays; import java.util.Arrays;
public class JmsProducerClient extends JmsPerfClientSupport { public class JmsProducerClient extends JmsPerformanceSupport {
private static final Log log = LogFactory.getLog(JmsProducerClient.class);
private ConnectionFactory factory = null; private static final String PREFIX_CONFIG_PRODUCER = "producer.";
private String factoryClass = ""; public static final String TIME_BASED_SENDING = "time";
private String brokerUrl = ""; public static final String COUNT_BASED_SENDING = "count";
private String[] destName = null;
private Destination[] dest = null; protected Properties jmsProducerSettings = new Properties();
private TextMessage message = null; protected MessageProducer jmsProducer;
protected TextMessage jmsTextMessage;
public JmsProducerClient(ConnectionFactory factory, String destName) { protected int messageSize = 1024; // Send 1kb messages by default
this.factory = factory; protected long sendCount = 1000000; // Send a million messages by default
this.destName = new String[] {destName}; protected long sendDuration = 5 * 60 * 1000; // Send for 5 mins by default
} protected String sendType = TIME_BASED_SENDING;
public JmsProducerClient(String factoryClass, String brokerUrl, String destName) {
this.factoryClass = factoryClass;
this.brokerUrl = brokerUrl;
this.destName = new String[] {destName};
}
public JmsProducerClient(String brokerUrl, String destName) {
this.brokerUrl = brokerUrl;
this.destName = new String[] {destName};
}
public JmsProducerClient(ConnectionFactory factory, String[] destName) {
this.factory = factory;
this.destName = destName;
}
public JmsProducerClient(String factoryClass, String brokerUrl, String[] destName) {
this.factoryClass = factoryClass;
this.brokerUrl = brokerUrl;
this.destName = destName;
}
public JmsProducerClient(String brokerUrl, String[] destName) {
this.brokerUrl = brokerUrl;
this.destName = destName;
}
public void createProducer() throws JMSException {
createProducer(0);
}
public void createProducer(Map settings) throws JMSException {
createProducer(0, settings);
}
public void createProducer(int messageSize, Map settings) throws JMSException {
addConfigParam(settings);
createProducer(messageSize);
}
public void createProducer(int messageSize) throws JMSException {
listener.onConfigStart(this);
// Create connection factory
if (factory != null) {
createConnectionFactory(factory);
} else if (factoryClass != null) {
createConnectionFactory(factoryClass, brokerUrl);
} else {
createConnectionFactory(brokerUrl);
}
createConnectionFactory(brokerUrl);
// Create destinations
dest = new Destination[destName.length];
for (int i=0; i<destName.length; i++) {
if (destName[i].startsWith("topic://")) {
dest[i] = createTopic(destName[i].substring("topic://".length()));
} else if (destName[i].startsWith("queue://")) {
dest[i] = createQueue(destName[i].substring("queue://".length()));
} else {
dest[i] = createQueue(destName[i]);
}
}
// Create actual message producer
if (dest.length > 1) {
createMessageProducer(null);
} else {
createMessageProducer(dest[0]);
}
// Create message to sent
if (messageSize > 0) {
byte[] val = new byte[messageSize];
Arrays.fill(val, (byte)0);
String buff = new String(val);
message = createTextMessage(buff);
}
public void sendMessages() throws JMSException {
if (listener != null) {
listener.onConfigEnd(this); listener.onConfigEnd(this);
} }
// Send a specific number of messages
if (sendType.equalsIgnoreCase(COUNT_BASED_SENDING)) {
sendCountBasedMessages(getSendCount());
// Send messages for a specific duration
} else {
sendTimeBasedMessages(getSendDuration());
}
}
public void sendCountBasedMessages(long messageCount) throws JMSException { public void sendCountBasedMessages(long messageCount) throws JMSException {
// Parse through different ways to send messages // Parse through different ways to send messages
// Avoided putting the condition inside the loop to prevent effect on performance // Avoided putting the condition inside the loop to prevent effect on performance
Destination[] dest = createDestination();
// Create a producer, if none is created.
if (getJmsProducer() == null) {
if (dest.length == 1) {
createJmsProducer(dest[0]);
} else {
createJmsProducer();
}
}
try { try {
getConnection().start(); getConnection().start();
if (listener != null) {
listener.onPublishStart(this);
}
// Send one type of message only, avoiding the creation of different messages on sending // Send one type of message only, avoiding the creation of different messages on sending
if (message != null) { if (getJmsTextMessage() != null) {
// Send to more than one actual destination // Send to more than one actual destination
if (dest.length > 1) { if (dest.length > 1) {
listener.onPublishStart(this);
for (int i=0; i<messageCount; i++) { for (int i=0; i<messageCount; i++) {
for (int j=0; j<dest.length; j++) { for (int j=0; j<dest.length; j++) {
getMessageProducer().send(dest[j], message); getJmsProducer().send(dest[j], getJmsTextMessage());
incThroughput(); incThroughput();
} }
} }
listener.onPublishEnd(this);
// Send to only one actual destination // Send to only one actual destination
} else { } else {
listener.onPublishStart(this);
for (int i=0; i<messageCount; i++) { for (int i=0; i<messageCount; i++) {
getMessageProducer().send(message); getJmsProducer().send(getJmsTextMessage());
incThroughput(); incThroughput();
} }
listener.onPublishEnd(this);
} }
// Send different type of messages using indexing to identify each one. // Send different type of messages using indexing to identify each one.
@ -156,26 +98,25 @@ public class JmsProducerClient extends JmsPerfClientSupport {
} else { } else {
// Send to more than one actual destination // Send to more than one actual destination
if (dest.length > 1) { if (dest.length > 1) {
listener.onPublishStart(this);
for (int i=0; i<messageCount; i++) { for (int i=0; i<messageCount; i++) {
for (int j=0; j<dest.length; j++) { for (int j=0; j<dest.length; j++) {
getMessageProducer().send(dest[j], createTextMessage("Text Message [" + i + "]")); getJmsProducer().send(dest[j], createJmsTextMessage("Text Message [" + i + "]"));
incThroughput(); incThroughput();
} }
} }
listener.onPublishEnd(this);
// Send to only one actual destination // Send to only one actual destination
} else { } else {
listener.onPublishStart(this);
for (int i=0; i<messageCount; i++) { for (int i=0; i<messageCount; i++) {
getMessageProducer().send(createTextMessage("Text Message [" + i + "]")); getJmsProducer().send(createJmsTextMessage("Text Message [" + i + "]"));
incThroughput(); incThroughput();
} }
listener.onPublishEnd(this);
} }
} }
} finally { } finally {
if (listener != null) {
listener.onPublishEnd(this);
}
getConnection().close(); getConnection().close();
} }
} }
@ -185,29 +126,39 @@ public class JmsProducerClient extends JmsPerfClientSupport {
// Parse through different ways to send messages // Parse through different ways to send messages
// Avoided putting the condition inside the loop to prevent effect on performance // Avoided putting the condition inside the loop to prevent effect on performance
// Send one type of message only, avoiding the creation of different messages on sending Destination[] dest = createDestination();
// Create a producer, if none is created.
if (getJmsProducer() == null) {
if (dest.length == 1) {
createJmsProducer(dest[0]);
} else {
createJmsProducer();
}
}
try { try {
getConnection().start(); getConnection().start();
if (listener != null) {
listener.onPublishStart(this);
}
if (message != null) { // Send one type of message only, avoiding the creation of different messages on sending
if (getJmsTextMessage() != null) {
// Send to more than one actual destination // Send to more than one actual destination
if (dest.length > 1) { if (dest.length > 1) {
listener.onPublishStart(this);
while (System.currentTimeMillis() < endTime) { while (System.currentTimeMillis() < endTime) {
for (int j=0; j<dest.length; j++) { for (int j=0; j<dest.length; j++) {
getMessageProducer().send(dest[j], message); getJmsProducer().send(dest[j], getJmsTextMessage());
incThroughput(); incThroughput();
} }
} }
listener.onPublishEnd(this);
// Send to only one actual destination // Send to only one actual destination
} else { } else {
listener.onPublishStart(this);
while (System.currentTimeMillis() < endTime) { while (System.currentTimeMillis() < endTime) {
getMessageProducer().send(message); getJmsProducer().send(getJmsTextMessage());
incThroughput(); incThroughput();
} }
listener.onPublishEnd(this);
} }
// Send different type of messages using indexing to identify each one. // Send different type of messages using indexing to identify each one.
@ -217,66 +168,183 @@ public class JmsProducerClient extends JmsPerfClientSupport {
// Send to more than one actual destination // Send to more than one actual destination
long count = 1; long count = 1;
if (dest.length > 1) { if (dest.length > 1) {
listener.onPublishStart(this);
while (System.currentTimeMillis() < endTime) { while (System.currentTimeMillis() < endTime) {
for (int j=0; j<dest.length; j++) { for (int j=0; j<dest.length; j++) {
getMessageProducer().send(dest[j], createTextMessage("Text Message [" + count++ + "]")); getJmsProducer().send(dest[j], createJmsTextMessage("Text Message [" + count++ + "]"));
incThroughput(); incThroughput();
} }
} }
listener.onPublishEnd(this);
// Send to only one actual destination // Send to only one actual destination
} else { } else {
listener.onPublishStart(this);
while (System.currentTimeMillis() < endTime) { while (System.currentTimeMillis() < endTime) {
getMessageProducer().send(createTextMessage("Text Message [" + count++ + "]"));
getJmsProducer().send(createJmsTextMessage("Text Message [" + count++ + "]"));
incThroughput(); incThroughput();
} }
listener.onPublishEnd(this);
} }
} }
} finally { } finally {
if (listener != null) {
listener.onPublishEnd(this);
}
getConnection().close(); getConnection().close();
} }
} }
public static void main(String[] args) throws Exception { public Properties getJmsProducerSettings() {
final long duration = 1 * 60 * 1000; return jmsProducerSettings;
long rampUpTime = 5 * 1000;
long rampDownTime = 5 * 1000;
long interval = 1000;
PerfMeasurementTool tool = new PerfMeasurementTool();
tool.setDuration(duration);
tool.setInterval(interval);
tool.setRampUpTime(rampUpTime);
tool.setRampDownTime(rampDownTime);
JmsProducerClient[] client = new JmsProducerClient[10];
for (int i=0; i<10; i++) {
client[i] = new JmsProducerClient("org.apache.activemq.ActiveMQConnectionFactory", "tcp://localhost:61616", "topic://TEST.FOO");
client[i].addConfigParam("factory.asyncSend", "true");
client[i].setPerfEventListener(new PerfEventAdapter());
client[i].createProducer();
tool.registerClient(client[i]);
} }
tool.startSampler(); public void setJmsProducerSettings(Properties jmsProducerSettings) {
this.jmsProducerSettings = jmsProducerSettings;
ReflectionUtil.configureClass(this, jmsProducerSettings);
}
for (int i=0; i<10; i++) { public MessageProducer createJmsProducer() throws JMSException {
final JmsProducerClient p = client[i]; jmsProducer = getSession().createProducer(null);
Thread t = new Thread(new Runnable() { return jmsProducer;
public void run() {
try {
p.sendTimeBasedMessages(duration);
} catch (JMSException e) {
e.printStackTrace();
} }
}
});
t.start();
public MessageProducer createJmsProducer(Destination dest) throws JMSException {
jmsProducer = getSession().createProducer(dest);
return jmsProducer;
} }
public MessageProducer getJmsProducer() {
return jmsProducer;
}
public TextMessage createJmsTextMessage() throws JMSException {
return createJmsTextMessage(getMessageSize());
}
public TextMessage createJmsTextMessage(int size) throws JMSException {
jmsTextMessage = getSession().createTextMessage(buildText("", size));
return jmsTextMessage;
}
public TextMessage createJmsTextMessage(String text) throws JMSException {
jmsTextMessage = getSession().createTextMessage(buildText(text, getMessageSize()));
return jmsTextMessage;
}
public TextMessage getJmsTextMessage() {
return jmsTextMessage;
}
protected String buildText(String text, int size) {
byte[] data = new byte[size - text.length()];
Arrays.fill(data, (byte)0);
return text + new String(data);
}
public int getMessageSize() {
return messageSize;
}
public void setMessageSize(int messageSize) {
this.messageSize = messageSize;
}
public long getSendCount() {
return sendCount;
}
public void setSendCount(long sendCount) {
this.sendCount = sendCount;
}
public long getSendDuration() {
return sendDuration;
}
public void setSendDuration(long sendDuration) {
this.sendDuration = sendDuration;
}
public String getSendType() {
return sendType;
}
public void setSendType(String sendType) {
this.sendType = sendType;
}
public Properties getSettings() {
Properties allSettings = new Properties(jmsProducerSettings);
allSettings.putAll(super.getSettings());
return allSettings;
}
public void setSettings(Properties settings) {
super.setSettings(settings);
ReflectionUtil.configureClass(this, jmsProducerSettings);
}
public void setProperty(String key, String value) {
if (key.startsWith(PREFIX_CONFIG_PRODUCER)) {
jmsProducerSettings.setProperty(key, value);
} else {
super.setProperty(key, value);
}
}
public static void main(String[] args) throws JMSException {
String[] options = new String[17];
options[0] = "-Dsampler.duration=60000"; // 1 min
options[1] = "-Dsampler.interval=5000"; // 5 secs
options[2] = "-Dsampler.rampUpTime=10000"; // 10 secs
options[3] = "-Dsampler.rampDownTime=10000"; // 10 secs
options[4] = "-Dclient.spiClass=org.apache.activemq.tool.spi.ActiveMQPojoSPI";
options[5] = "-Dclient.sessTransacted=false";
options[6] = "-Dclient.sessAckMode=autoAck";
options[7] = "-Dclient.destName=topic://FOO.BAR.TEST";
options[8] = "-Dclient.destCount=1";
options[9] = "-Dclient.destComposite=false";
options[10] = "-Dproducer.messageSize=1024";
options[11] = "-Dproducer.sendCount=1000"; // 1000 messages
options[12] = "-Dproducer.sendDuration=60000"; // 1 min
options[13] = "-Dproducer.sendType=time";
options[14] = "-Dfactory.brokerUrl=tcp://localhost:61616";
options[15] = "-Dfactory.clientID=producerSampleClient";
options[16] = "-Dfactory.asyncSend=true";
args = options;
Properties samplerSettings = new Properties();
Properties producerSettings = new Properties();
for (int i=0; i<args.length; i++) {
// Get property define options only
if (args[i].startsWith("-D")) {
String propDefine = args[i].substring("-D".length());
int index = propDefine.indexOf("=");
String key = propDefine.substring(0, index);
String val = propDefine.substring(index+1);
if (key.startsWith("sampler.")) {
samplerSettings.setProperty(key, val);
} else {
producerSettings.setProperty(key, val);
}
}
}
JmsProducerClient client = new JmsProducerClient();
client.setSettings(producerSettings);
PerfMeasurementTool sampler = new PerfMeasurementTool();
sampler.setSamplerSettings(samplerSettings);
sampler.registerClient(client);
sampler.startSampler();
client.setPerfEventListener(sampler);
// This will reuse only a single message every send, which will improve performance
client.createJmsTextMessage();
client.sendMessages();
} }
} }

View File

@ -16,13 +16,13 @@
*/ */
package org.apache.activemq.tool; package org.apache.activemq.tool;
import java.util.Map; import java.util.Properties;
public interface PerfMeasurable { public interface PerfMeasurable {
public void reset(); public void reset();
public String getClientName(); public String getClientName();
public long getThroughput(); public long getThroughput();
public Map getClientSettings(); public Properties getSettings();
public void setPerfEventListener(PerfEventListener listener); public void setPerfEventListener(PerfEventListener listener);
public PerfEventListener getPerfEventListener(); public PerfEventListener getPerfEventListener();
} }

View File

@ -16,13 +16,13 @@
*/ */
package org.apache.activemq.tool; package org.apache.activemq.tool;
import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger;
import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean; import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.JMSException; import javax.jms.JMSException;
import java.util.List; import java.util.List;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Iterator; import java.util.Iterator;
import java.util.Properties;
public class PerfMeasurementTool implements PerfEventListener, Runnable { public class PerfMeasurementTool implements PerfEventListener, Runnable {
private long duration = 5 * 60 * 1000; // 5 mins by default test duration private long duration = 5 * 60 * 1000; // 5 mins by default test duration
@ -33,14 +33,13 @@ public class PerfMeasurementTool implements PerfEventListener, Runnable {
private AtomicBoolean start = new AtomicBoolean(false); private AtomicBoolean start = new AtomicBoolean(false);
private AtomicBoolean stop = new AtomicBoolean(false); private AtomicBoolean stop = new AtomicBoolean(false);
private Properties samplerSettings = new Properties();
private List perfClients = new ArrayList(); private List perfClients = new ArrayList();
private AtomicInteger unstartedClients = new AtomicInteger(0);
public void registerClient(PerfMeasurable client) { public void registerClient(PerfMeasurable client) {
client.setPerfEventListener(this); client.setPerfEventListener(this);
perfClients.add(client); perfClients.add(client);
unstartedClients.incrementAndGet();
} }
public void registerClient(PerfMeasurable[] clients) { public void registerClient(PerfMeasurable[] clients) {
@ -49,10 +48,13 @@ public class PerfMeasurementTool implements PerfEventListener, Runnable {
} }
} }
public void startSampler() { public Properties getSamplerSettings() {
Thread t = new Thread(this); return samplerSettings;
t.setName("Performance Sampler"); }
t.start();
public void setSamplerSettings(Properties samplerSettings) {
this.samplerSettings = samplerSettings;
ReflectionUtil.configureClass(this, samplerSettings);
} }
public long getDuration() { public long getDuration() {
@ -112,6 +114,12 @@ public class PerfMeasurementTool implements PerfEventListener, Runnable {
stop.set(true); stop.set(true);
} }
public void startSampler() {
Thread t = new Thread(this);
t.setName("Performance Sampler");
t.start();
}
public void run() { public void run() {
// Compute for the actual duration window of the sampler // Compute for the actual duration window of the sampler
long endTime = System.currentTimeMillis() + duration - rampDownTime; long endTime = System.currentTimeMillis() + duration - rampDownTime;

View File

@ -1,133 +0,0 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.tool;
import java.util.Date;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
/**
* A simple tool for publishing messages
*
* @version $Revision$
*/
public class ProducerTool extends ToolSupport {
protected int messageCount = 10;
protected long sleepTime = 0L;
protected boolean verbose = true;
protected int messageSize = 255;
public static void main(String[] args) {
runTool(args, new ProducerTool());
}
protected static void runTool(String[] args, ProducerTool tool) {
if (args.length > 0) {
tool.url = args[0];
}
if (args.length > 1) {
tool.topic = args[1].equalsIgnoreCase("true");
}
if (args.length > 2) {
tool.subject = args[2];
}
if (args.length > 3) {
tool.durable = args[3].equalsIgnoreCase("true");
}
if (args.length > 4) {
tool.messageCount = Integer.parseInt(args[4]);
}
if (args.length > 5) {
tool.messageSize = Integer.parseInt(args[5]);
}
tool.run();
}
public void run() {
try {
System.out.println("Connecting to URL: " + url);
System.out.println("Publishing a Message with size "+messageSize+" to " + (topic ? "topic" : "queue") + ": " + subject);
System.out.println("Using " + (durable ? "durable" : "non-durable") + " publishing");
Connection connection = createConnection();
Session session = createSession(connection);
MessageProducer producer = createProducer(session);
//connection.start();
sendLoop(session, producer);
System.out.println("Done.");
close(connection, session);
}
catch (Exception e) {
System.out.println("Caught: " + e);
e.printStackTrace();
}
}
protected MessageProducer createProducer(Session session) throws JMSException {
MessageProducer producer = session.createProducer(destination);
if (durable) {
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
}
else {
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
}
return producer;
}
protected void sendLoop(Session session, MessageProducer producer) throws Exception {
for (int i = 0; i < messageCount; i++) {
TextMessage message = session.createTextMessage(createMessageText(i));
if (verbose) {
String msg = message.getText();
if( msg.length() > 50 )
msg = msg.substring(0, 50)+"...";
System.out.println("Sending message: " + msg);
}
producer.send(message);
Thread.sleep(sleepTime);
}
producer.send(session.createMessage());
}
/**
* @param i
* @return
*/
private String createMessageText(int index) {
StringBuffer buffer = new StringBuffer(messageSize);
buffer.append("Message: " + index + " sent at: " + new Date());
if( buffer.length() > messageSize ) {
return buffer.substring(0, messageSize);
}
for( int i=buffer.length(); i < messageSize; i++)
buffer.append(' ');
return buffer.toString();
}
}

View File

@ -19,10 +19,11 @@ package org.apache.activemq.tool;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import java.util.Map;
import java.util.Iterator; import java.util.Iterator;
import java.util.StringTokenizer; import java.util.StringTokenizer;
import java.util.Properties;
import java.lang.reflect.Method; import java.lang.reflect.Method;
import java.lang.reflect.Field;
public final class ReflectionUtil { public final class ReflectionUtil {
private static final Log log = LogFactory.getLog(ReflectionUtil.class); private static final Log log = LogFactory.getLog(ReflectionUtil.class);
@ -31,116 +32,98 @@ public final class ReflectionUtil {
} }
public static void configureClass(Object obj, String key, Object val) { public static void configureClass(Object obj, String key, String val) {
try { try {
String debugInfo;
Object target = obj; Object target = obj;
Class targetClass = obj.getClass(); Class targetClass = obj.getClass();
//System.out.print("Invoking: " + targetClass); // DEBUG: Debugging Info
debugInfo = "Invoking: " + targetClass.getName();
StringTokenizer tokenizer = new StringTokenizer(key, "."); StringTokenizer tokenizer = new StringTokenizer(key, ".");
// NOTE: Skip the first token, it is assume that this is an indicator for the object itself
tokenizer.nextToken();
// For nested settings, get the object first
for (int j=0; j<tokenizer.countTokens()-1; j++) { for (int j=0; j<tokenizer.countTokens()-1; j++) {
// Find getter method first // Find getter method first
String name = tokenizer.nextToken(); String name = tokenizer.nextToken();
String getMethod = "get" + name.substring(0,1).toUpperCase() + name.substring(1); String getMethod = "get" + name.substring(0,1).toUpperCase() + name.substring(1);
Method method = targetClass.getMethod(getMethod, new Class[] {}); Method method = targetClass.getMethod(getMethod, new Class[] {});
target = method.invoke(target, null); target = method.invoke(target, (Object[])null);
targetClass = target.getClass(); targetClass = target.getClass();
//System.out.print("." + getMethod + "()"); debugInfo += ("." + getMethod + "()");
} }
// Invoke setter method of last class // Property name
String name = tokenizer.nextToken(); String property = tokenizer.nextToken();
String methodName = "set" + name.substring(0,1).toUpperCase() + name.substring(1);
// Determine data type of property
Class propertyType = getField(targetClass, property).getType();
// Get setter method
String setterMethod = "set" + property.substring(0,1).toUpperCase() + property.substring(1);
// Set primitive type
debugInfo += ("." + setterMethod + "(" + propertyType.getName() + ": " + val + ")");
if (propertyType.isPrimitive()) {
if (propertyType == Boolean.TYPE) {
targetClass.getMethod(setterMethod, new Class[] {boolean.class}).invoke(target, new Object[] {Boolean.valueOf(val)});
} else if (propertyType == Integer.TYPE) {
targetClass.getMethod(setterMethod, new Class[] {int.class}).invoke(target, new Object[] {Integer.valueOf(val)});
} else if (propertyType == Long.TYPE) {
targetClass.getMethod(setterMethod, new Class[] {long.class}).invoke(target, new Object[] {Long.valueOf(val)});
} else if (propertyType == Double.TYPE) {
targetClass.getMethod(setterMethod, new Class[] {double.class}).invoke(target, new Object[] {Double.valueOf(val)});
} else if (propertyType == Float.TYPE) {
targetClass.getMethod(setterMethod, new Class[] {float.class}).invoke(target, new Object[] {Float.valueOf(val)});
} else if (propertyType == Short.TYPE) {
targetClass.getMethod(setterMethod, new Class[] {short.class}).invoke(target, new Object[] {Short.valueOf(val)});
} else if (propertyType == Byte.TYPE) {
targetClass.getMethod(setterMethod, new Class[] {byte.class}).invoke(target, new Object[] {Byte.valueOf(val)});
} else if (propertyType == Character.TYPE) {
targetClass.getMethod(setterMethod, new Class[] {char.class}).invoke(target, new Object[] {val.charAt(0)});
}
} else {
// Set String type
if (propertyType == String.class) {
targetClass.getMethod(setterMethod, new Class[] {String.class}).invoke(target, new Object[] {val});
// For unknown object type, try to call the valueOf method of the object
// to convert the string to the target object type
} else {
Object param = propertyType.getMethod("valueOf", new Class[] {String.class}).invoke(null, val);
targetClass.getMethod(setterMethod, new Class[] {propertyType}).invoke(target, new Object[] {param});
}
}
log.debug(debugInfo);
Method method = getPrimitiveMethod(targetClass, methodName, val);
Object[] objVal = {val};
method.invoke(target, objVal);
//method.invoke(target, val);
//System.out.println("." + methodName + "(" + val + ")");
} catch (Exception e) { } catch (Exception e) {
log.warn("", e); log.warn(e);
} }
} }
public static void configureClass(Object obj, Map props) { public static void configureClass(Object obj, Properties props) {
for (Iterator i = props.keySet().iterator(); i.hasNext();) { for (Iterator i = props.keySet().iterator(); i.hasNext();) {
String key = (String)i.next(); String key = (String)i.next();
Object val = props.get(key); String val = props.getProperty(key);
configureClass(obj, key, val); configureClass(obj, key, val);
} }
} }
private static Method getPrimitiveMethod(Class objClass, String methodName, Object param) throws NoSuchMethodException { public static Field getField(Class targetClass, String fieldName) throws NoSuchFieldException {
if (param instanceof Boolean) { while (targetClass != null) {
try { try {
// Try using the primitive type first return targetClass.getDeclaredField(fieldName);
return objClass.getMethod(methodName, new Class[] {boolean.class}); } catch (NoSuchFieldException e) {
} catch (NoSuchMethodException e) { targetClass = targetClass.getSuperclass();
// Try using the wrapper class next
return objClass.getMethod(methodName, new Class[] {Boolean.class});
} }
} else if (param instanceof Integer) {
try {
// Try using the primitive type first
return objClass.getMethod(methodName, new Class[] {int.class});
} catch (NoSuchMethodException e) {
// Try using the wrapper class next
return objClass.getMethod(methodName, new Class[] {Integer.class});
}
} else if (param instanceof Long) {
try {
// Try using the primitive type first
return objClass.getMethod(methodName, new Class[] {long.class});
} catch (NoSuchMethodException e) {
// Try using the wrapper class next
return objClass.getMethod(methodName, new Class[] {Long.class});
}
} else if (param instanceof Short) {
try {
// Try using the primitive type first
return objClass.getMethod(methodName, new Class[] {short.class});
} catch (NoSuchMethodException e) {
// Try using the wrapper class next
return objClass.getMethod(methodName, new Class[] {Short.class});
}
} else if (param instanceof Byte) {
try {
// Try using the primitive type first
return objClass.getMethod(methodName, new Class[] {byte.class});
} catch (NoSuchMethodException e) {
// Try using the wrapper class next
return objClass.getMethod(methodName, new Class[] {Byte.class});
}
} else if (param instanceof Character) {
try {
// Try using the primitive type first
return objClass.getMethod(methodName, new Class[] {char.class});
} catch (NoSuchMethodException e) {
// Try using the wrapper class next
return objClass.getMethod(methodName, new Class[] {Character.class});
}
} else if (param instanceof Double) {
try {
// Try using the primitive type first
return objClass.getMethod(methodName, new Class[] {double.class});
} catch (NoSuchMethodException e) {
// Try using the wrapper class next
return objClass.getMethod(methodName, new Class[] {Double.class});
}
} else if (param instanceof Float) {
try {
// Try using the primitive type first
return objClass.getMethod(methodName, new Class[] {float.class});
} catch (NoSuchMethodException e) {
// Try using the wrapper class next
return objClass.getMethod(methodName, new Class[] {Float.class});
}
} else {
// parameter is not a primitive
return objClass.getMethod(methodName, new Class[] {param.getClass()});
} }
throw new NoSuchFieldException(fieldName);
} }
} }

View File

@ -1,84 +0,0 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.tool;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.util.IndentPrinter;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Session;
/**
* Abstract base class useful for implementation inheritence
*
* @version $Revision$
*/
public class ToolSupport {
protected Destination destination;
protected String subject = "TOOL.DEFAULT";
protected boolean topic = true;
protected String user = ActiveMQConnection.DEFAULT_USER;
protected String pwd = ActiveMQConnection.DEFAULT_PASSWORD;
protected String url = ActiveMQConnection.DEFAULT_BROKER_URL;
protected boolean transacted = false;
protected boolean durable = false;
protected String clientID = getClass().getName();
protected int ackMode = Session.AUTO_ACKNOWLEDGE;
protected String consumerName = "James";
protected Session createSession(Connection connection) throws Exception {
if (durable) {
connection.setClientID(clientID);
}
Session session = connection.createSession(transacted, ackMode);
if (topic) {
destination = session.createTopic(subject);
}
else {
destination = session.createQueue(subject);
}
return session;
}
protected Connection createConnection() throws JMSException, Exception {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, pwd, url);
return connectionFactory.createConnection();
}
protected void close(Connection connection, Session session) throws JMSException {
// lets dump the stats
dumpStats(connection);
if (session != null) {
session.close();
}
if (connection != null) {
connection.close();
}
}
protected void dumpStats(Connection connection) {
ActiveMQConnection c = (ActiveMQConnection) connection;
c.getConnectionStats().dump(new IndentPrinter());
}
}

View File

@ -0,0 +1,23 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.tool.spi;
public class ActiveMQClassLoaderSPI extends ClassLoaderSPIConnectionFactory {
public String getClassName() {
return "org.apache.activemq.ActiveMQConnectionFactory";
}
}

View File

@ -0,0 +1,145 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.tool.spi;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.ConnectionFactory;
import java.util.Properties;
public class ActiveMQPojoSPI implements SPIConnectionFactory {
public static final String KEY_BROKER_URL = "factory.brokerUrl";
public static final String KEY_USERNAME = "factory.username";
public static final String KEY_PASSWORD = "factory.password";
public static final String KEY_CLIENT_ID = "factory.clientID";
public static final String KEY_ASYNC_SEND = "factory.asyncSend";
public static final String KEY_ASYNC_DISPATCH = "factory.asyncDispatch";
public static final String KEY_ASYNC_SESSION = "factory.asyncSession";
public static final String KEY_CLOSE_TIMEOUT = "factory.closeTimeout";
public static final String KEY_COPY_MSG_ON_SEND = "factory.copyMsgOnSend";
public static final String KEY_DISABLE_TIMESTAMP = "factory.disableTimestamp";
public static final String KEY_DEFER_OBJ_SERIAL = "factory.deferObjSerial";
public static final String KEY_ON_SEND_PREP_MSG = "factory.onSendPrepMsg";
public static final String KEY_OPTIM_ACK = "factory.optimAck";
public static final String KEY_OPTIM_DISPATCH = "factory.optimDispatch";
public static final String KEY_PREFETCH_QUEUE = "factory.prefetchQueue";
public static final String KEY_PREFETCH_TOPIC = "factory.prefetchTopic";
public static final String KEY_USE_COMPRESSION = "factory.useCompression";
public static final String KEY_USE_RETROACTIVE = "factory.useRetroactive";
public ConnectionFactory createConnectionFactory(Properties settings) throws Exception {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
configureConnectionFactory(factory, settings);
return factory;
}
public void configureConnectionFactory(ConnectionFactory jmsFactory, Properties settings) throws Exception {
ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory)jmsFactory;
String setting;
setting = settings.getProperty(KEY_BROKER_URL);
if (setting != null && setting.length() > 0) {
factory.setBrokerURL(setting);
}
setting = settings.getProperty(KEY_USERNAME);
if (setting != null && setting.length() > 0) {
factory.setUserName(setting);
}
setting = settings.getProperty(KEY_PASSWORD);
if (setting != null && setting.length() > 0) {
factory.setPassword(setting);
}
setting = settings.getProperty(KEY_CLIENT_ID);
if (setting != null && setting.length() > 0) {
factory.setClientID(setting);
}
setting = settings.getProperty(KEY_ASYNC_SEND);
if (setting != null && setting.length() > 0) {
factory.setUseAsyncSend(Boolean.parseBoolean(setting));
}
setting = settings.getProperty(KEY_ASYNC_DISPATCH);
if (setting != null && setting.length() > 0) {
factory.setAsyncDispatch(Boolean.parseBoolean(setting));
}
setting = settings.getProperty(KEY_ASYNC_SESSION);
if (setting != null && setting.length() > 0) {
factory.setAlwaysSessionAsync(Boolean.parseBoolean(setting));
}
setting = settings.getProperty(KEY_CLOSE_TIMEOUT);
if (setting != null && setting.length() > 0) {
factory.setCloseTimeout(Integer.parseInt(setting));
}
setting = settings.getProperty(KEY_COPY_MSG_ON_SEND);
if (setting != null && setting.length() > 0) {
factory.setCopyMessageOnSend(Boolean.parseBoolean(setting));
}
setting = settings.getProperty(KEY_DISABLE_TIMESTAMP);
if (setting != null && setting.length() > 0) {
factory.setDisableTimeStampsByDefault(Boolean.parseBoolean(setting));
}
setting = settings.getProperty(KEY_DEFER_OBJ_SERIAL);
if (setting != null && setting.length() > 0) {
factory.setObjectMessageSerializationDefered(Boolean.parseBoolean(setting));
}
setting = settings.getProperty(KEY_ON_SEND_PREP_MSG);
if (setting != null && setting.length() > 0) {
factory.setOnSendPrepareMessageBody(Boolean.parseBoolean(setting));
}
setting = settings.getProperty(KEY_OPTIM_ACK);
if (setting != null && setting.length() > 0) {
factory.setOptimizeAcknowledge(Boolean.parseBoolean(setting));
}
setting = settings.getProperty(KEY_OPTIM_DISPATCH);
if (setting != null && setting.length() > 0) {
factory.setOptimizedMessageDispatch(Boolean.parseBoolean(setting));
}
setting = settings.getProperty(KEY_PREFETCH_QUEUE);
if (setting != null && setting.length() > 0) {
factory.getPrefetchPolicy().setQueuePrefetch(Integer.parseInt(setting));
}
setting = settings.getProperty(KEY_PREFETCH_TOPIC);
if (setting != null && setting.length() > 0) {
factory.getPrefetchPolicy().setTopicPrefetch(Integer.parseInt(setting));
}
setting = settings.getProperty(KEY_USE_COMPRESSION);
if (setting != null && setting.length() > 0) {
factory.setUseCompression(Boolean.parseBoolean(setting));
}
setting = settings.getProperty(KEY_USE_RETROACTIVE);
if (setting != null && setting.length() > 0) {
factory.setUseRetroactiveConsumer(Boolean.parseBoolean(setting));
}
}
}

View File

@ -0,0 +1,37 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.tool.spi;
import org.apache.activemq.tool.ReflectionUtil;
import javax.jms.ConnectionFactory;
import java.util.Properties;
public abstract class ClassLoaderSPIConnectionFactory implements SPIConnectionFactory {
public ConnectionFactory createConnectionFactory(Properties settings) throws Exception {
Class factoryClass = Class.forName(getClassName());
ConnectionFactory factory = (ConnectionFactory)factoryClass.newInstance();
configureConnectionFactory(factory, settings);
return factory;
}
public void configureConnectionFactory(ConnectionFactory jmsFactory, Properties settings) throws Exception {
ReflectionUtil.configureClass(jmsFactory, settings);
}
public abstract String getClassName();
}

View File

@ -0,0 +1,25 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.tool.spi;
import javax.jms.ConnectionFactory;
import java.util.Properties;
public interface SPIConnectionFactory {
public ConnectionFactory createConnectionFactory(Properties settings) throws Exception;
public void configureConnectionFactory(ConnectionFactory jmsFactory, Properties settings) throws Exception;
}