This closes #36
This commit is contained in:
Timothy Bish 2014-07-29 18:05:32 -04:00
parent f55edcfa25
commit e47e0e0463
16 changed files with 598 additions and 232 deletions

View File

@ -76,5 +76,10 @@
<artifactId>slf4j-simple</artifactId>
<version>${slf4j-version}</version>
</dependency>
<!-- dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</dependency -->
</dependencies>
</project>

View File

@ -19,8 +19,12 @@ package org.apache.activemq.tool;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import javax.jms.ConnectionFactory;
import javax.jms.ConnectionMetaData;
@ -35,6 +39,7 @@ import org.apache.activemq.tool.reports.PerformanceReportWriter;
import org.apache.activemq.tool.reports.VerbosePerfReportWriter;
import org.apache.activemq.tool.reports.XmlFilePerfReportWriter;
import org.apache.activemq.tool.sampler.CpuSamplerTask;
import org.apache.activemq.tool.sampler.PerformanceSampler;
import org.apache.activemq.tool.sampler.ThroughputSamplerTask;
import org.apache.activemq.tool.spi.SPIConnectionFactory;
import org.slf4j.Logger;
@ -49,7 +54,6 @@ public abstract class AbstractJmsClientSystem extends AbstractObjectProperties {
// Properties
protected JmsFactoryProperties factory = new JmsFactoryProperties();
protected ThroughputSamplerTask tpSampler = new ThroughputSamplerTask();
protected CpuSamplerTask cpuSampler = new CpuSamplerTask();
private int clientDestIndex;
private int clientDestCount;
@ -62,69 +66,97 @@ public abstract class AbstractJmsClientSystem extends AbstractObjectProperties {
// Create performance sampler
PerformanceReportWriter writer = createPerfWriter();
tpSampler.setPerfReportWriter(writer);
cpuSampler.setPerfReportWriter(writer);
writer.openReportWriter();
writer.writeProperties("jvmSettings", System.getProperties());
writer.writeProperties("testSystemSettings", ReflectionUtil.retrieveObjectProperties(getSysTest()));
writer.writeProperties("jmsFactorySettings", ReflectionUtil.retrieveObjectProperties(jmsConnFactory));
writer.writeProperties("jmsClientSettings", ReflectionUtil.retrieveObjectProperties(getJmsClientProperties()));
writer.writeProperties("tpSamplerSettings", ReflectionUtil.retrieveObjectProperties(tpSampler));
writer.writeProperties("cpuSamplerSettings", ReflectionUtil.retrieveObjectProperties(cpuSampler));
// set up performance samplers indicated by the user
List<PerformanceSampler> samplers = new ArrayList<>();
Set<String> requestedSamplers = getSysTest().getSamplersSet();
if (requestedSamplers.contains(JmsClientSystemProperties.SAMPLER_TP)) {
writer.writeProperties("tpSamplerSettings", ReflectionUtil.retrieveObjectProperties(tpSampler));
samplers.add(tpSampler);
}
if (requestedSamplers.contains(JmsClientSystemProperties.SAMPLER_CPU)) {
CpuSamplerTask cpuSampler = new CpuSamplerTask();
writer.writeProperties("cpuSamplerSettings", ReflectionUtil.retrieveObjectProperties(cpuSampler));
try {
cpuSampler.createPlugin();
samplers.add(cpuSampler);
} catch (IOException e) {
LOG.warn("Unable to start CPU sampler plugin. Reason: " + e.getMessage());
}
}
// spawn client threads
clientThreadGroup = new ThreadGroup(getSysTest().getClientPrefix() + " Thread Group");
for (int i = 0; i < getSysTest().getNumClients(); i++) {
distributeDestinations(getSysTest().getDestDistro(), i, getSysTest().getNumClients(), getSysTest().getTotalDests());
int numClients = getSysTest().getNumClients();
final CountDownLatch clientCompletionLatch = new CountDownLatch(numClients);
for (int i = 0; i < numClients; i++) {
distributeDestinations(getSysTest().getDestDistro(), i, numClients, getSysTest().getTotalDests());
final String clientName = getSysTest().getClientPrefix() + i;
final int clientDestIndex = this.clientDestIndex;
final int clientDestCount = this.clientDestCount;
Thread t = new Thread(clientThreadGroup, new Runnable() {
@Override
public void run() {
runJmsClient(clientName, clientDestIndex, clientDestCount);
LOG.info("Client completed");
clientCompletionLatch.countDown();
}
});
t.setName(getSysTest().getClientPrefix() + i + " Thread");
t.start();
}
// Run samplers
if (getSysTest().getSamplers().indexOf(JmsClientSystemProperties.SAMPLER_TP) > -1) {
tpSampler.startSampler();
// start the samplers
final CountDownLatch samplerCompletionLatch = new CountDownLatch(requestedSamplers.size());
for (PerformanceSampler sampler : samplers) {
sampler.setPerfReportWriter(writer);
sampler.startSampler(samplerCompletionLatch, getClientRunBasis(), getClientRunDuration());
}
if (getSysTest().getSamplers().indexOf(JmsClientSystemProperties.SAMPLER_CPU) > -1) {
try {
// wait for the clients to finish
clientCompletionLatch.await();
LOG.debug("All clients completed");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// if count-based, ramp-down time is not relevant, shut the samplers down
if (getClientRunBasis() == ClientRunBasis.count) {
for (PerformanceSampler sampler : samplers) {
sampler.finishSampling();
}
}
try {
cpuSampler.createPlugin();
cpuSampler.startSampler();
} catch (IOException e) {
LOG.warn("Unable to start CPU sampler plugin. Reason: " + e.getMessage());
LOG.debug("Waiting for samplers to shut down");
samplerCompletionLatch.await();
LOG.debug("All samplers completed");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
writer.closeReportWriter();
}
}
tpSampler.waitUntilDone();
cpuSampler.waitUntilDone();
writer.closeReportWriter();
}
protected abstract ClientRunBasis getClientRunBasis();
protected abstract long getClientRunDuration();
public ThroughputSamplerTask getTpSampler() {
return tpSampler;
}
public void setTpSampler(ThroughputSamplerTask tpSampler) {
this.tpSampler = tpSampler;
}
public CpuSamplerTask getCpuSampler() {
return cpuSampler;
}
public void setCpuSampler(CpuSamplerTask cpuSampler) {
this.cpuSampler = cpuSampler;
}
public JmsFactoryProperties getFactory() {
return factory;
}
@ -204,7 +236,7 @@ public abstract class AbstractJmsClientSystem extends AbstractObjectProperties {
protected ConnectionFactory loadJmsFactory(String spiClass, Properties factorySettings) throws JMSException {
try {
Class spi = Class.forName(spiClass);
Class<?> spi = Class.forName(spiClass);
SPIConnectionFactory spiFactory = (SPIConnectionFactory)spi.newInstance();
ConnectionFactory jmsFactory = spiFactory.createConnectionFactory(factorySettings);
LOG.info("Created: " + jmsFactory.getClass().getName() + " using SPIConnectionFactory: " + spiFactory.getClass().getName());
@ -220,7 +252,7 @@ public abstract class AbstractJmsClientSystem extends AbstractObjectProperties {
props.setJmsVersion(metaData.getJMSVersion());
String jmsProperties = "";
Enumeration jmsProps = metaData.getJMSXPropertyNames();
Enumeration<?> jmsProps = metaData.getJMSXPropertyNames();
while (jmsProps.hasMoreElements()) {
jmsProperties += jmsProps.nextElement().toString() + ",";
}

View File

@ -0,0 +1,21 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.tool;
public enum ClientRunBasis {
count, time;
}

View File

@ -83,11 +83,9 @@ public class JmsConsumerClient extends AbstractJmsMeasurableClient {
LOG.info("Starting to synchronously receive messages for " + duration + " ms...");
long endTime = System.currentTimeMillis() + duration;
int counter = 0;
while (System.currentTimeMillis() < endTime) {
getJmsConsumer().receive();
incThroughput();
counter++;
sleep();
commitTxIfNecessary();
}
@ -134,13 +132,14 @@ public class JmsConsumerClient extends AbstractJmsMeasurableClient {
}
getJmsConsumer().setMessageListener(new MessageListener() {
@Override
public void onMessage(Message msg) {
incThroughput();
sleep();
try {
commitTxIfNecessary();
commitTxIfNecessary();
} catch (JMSException ex) {
LOG.error("Error committing transaction: " + ex.getMessage());
LOG.error("Error committing transaction: " + ex.getMessage());
}
}
});
@ -170,17 +169,18 @@ public class JmsConsumerClient extends AbstractJmsMeasurableClient {
final AtomicInteger recvCount = new AtomicInteger(0);
getJmsConsumer().setMessageListener(new MessageListener() {
@Override
public void onMessage(Message msg) {
incThroughput();
recvCount.incrementAndGet();
synchronized (recvCount) {
recvCount.notify();
}
}
try {
commitTxIfNecessary();
commitTxIfNecessary();
} catch (JMSException ex) {
LOG.error("Error committing transaction: " + ex.getMessage());
LOG.error("Error committing transaction: " + ex.getMessage());
}
}
});
@ -209,11 +209,11 @@ public class JmsConsumerClient extends AbstractJmsMeasurableClient {
public MessageConsumer createJmsConsumer() throws JMSException {
Destination[] dest = createDestination(destIndex, destCount);
if (this.client.getMessageSelector() == null)
return createJmsConsumer(dest[0]);
else
return createJmsConsumer(dest[0], this.client.getMessageSelector(), false);
return createJmsConsumer(dest[0]);
else
return createJmsConsumer(dest[0], this.client.getMessageSelector(), false);
}
public MessageConsumer createJmsConsumer(Destination dest) throws JMSException {
@ -252,26 +252,28 @@ public class JmsConsumerClient extends AbstractJmsMeasurableClient {
return jmsConsumer;
}
@Override
public JmsClientProperties getClient() {
return client;
}
@Override
public void setClient(JmsClientProperties clientProps) {
client = (JmsConsumerProperties)clientProps;
}
/**
* A way to throttle the consumer. Time to sleep is
* configured via recvDelay property.
* A way to throttle the consumer. Time to sleep is
* configured via recvDelay property.
*/
protected void sleep() {
if (client.getRecvDelay() > 0) {
try {
LOG.trace("Sleeping for " + client.getRecvDelay() + " milliseconds");
Thread.sleep(client.getRecvDelay());
} catch (java.lang.InterruptedException ex) {
LOG.warn(ex.getMessage());
}
try {
LOG.trace("Sleeping for " + client.getRecvDelay() + " milliseconds");
Thread.sleep(client.getRecvDelay());
} catch (java.lang.InterruptedException ex) {
LOG.warn(ex.getMessage());
}
}
}
}

View File

@ -48,6 +48,17 @@ public class JmsConsumerSystem extends AbstractJmsClientSystem {
this.consumer = consumer;
}
@Override
protected ClientRunBasis getClientRunBasis() {
assert (consumer != null);
return ClientRunBasis.valueOf(consumer.getRecvType().toLowerCase());
}
@Override
protected long getClientRunDuration() {
return consumer.getRecvDuration();
}
protected void runJmsClient(String clientName, int clientDestIndex, int clientDestCount) {
ThroughputSamplerTask sampler = getTpSampler();

View File

@ -21,7 +21,6 @@ import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.Arrays;
import java.util.Set;
@ -56,11 +55,12 @@ public class JmsProducerClient extends AbstractJmsMeasurableClient {
public void sendMessages() throws JMSException {
// Send a specific number of messages
if (client.getSendType().equalsIgnoreCase(JmsProducerProperties.COUNT_BASED_SENDING)) {
sendCountBasedMessages(client.getSendCount());
long sendCount = client.getSendCount();
sendCountBasedMessages(sendCount);
// Send messages for a specific duration
} else {
sendTimeBasedMessages(client.getSendDuration());
long sendDuration = client.getSendDuration();
sendTimeBasedMessages(sendDuration);
}
}
@ -90,18 +90,18 @@ public class JmsProducerClient extends AbstractJmsMeasurableClient {
try {
getConnection().start();
if (client.getMsgFileName() != null) {
LOG.info("Starting to publish " +
messageCount +
" messages from file " +
client.getMsgFileName()
);
LOG.info("Starting to publish " +
messageCount +
" messages from file " +
client.getMsgFileName()
);
} else {
LOG.info("Starting to publish " +
messageCount +
" messages of size " +
client.getMessageSize() +
" byte(s)."
);
LOG.info("Starting to publish " +
messageCount +
" messages of size " +
client.getMessageSize() +
" byte(s)."
);
}
// Send one type of message only, avoiding the creation of different messages on sending
@ -155,6 +155,7 @@ public class JmsProducerClient extends AbstractJmsMeasurableClient {
}
}
} finally {
LOG.info("Finished sending");
getConnection().close();
}
}
@ -178,17 +179,17 @@ public class JmsProducerClient extends AbstractJmsMeasurableClient {
try {
getConnection().start();
if (client.getMsgFileName() != null) {
LOG.info("Starting to publish messages from file " +
client.getMsgFileName() +
" for " +
duration +
" ms");
LOG.info("Starting to publish messages from file " +
client.getMsgFileName() +
" for " +
duration +
" ms");
} else {
LOG.info("Starting to publish " +
client.getMessageSize() +
" byte(s) messages for " +
duration +
" ms");
LOG.info("Starting to publish " +
client.getMessageSize() +
" byte(s) messages for " +
duration +
" ms");
}
// Send one type of message only, avoiding the creation of different messages on sending
if (!client.isCreateNewMsg()) {
@ -243,6 +244,7 @@ public class JmsProducerClient extends AbstractJmsMeasurableClient {
}
}
} finally {
LOG.info("Finished sending");
getConnection().close();
}
}
@ -282,22 +284,22 @@ public class JmsProducerClient extends AbstractJmsMeasurableClient {
}
public TextMessage createJmsTextMessage() throws JMSException {
if (client.getMsgFileName() != null) {
return loadJmsMessage();
} else {
if (client.getMsgFileName() != null) {
return loadJmsMessage();
} else {
return createJmsTextMessage(client.getMessageSize());
}
}
}
public TextMessage createJmsTextMessage(int size) throws JMSException {
jmsTextMessage = getSession().createTextMessage(buildText("", size));
// support for adding message headers
Set<String> headerKeys = this.client.getHeaderKeys();
for (String key : headerKeys) {
jmsTextMessage.setObjectProperty(key, this.client.getHeaderValue(key));
jmsTextMessage.setObjectProperty(key, this.client.getHeaderValue(key));
}
return jmsTextMessage;
}
@ -310,10 +312,12 @@ public class JmsProducerClient extends AbstractJmsMeasurableClient {
return jmsTextMessage;
}
@Override
public JmsClientProperties getClient() {
return client;
}
@Override
public void setClient(JmsClientProperties clientProps) {
client = (JmsProducerProperties)clientProps;
}
@ -323,49 +327,49 @@ public class JmsProducerClient extends AbstractJmsMeasurableClient {
Arrays.fill(data, (byte) 0);
return text + new String(data);
}
protected void sleep() {
if (client.getSendDelay() > 0) {
try {
LOG.trace("Sleeping for " + client.getSendDelay() + " milliseconds");
Thread.sleep(client.getSendDelay());
} catch (java.lang.InterruptedException ex) {
LOG.warn(ex.getMessage());
}
try {
LOG.trace("Sleeping for " + client.getSendDelay() + " milliseconds");
Thread.sleep(client.getSendDelay());
} catch (java.lang.InterruptedException ex) {
LOG.warn(ex.getMessage());
}
}
}
/**
* loads the message to be sent from the specified TextFile
*/
protected TextMessage loadJmsMessage() throws JMSException {
try {
// couple of sanity checks upfront
if (client.getMsgFileName() == null) {
throw new JMSException("Invalid filename specified.");
}
File f = new File(client.getMsgFileName());
if (f.isDirectory()) {
throw new JMSException("Cannot load from " +
client.getMsgFileName() +
" as it is a directory not a text file.");
}
// try to load file
BufferedReader br = new BufferedReader(new FileReader(f));
StringBuffer payload = new StringBuffer();
String tmp = null;
while ((tmp = br.readLine()) != null) {
payload.append(tmp);
}
jmsTextMessage = getSession().createTextMessage(payload.toString());
return jmsTextMessage;
} catch (FileNotFoundException ex) {
throw new JMSException(ex.getMessage());
} catch (IOException iox) {
throw new JMSException(iox.getMessage());
}
try {
// couple of sanity checks upfront
if (client.getMsgFileName() == null) {
throw new JMSException("Invalid filename specified.");
}
File f = new File(client.getMsgFileName());
if (f.isDirectory()) {
throw new JMSException("Cannot load from " +
client.getMsgFileName() +
" as it is a directory not a text file.");
}
// try to load file
BufferedReader br = new BufferedReader(new FileReader(f));
StringBuffer payload = new StringBuffer();
String tmp = null;
while ((tmp = br.readLine()) != null) {
payload.append(tmp);
}
br.close();
jmsTextMessage = getSession().createTextMessage(payload.toString());
return jmsTextMessage;
} catch (FileNotFoundException ex) {
throw new JMSException(ex.getMessage());
} catch (IOException iox) {
throw new JMSException(iox.getMessage());
}
}
}

View File

@ -28,14 +28,17 @@ public class JmsProducerSystem extends AbstractJmsClientSystem {
protected JmsProducerSystemProperties sysTest = new JmsProducerSystemProperties();
protected JmsProducerProperties producer = new JmsProducerProperties();
@Override
public JmsClientSystemProperties getSysTest() {
return sysTest;
}
@Override
public void setSysTest(JmsClientSystemProperties sysTestProps) {
sysTest = (JmsProducerSystemProperties)sysTestProps;
}
@Override
public JmsClientProperties getJmsClientProperties() {
return getProducer();
}
@ -48,6 +51,19 @@ public class JmsProducerSystem extends AbstractJmsClientSystem {
this.producer = producer;
}
@Override
protected ClientRunBasis getClientRunBasis() {
assert (producer != null);
return ClientRunBasis.valueOf(producer.getSendType().toLowerCase());
}
@Override
protected long getClientRunDuration() {
return producer.getSendDuration();
}
@Override
protected void runJmsClient(String clientName, int clientDestIndex, int clientDestCount) {
ThroughputSamplerTask sampler = getTpSampler();

View File

@ -17,8 +17,11 @@
package org.apache.activemq.tool.properties;
import java.io.File;
import java.util.HashSet;
import java.util.Set;
public class JmsClientSystemProperties extends AbstractObjectProperties {
public static final String DEST_DISTRO_ALL = "all"; // Each client will send/receive to all destination;
public static final String DEST_DISTRO_EQUAL = "equal"; // Equally divide the number of destinations to the number of clients
public static final String DEST_DISTRO_DIVIDE = "divide"; // Divide the destination among the clients, even if some have more destination than others
@ -71,6 +74,14 @@ public class JmsClientSystemProperties extends AbstractObjectProperties {
return samplers;
}
public Set<String> getSamplersSet() {
Set<String> samplersSet = new HashSet<>();
for (String sampler : samplers.split(",")) {
samplersSet.add(sampler.trim());
}
return samplersSet;
}
public void setSamplers(String samplers) {
this.samplers = samplers;
}

View File

@ -24,14 +24,14 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class JmsProducerProperties extends JmsClientProperties {
private static final Logger LOG = LoggerFactory.getLogger(ReflectionUtil.class);
private static final Logger LOG = LoggerFactory.getLogger(ReflectionUtil.class);
public static final String TIME_BASED_SENDING = "time"; // Produce messages base on a time interval
public static final String COUNT_BASED_SENDING = "count"; // Produce a specific count of messages
public static final String DELIVERY_MODE_PERSISTENT = "persistent"; // Persistent message delivery
public static final String DELIVERY_MODE_NON_PERSISTENT = "nonpersistent"; // Non-persistent message delivery
protected String deliveryMode = DELIVERY_MODE_NON_PERSISTENT; // Message delivery mode
protected int messageSize = 1024; // Send 1kb messages by default
protected long sendCount = 1000000; // Send a million messages by default
@ -39,15 +39,15 @@ public class JmsProducerProperties extends JmsClientProperties {
protected String sendType = TIME_BASED_SENDING;
protected long sendDelay = 0; // delay in milliseconds between each producer send
protected String msgFileName = null; // for sending a particular msg from a file
protected Map<String,Object> headerMap = null;
// If true, create a different message on each send, otherwise reuse.
protected boolean createNewMsg;
protected boolean createNewMsg;
public JmsProducerProperties() {
this.headerMap = new HashMap();
this.headerMap = new HashMap<String, Object>();
}
public String getDeliveryMode() {
@ -97,72 +97,72 @@ public class JmsProducerProperties extends JmsClientProperties {
public void setCreateNewMsg(boolean createNewMsg) {
this.createNewMsg = createNewMsg;
}
public void setSendDelay(long delay) {
this.sendDelay = delay;
this.sendDelay = delay;
}
public long getSendDelay() {
return this.sendDelay;
return this.sendDelay;
}
/* Operations for supporting message headers */
/**
* Method for setting a message header.
* Method for setting a message header.
* @param encodedHeader - the header is encoded as a string using this syntax:
* encodedHeader = [headerkey '=' headervalue ':' ]*
* E.g. an encodedHeader could read "JMSType=car", or
* "JMSType=car:MyHeader=MyValue"
*
* That implies neither the header key nor the value
* That implies neither the header key nor the value
* can contain any of the characters ':' and '='.
*/
public void setHeader(String encodedHeader) {
// remove any trailing ':' characters
if (encodedHeader.endsWith(":")) {
encodedHeader = encodedHeader.substring(0, encodedHeader.length()-1);
}
// split headers
String headers[] = encodedHeader.split(":");
for (String h : headers) {
// split into header name and value
String tokens[] = h.split("=");
// sanity check, don't allow empty string for header names
if (tokens.length != 2 || tokens[0].equals("") || tokens[1].equals("") ) {
LOG.error("Error parsing message headers. Header: \"" + h +
"\". This header will be ignored.");
} else {
this.headerMap.put(tokens[0], tokens[1]);
}
}
// remove any trailing ':' characters
if (encodedHeader.endsWith(":")) {
encodedHeader = encodedHeader.substring(0, encodedHeader.length()-1);
}
// split headers
String headers[] = encodedHeader.split(":");
for (String h : headers) {
// split into header name and value
String tokens[] = h.split("=");
// sanity check, don't allow empty string for header names
if (tokens.length != 2 || tokens[0].equals("") || tokens[1].equals("") ) {
LOG.error("Error parsing message headers. Header: \"" + h +
"\". This header will be ignored.");
} else {
this.headerMap.put(tokens[0], tokens[1]);
}
}
}
public Set<String> getHeaderKeys() {
return this.headerMap.keySet();
return this.headerMap.keySet();
}
public Object getHeaderValue(String key) {
return this.headerMap.get(key);
}
return this.headerMap.get(key);
}
public void clearHeaders() {
this.headerMap.clear();
this.headerMap.clear();
}
public void setMsgFileName(String file) {
LOG.info("\"producer.msgFileName\" specified. " +
"Will ignore setting \"producer.messageSize\".");
this.msgFileName = file;
LOG.info("\"producer.msgFileName\" specified. " +
"Will ignore setting \"producer.messageSize\".");
this.msgFileName = file;
}
public String getMsgFileName() {
return this.msgFileName;
return this.msgFileName;
}
}

View File

@ -39,7 +39,7 @@ public final class ReflectionUtil {
String debugInfo;
Object target = obj;
Class targetClass = obj.getClass();
Class<?> targetClass = obj.getClass();
// DEBUG: Debugging Info
debugInfo = "Invoking: " + targetClass.getName();
@ -92,7 +92,7 @@ public final class ReflectionUtil {
if (setterMethod == null) {
throw new IllegalAccessException("Unable to find appropriate setter method signature for property: " + property);
}
Class paramType = setterMethod.getParameterTypes()[0];
Class<?> paramType = setterMethod.getParameterTypes()[0];
// Set primitive type
debugInfo += "." + setterMethod + "(" + paramType.getName() + ": " + val + ")";
@ -160,7 +160,7 @@ public final class ReflectionUtil {
}
public static void configureClass(Object obj, Properties props) {
for (Iterator i = props.keySet().iterator(); i.hasNext();) {
for (Iterator<Object> i = props.keySet().iterator(); i.hasNext();) {
try {
String key = (String)i.next();
String val = props.getProperty(key);

View File

@ -16,89 +16,173 @@
*/
package org.apache.activemq.tool.sampler;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.tool.ClientRunBasis;
import org.apache.activemq.tool.properties.AbstractObjectProperties;
import org.apache.activemq.tool.reports.PerformanceReportWriter;
import org.apache.commons.lang.Validate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public abstract class AbstractPerformanceSampler extends AbstractObjectProperties implements PerformanceSampler {
protected long rampUpTime = 30 * 1000; // 30 secs
protected long rampDownTime = 30 * 1000; // 30 secs
protected long duration = 5 * 60 * 1000; // 5 mins
private final Logger log = LoggerFactory.getLogger(this.getClass());
protected long rampUpPercent = 0;
protected long rampDownPercent = 0;
// the following are all optionally set; they are otherwise worked out at run time
protected Long rampUpTime;
protected Long rampDownTime;
protected Long duration;
protected long interval = 1000; // 1 sec
protected PerformanceReportWriter perfReportWriter;
protected PerformanceEventListener perfEventListener;
protected final AtomicBoolean isRunning = new AtomicBoolean(false);
protected CountDownLatch completionLatch;
protected long sampleIndex;
public long getRampUpTime() {
@Override
public Long getRampUpTime() {
return rampUpTime;
}
@Override
public void setRampUpTime(long rampUpTime) {
this.rampUpTime = rampUpTime;
}
public long getRampDownTime() {
@Override
public Long getRampDownTime() {
return rampDownTime;
}
@Override
public void setRampDownTime(long rampDownTime) {
this.rampDownTime = rampDownTime;
}
public long getDuration() {
@Override
public Long getDuration() {
return duration;
}
@Override
public void setDuration(long duration) {
this.duration = duration;
}
@Override
public long getInterval() {
return interval;
}
@Override
public void setInterval(long interval) {
this.interval = interval;
}
@Override
public long getRampUpPercent() {
return rampUpPercent;
}
@Override
public void setRampUpPercent(long rampUpPercent) {
Validate.isTrue((rampUpPercent >= 0) && (rampUpPercent <= 100), "rampUpPercent must be a value between 0 and 100");
this.rampUpPercent = rampUpPercent;
}
@Override
public long getRampDownPercent() {
return rampDownPercent;
}
@Override
public void setRampDownPercent(long rampDownPercent) {
Validate.isTrue((rampDownPercent >= 0) && (rampDownPercent < 100), "rampDownPercent must be a value between 0 and 99");
this.rampDownPercent = rampDownPercent;
}
@Override
public PerformanceReportWriter getPerfReportWriter() {
return perfReportWriter;
}
@Override
public void setPerfReportWriter(PerformanceReportWriter perfReportWriter) {
this.perfReportWriter = perfReportWriter;
}
@Override
public PerformanceEventListener getPerfEventListener() {
return perfEventListener;
}
@Override
public void setPerfEventListener(PerformanceEventListener perfEventListener) {
this.perfEventListener = perfEventListener;
}
public void startSampler() {
isRunning.set(true);
@Override
public void startSampler(CountDownLatch completionLatch, ClientRunBasis clientRunBasis, long clientRunDuration) {
Validate.notNull(clientRunBasis);
Validate.notNull(completionLatch);
if (clientRunBasis == ClientRunBasis.time) {
// override the default durations
// if the user has overridden a duration, then use that
duration = (duration == null) ? clientRunDuration : this.duration;
rampUpTime = (rampUpTime == null) ? (duration / 100 * rampUpPercent) : this.rampUpTime;
rampDownTime = (rampDownTime == null) ? (duration / 100 * rampDownPercent) : this.rampDownTime;
Validate.isTrue(duration >= (rampUpTime + rampDownTime),
"Ramp times (up: " + rampDownTime + ", down: " + rampDownTime + ") exceed the sampler duration (" + duration + ")");
log.info("Sampling duration: {} ms, ramp up: {} ms, ramp down: {} ms", duration, rampUpTime, rampDownTime);
// spawn notifier thread to stop the sampler, taking ramp-down time into account
Thread notifier = new Thread(new RampDownNotifier(this));
notifier.setName("RampDownNotifier[" + this.getClass().getSimpleName() + "]");
notifier.start();
} else {
log.info("Performance test running on count basis; ignoring duration and ramp times");
setRampUpTime(0);
setRampDownTime(0);
}
this.completionLatch = completionLatch;
Thread t = new Thread(this);
t.setName(this.getClass().getSimpleName());
t.start();
isRunning.set(true);
}
@Override
public void finishSampling() {
isRunning.set(false);
}
@Override
public void run() {
try {
log.debug("Ramp up start");
onRampUpStart();
if (perfEventListener != null) {
perfEventListener.onRampUpStart(this);
}
try {
Thread.sleep(rampUpTime);
} catch (InterruptedException e) {
e.printStackTrace();
if (rampUpTime > 0) {
try {
Thread.sleep(rampUpTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("Sampler start");
onSamplerStart();
if (perfEventListener != null) {
perfEventListener.onSamplerStart(this);
@ -106,34 +190,32 @@ public abstract class AbstractPerformanceSampler extends AbstractObjectPropertie
sample();
log.debug("Sampler end");
onSamplerEnd();
if (perfEventListener != null) {
perfEventListener.onSamplerEnd(this);
}
try {
Thread.sleep(rampDownTime);
} catch (InterruptedException e) {
e.printStackTrace();
if (rampDownTime > 0) {
try {
Thread.sleep(rampDownTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("Ramp down end");
onRampDownEnd();
if (perfEventListener != null) {
perfEventListener.onRampDownEnd(this);
}
} finally {
isRunning.set(false);
synchronized (isRunning) {
isRunning.notifyAll();
}
completionLatch.countDown();
}
}
protected void sample() {
// Compute for the actual duration window of the sampler
long endTime = System.currentTimeMillis() + duration - rampDownTime - rampUpTime;
while (System.currentTimeMillis() < endTime) {
while (isRunning.get()) {
try {
Thread.sleep(interval);
} catch (InterruptedException e) {
@ -144,24 +226,9 @@ public abstract class AbstractPerformanceSampler extends AbstractObjectPropertie
}
}
@Override
public abstract void sampleData();
public boolean isRunning() {
return isRunning.get();
}
public void waitUntilDone() {
while (isRunning()) {
try {
synchronized (isRunning) {
isRunning.wait(0);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
// Call back functions to customize behavior of thread.
protected void onRampUpStart() {
}

View File

@ -16,19 +16,22 @@
*/
package org.apache.activemq.tool.sampler;
import java.util.concurrent.CountDownLatch;
import org.apache.activemq.tool.ClientRunBasis;
import org.apache.activemq.tool.reports.PerformanceReportWriter;
public interface PerformanceSampler extends Runnable {
long getRampUpTime();
Long getRampUpTime();
void setRampUpTime(long rampUpTime);
long getRampDownTime();
Long getRampDownTime();
void setRampDownTime(long rampDownTime);
long getDuration();
Long getDuration();
void setDuration(long duration);
@ -36,6 +39,14 @@ public interface PerformanceSampler extends Runnable {
void setInterval(long interval);
long getRampUpPercent();
void setRampUpPercent(long rampUpPercent);
long getRampDownPercent();
void setRampDownPercent(long rampDownPercent);
PerformanceReportWriter getPerfReportWriter();
void setPerfReportWriter(PerformanceReportWriter writer);
@ -44,9 +55,10 @@ public interface PerformanceSampler extends Runnable {
void setPerfEventListener(PerformanceEventListener listener);
void finishSampling();
void sampleData();
boolean isRunning();
void startSampler(CountDownLatch completionLatch, ClientRunBasis clientRunBasis, long clientRunDuration);
void waitUntilDone();
}

View File

@ -0,0 +1,41 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.tool.sampler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class RampDownNotifier implements Runnable {
private final static Logger LOG = LoggerFactory.getLogger(RampDownNotifier.class);
private final PerformanceSampler sampler;
public RampDownNotifier(PerformanceSampler sampler) {
this.sampler = sampler;
}
@Override
public void run() {
try {
Thread.sleep(sampler.getDuration() - sampler.getRampDownTime());
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
LOG.debug("Ramping down sampler");
sampler.finishSampling();
}
}
}

View File

@ -16,23 +16,21 @@
*/
package org.apache.activemq.tool.sampler;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.activemq.tool.reports.AbstractPerfReportWriter;
public class ThroughputSamplerTask extends AbstractPerformanceSampler {
private final Object mutex = new Object();
private List<MeasurableClient> clients = new ArrayList<MeasurableClient>();
private final List<MeasurableClient> clients = new CopyOnWriteArrayList<>();
public void registerClient(MeasurableClient client) {
synchronized (mutex) {
clients.add(client);
}
clients.add(client);
}
@Override
public void sampleData() {
for (Iterator<MeasurableClient> i = clients.iterator(); i.hasNext();) {
MeasurableClient client = i.next();
@ -44,6 +42,7 @@ public class ThroughputSamplerTask extends AbstractPerformanceSampler {
}
}
@Override
protected void onSamplerStart() {
// Reset the throughput of the clients
for (Iterator<MeasurableClient> i = clients.iterator(); i.hasNext();) {

View File

@ -0,0 +1,29 @@
## ---------------------------------------------------------------------------
## Licensed to the Apache Software Foundation (ASF) under one or more
## contributor license agreements. See the NOTICE file distributed with
## this work for additional information regarding copyright ownership.
## The ASF licenses this file to You under the Apache License, Version 2.0
## (the "License"); you may not use this file except in compliance with
## the License. You may obtain a copy of the License at
##
## http://www.apache.org/licenses/LICENSE-2.0
##
## Unless required by applicable law or agreed to in writing, software
## distributed under the License is distributed on an "AS IS" BASIS,
## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
## See the License for the specific language governing permissions and
## limitations under the License.
## ---------------------------------------------------------------------------
#
# The logging properties used during tests..
#
log4j.rootLogger=INFO, stdout
log4j.logger.org.apache.activemq.tool=DEBUG
# CONSOLE appender not used by default
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d [%-15.15t] - %-5p %-30.30c{1} - %m%n
#log4j.appender.stdout.layout.ConversionPattern=%d [%-15.15t] - %-5p %-30.30c{1} - %-10.10X{activemq.broker} %-20.20X{activemq.connector} %-10.10X{activemq.destination} - %m%n

View File

@ -0,0 +1,116 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.tool.sampler;
import org.apache.activemq.tool.ClientRunBasis;
import org.junit.Before;
import org.junit.Test;
import java.util.concurrent.CountDownLatch;
import static org.junit.Assert.*;
public class AbstractPerformanceSamplerTest {
private class EmptySampler extends AbstractPerformanceSampler {
@Override
public void sampleData() {}
}
private AbstractPerformanceSampler sampler;
private CountDownLatch samplerLatch;
@Before
public void setUpSampler() {
sampler = new EmptySampler();
samplerLatch = new CountDownLatch(1);
}
@Test(expected = IllegalArgumentException.class)
public void testSetRampUpPercent_exceeds100() {
sampler.setRampUpPercent(101);
}
@Test(expected = IllegalArgumentException.class)
public void testSetRampUpPercent_lessThan0() {
sampler.setRampUpPercent(-1);
}
@Test(expected = IllegalArgumentException.class)
public void testSetRampDownPercent_exceeds99() {
sampler.setRampDownPercent(100);
}
@Test(expected = IllegalArgumentException.class)
public void testSetRampDownPercent_lessThan0() {
sampler.setRampDownPercent(-1);
}
@Test
public void testSamplerOnCountBasis() throws InterruptedException {
final CountDownLatch latch = samplerLatch;
sampler.startSampler(latch, ClientRunBasis.count, 0);
sampler.finishSampling();
samplerLatch.await();
assertNull(sampler.getDuration());
assertEquals(0, (long) sampler.getRampUpTime());
assertEquals(0, (long) sampler.getRampDownTime());
}
@Test
public void testSamplerOnTimeBasis_matchesClientSettings() throws InterruptedException {
final CountDownLatch latch = samplerLatch;
sampler.startSampler(latch, ClientRunBasis.time, 1000);
samplerLatch.await();
assertEquals(1000, (long) sampler.getDuration());
assertEquals(0, (long) sampler.getRampUpTime());
assertEquals(0, (long) sampler.getRampDownTime());
}
@Test
public void testSamplerOnTimeBasis_percentageOverrides() throws InterruptedException {
final CountDownLatch latch = samplerLatch;
sampler.setRampUpPercent(10);
sampler.setRampDownPercent(20);
sampler.startSampler(latch, ClientRunBasis.time, 1000);
samplerLatch.await();
assertEquals(1000, (long) sampler.getDuration());
assertEquals(100, (long) sampler.getRampUpTime());
assertEquals(200, (long) sampler.getRampDownTime());
}
@Test(expected = IllegalArgumentException.class)
public void testSamplerOnTimeBasis_percentageOverridesExceedSamplerDuration() throws InterruptedException {
final CountDownLatch latch = samplerLatch;
sampler.setRampUpPercent(60);
sampler.setRampDownPercent(41);
sampler.startSampler(latch, ClientRunBasis.time, 1000);
}
@Test
public void testSamplerOnTimeBasis_timeOverrides() throws InterruptedException {
final CountDownLatch latch = samplerLatch;
sampler.setRampUpTime(10);
sampler.setRampDownTime(20);
sampler.startSampler(latch, ClientRunBasis.time, 1000);
samplerLatch.await();
assertEquals(1000, (long) sampler.getDuration());
assertEquals(10, (long) sampler.getRampUpTime());
assertEquals(20, (long) sampler.getRampDownTime());
}
}