Added initial spike for support classes for performance measurable jms clients

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@410774 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Adrian T. Co 2006-06-01 06:30:28 +00:00
parent b36cf8c30b
commit e45af98761
7 changed files with 636 additions and 0 deletions

View File

@ -0,0 +1,63 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.tool;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import javax.jms.ConnectionFactory;
import java.util.Map;
import java.lang.reflect.Constructor;
public class JmsBasicClientSupport {
private static final Log log = LogFactory.getLog(JmsBasicClientSupport.class);
public static final String DEFAULT_CONNECTION_FACTORY_CLASS = "org.apache.activemq.ActiveMQConnectionFactory";
public ConnectionFactory createConnectionFactory(String url) {
return createConnectionFactory(DEFAULT_CONNECTION_FACTORY_CLASS, url, null);
}
public ConnectionFactory createConnectionFactory(String clazz, String url) {
return createConnectionFactory(clazz, url, null);
}
public ConnectionFactory createConnectionFactory(String clazz, String url, Map props) {
if (clazz == null || clazz == "") {
throw new RuntimeException("No class definition specified to create connection factory.");
}
ConnectionFactory f = instantiateConnectionFactory(clazz, url);
if (props != null) {
ReflectionUtil.configureClass(f, props);
}
return f;
}
protected ConnectionFactory instantiateConnectionFactory(String clazz, String url) {
try {
Class factoryClass = Class.forName(clazz);
Constructor c = factoryClass.getConstructor(new Class[] {String.class});
ConnectionFactory factoryObj = (ConnectionFactory)c.newInstance(new Object[] {url});
return factoryObj;
} catch (Exception e) {
throw new RuntimeException (e);
}
}
}

View File

@ -0,0 +1,243 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.tool;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import javax.jms.*;
import java.util.Map;
import java.util.HashMap;
public class JmsConfigurableClientSupport extends JmsBasicClientSupport {
private static final Log log = LogFactory.getLog(JmsConfigurableClientSupport.class);
public static final String AMQ_SERVER = "amq";
public static final String AMQ_CONNECTION_FACTORY_CLASS = "org.apache.activemq.ActiveMQConnectionFactory";
private String serverType = "";
private String factoryClass = "";
private Map factorySettings = new HashMap();
private Map connectionSettings = new HashMap();
private Map sessionSettings = new HashMap();
private Map queueSettings = new HashMap();
private Map topicSettings = new HashMap();
private Map consumerSettings = new HashMap();
private Map producerSettings = new HashMap();
private Map messageSettings = new HashMap();
protected ConnectionFactory jmsFactory = null;
protected Connection jmsConnection = null;
protected Session jmsSession = null;
protected MessageProducer jmsMessageProducer = null;
protected MessageConsumer jmsMessageConsumer = null;
public ConnectionFactory createConnectionFactory(String url) {
jmsFactory = super.createConnectionFactory(factoryClass, url, factorySettings);
return jmsFactory;
}
public ConnectionFactory createConnectionFactory(String clazz, String url) {
factoryClass = clazz;
jmsFactory = super.createConnectionFactory(clazz, url, factorySettings);
return jmsFactory;
}
public ConnectionFactory createConnectionFactory(String clazz, String url, Map props) {
factoryClass = clazz;
// Add previous settings to current settings
props.putAll(factorySettings);
jmsFactory = super.createConnectionFactory(clazz, url, props);
return jmsFactory;
}
public ConnectionFactory getConnectionFactory() {
return jmsFactory;
}
public Connection getConnection() throws JMSException {
if (jmsConnection == null) {
// Retrieve username and password parameter is they exist
String username = (String)connectionSettings.get("username");
String password = (String)connectionSettings.get("password");
if (username == null) {
username = "";
}
if (password == null) {
password = "";
}
jmsConnection = getConnectionFactory().createConnection(username, password);
configureJmsObject(jmsConnection, connectionSettings);
}
return jmsConnection;
}
public Session getSession() throws JMSException {
if (jmsSession == null) {
boolean transacted;
// Check if session is transacted
if (sessionSettings.get("transacted") != null && ((String)sessionSettings.get("transacted")).equals("true")) {
transacted = true;
} else {
transacted = false;
}
// Check acknowledge type - default is AUTO_ACKNOWLEDGE
String ackModeStr = (String)sessionSettings.get("ackMode");
int ackMode = Session.AUTO_ACKNOWLEDGE;
if (ackModeStr != null) {
if (ackModeStr.equalsIgnoreCase("CLIENT_ACKNOWLEDGE")) {
ackMode = Session.CLIENT_ACKNOWLEDGE;
} else if (ackModeStr.equalsIgnoreCase("DUPS_OK_ACKNOWLEDGE")) {
ackMode = Session.DUPS_OK_ACKNOWLEDGE;
} else if (ackModeStr.equalsIgnoreCase("SESSION_TRANSACTED")) {
ackMode = Session.SESSION_TRANSACTED;
}
}
jmsSession = getConnection().createSession(transacted, ackMode);
configureJmsObject(jmsSession, sessionSettings);
}
return jmsSession;
}
public MessageProducer createMessageProducer(Destination dest) throws JMSException {
jmsMessageProducer = getSession().createProducer(dest);
configureJmsObject(jmsMessageProducer, producerSettings);
return jmsMessageProducer;
}
public MessageProducer getMessageProducer() {
return jmsMessageProducer;
}
public MessageConsumer createMessageConsumer(Destination dest, String selector, boolean noLocal) throws JMSException {
jmsMessageConsumer = getSession().createConsumer(dest, selector, noLocal);
configureJmsObject(jmsMessageConsumer, consumerSettings);
return jmsMessageConsumer;
}
public MessageConsumer getMessageConsumer() {
return jmsMessageConsumer;
}
public TopicSubscriber createDurableSubscriber(Topic dest, String name, String selector, boolean noLocal) throws JMSException {
jmsMessageConsumer = getSession().createDurableSubscriber(dest, name, selector, noLocal);
configureJmsObject(jmsMessageConsumer, consumerSettings);
return (TopicSubscriber)jmsMessageConsumer;
}
public TopicSubscriber getDurableSubscriber() {
return (TopicSubscriber)jmsMessageConsumer;
}
public TextMessage createTextMessage(String text) throws JMSException {
TextMessage msg = getSession().createTextMessage(text);
configureJmsObject(msg, messageSettings);
return msg;
}
public Queue createQueue(String name) throws JMSException {
Queue queue = getSession().createQueue(name);
configureJmsObject(queue, queueSettings);
return queue;
}
public Topic createTopic(String name) throws JMSException {
Topic topic = getSession().createTopic(name);
configureJmsObject(topic, topicSettings);
return topic;
}
public void addConfigParam(String key, Object value) {
// Simple mapping of JMS Server to connection factory class
if (key.equalsIgnoreCase("server")) {
serverType = value.toString();
if (serverType.equalsIgnoreCase(AMQ_SERVER)) {
factoryClass = AMQ_CONNECTION_FACTORY_CLASS;
}
// Manually specify the connection factory class to use
} else if (key.equalsIgnoreCase("factoryClass")) {
factoryClass = value.toString();
// Connection URL to use
} else if (key.equalsIgnoreCase("url")) {
factoryUrl = value.toString();
// Connection factory specific settings
} else if (key.startsWith("factory.")) {
factorySettings.put(key.substring("factory.".length()), value);
// Connection specific settings
} else if (key.startsWith("connection.")) {
connectionSettings.put(key.substring("session.".length()), value);
// Session specific settings
} else if (key.startsWith("session.")) {
sessionSettings.put(key.substring("session.".length()), value);
// Destination specific settings
} else if (key.startsWith("dest.")) {
queueSettings.put(key.substring("dest.".length()), value);
topicSettings.put(key.substring("dest.".length()), value);
// Queue specific settings
} else if (key.startsWith("queue.")) {
queueSettings.put(key.substring("queue.".length()), value);
// Topic specific settings
} else if (key.startsWith("topic.")) {
topicSettings.put(key.substring("topic.".length()), value);
// Consumer specific settings
} else if (key.startsWith("consumer.")) {
consumerSettings.put(key.substring("consumer.".length()), value);
// Producer specific settings
} else if (key.startsWith("producer.")) {
producerSettings.put(key.substring("producer.".length()), value);
// Message specific settings
} else if (key.startsWith("message.")) {
messageSettings.put(key.substring("message.".length()), value);
// Unknown settings
} else {
log.warn("Unknown setting: " + key + " = " + value);
}
}
public void configureJmsObject(Object jmsObject, Map props) {
if (props == null || props.isEmpty()) {
return;
}
ReflectionUtil.configureClass(jmsObject, props);
}
public void configureJmsObject(Object jmsObject, String key, Object val) {
if (key == null || key == "" || val == null) {
return;
}
ReflectionUtil.configureClass(jmsObject, key, val);
}
}

View File

@ -0,0 +1,84 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.tool;
public class JmsPerfClientSupport extends JmsConfigurableClientSupport implements PerfMeasurable {
protected long throughput = 0;
protected long interval = 1000; // 1 sec
protected long duration = 1000 * 60 * 10; // 10 min
protected long rampUpTime = 1000 * 60 * 1; // 1 min
protected long rampDownTime = 1000 * 60 * 1; // 1 min
protected PerfEventListener listener = null;
public long getThroughput() {
return throughput;
}
public void setThroughput(long val) {
this.throughput = val;
}
public void incThroughput() {
throughput++;
}
public void incThroughput(long val) {
throughput += val;
}
public long getInterval() {
return interval;
}
public void setInterval(long val) {
this.interval = val;
}
public long getDuration() {
return duration;
}
public void setDuration(long val) {
this.duration = val;
}
public long getRampUpTime() {
return rampUpTime;
}
public void setRampUpTime(long val) {
this.rampUpTime = val;
}
public long getRampDownTime() {
return rampDownTime;
}
public void setRampDownTime(long val) {
this.rampDownTime = val;
}
public void setPerfEventListener(PerfEventListener listener) {
this.listener = listener;
}
public PerfEventListener getPerfEventListener() {
return listener;
}
}

View File

@ -0,0 +1,45 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.tool;
import javax.jms.JMSException;
public class PerfEventAdapter implements PerfEventListener {
public void onConfigStart() {
}
public void onConfigEnd() {
}
public void onPublishStart() {
}
public void onPublishEnd() {
}
public void onConsumeStart() {
}
public void onConsumeEnd() {
}
public void onJMSException(JMSException e) {
}
public void onException(Exception e) {
}
}

View File

@ -0,0 +1,30 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.tool;
import javax.jms.JMSException;
public interface PerfEventListener {
public void onConfigStart();
public void onConfigEnd();
public void onPublishStart();
public void onPublishEnd();
public void onConsumeStart();
public void onConsumeEnd();
public void onJMSException(JMSException e);
public void onException(Exception e);
}

View File

@ -0,0 +1,27 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.tool;
public interface PerfMeasurable {
public long getThroughput();
public long getInterval();
public long getDuration();
public long getRampUpTime();
public long getRampDownTime();
public void setPerfEventListener(PerfEventListener listener);
public PerfEventListener getPerfEventListener();
}

View File

@ -0,0 +1,144 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.tool;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.util.Map;
import java.util.Iterator;
import java.util.StringTokenizer;
import java.lang.reflect.Method;
public final class ReflectionUtil {
private static final Log log = LogFactory.getLog(ReflectionUtil.class);
private ReflectionUtil() {
}
public static void configureClass(Object obj, String key, Object val) {
try {
Object target = obj;
Class targetClass = obj.getClass();
//System.out.print("Invoking: " + targetClass);
StringTokenizer tokenizer = new StringTokenizer(key, ".");
for (int j=0; j<tokenizer.countTokens()-1; j++) {
// Find getter method first
String name = tokenizer.nextToken();
String getMethod = "get" + name.substring(0,1).toUpperCase() + name.substring(1);
Method method = targetClass.getMethod(getMethod, new Class[] {});
target = method.invoke(target, null);
targetClass = target.getClass();
//System.out.print("." + getMethod + "()");
}
// Invoke setter method of last class
String name = tokenizer.nextToken();
String methodName = "set" + name.substring(0,1).toUpperCase() + name.substring(1);
Method method = getPrimitiveMethod(targetClass, methodName, val);
method.invoke(target, val);
//System.out.println("." + methodName + "(" + val + ")");
} catch (Exception e) {
log.warn("", e);
}
}
public static void configureClass(Object obj, Map props) {
for (Iterator i = props.keySet().iterator(); i.hasNext();) {
String key = (String)i.next();
Object val = props.get(key);
configureClass(obj, key, val);
}
}
private static Method getPrimitiveMethod(Class objClass, String methodName, Object param) throws NoSuchMethodException {
if (param instanceof Boolean) {
try {
// Try using the primitive type first
return objClass.getMethod(methodName, new Class[] {boolean.class});
} catch (NoSuchMethodException e) {
// Try using the wrapper class next
return objClass.getMethod(methodName, new Class[] {Boolean.class});
}
} else if (param instanceof Integer) {
try {
// Try using the primitive type first
return objClass.getMethod(methodName, new Class[] {int.class});
} catch (NoSuchMethodException e) {
// Try using the wrapper class next
return objClass.getMethod(methodName, new Class[] {Integer.class});
}
} else if (param instanceof Long) {
try {
// Try using the primitive type first
return objClass.getMethod(methodName, new Class[] {long.class});
} catch (NoSuchMethodException e) {
// Try using the wrapper class next
return objClass.getMethod(methodName, new Class[] {Long.class});
}
} else if (param instanceof Short) {
try {
// Try using the primitive type first
return objClass.getMethod(methodName, new Class[] {short.class});
} catch (NoSuchMethodException e) {
// Try using the wrapper class next
return objClass.getMethod(methodName, new Class[] {Short.class});
}
} else if (param instanceof Byte) {
try {
// Try using the primitive type first
return objClass.getMethod(methodName, new Class[] {byte.class});
} catch (NoSuchMethodException e) {
// Try using the wrapper class next
return objClass.getMethod(methodName, new Class[] {Byte.class});
}
} else if (param instanceof Character) {
try {
// Try using the primitive type first
return objClass.getMethod(methodName, new Class[] {char.class});
} catch (NoSuchMethodException e) {
// Try using the wrapper class next
return objClass.getMethod(methodName, new Class[] {Character.class});
}
} else if (param instanceof Double) {
try {
// Try using the primitive type first
return objClass.getMethod(methodName, new Class[] {double.class});
} catch (NoSuchMethodException e) {
// Try using the wrapper class next
return objClass.getMethod(methodName, new Class[] {Double.class});
}
} else if (param instanceof Float) {
try {
// Try using the primitive type first
return objClass.getMethod(methodName, new Class[] {float.class});
} catch (NoSuchMethodException e) {
// Try using the wrapper class next
return objClass.getMethod(methodName, new Class[] {Float.class});
}
} else {
// parameter is not a primitive
return objClass.getMethod(methodName, new Class[] {param.getClass()});
}
}
}