systest to check for memory leak

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@412933 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jonas B. Lim 2006-06-09 05:18:11 +00:00
parent f0d409fec6
commit 2054392af2
9 changed files with 1283 additions and 0 deletions

View File

@ -0,0 +1,66 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>incubator-activemq</groupId>
<artifactId>activemq-parent</artifactId>
<version>4.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
<groupId>incubator-activemq</groupId>
<artifactId>maven-activemq-memtest-plugin</artifactId>
<packaging>maven-plugin</packaging>
<name>ActiveMQ :: Memory Usage Test Plugin</name>
<dependencies>
<dependency>
<groupId>org.apache.maven</groupId>
<artifactId>maven-plugin-api</artifactId>
<version>2.0</version>
</dependency>
<dependency>
<groupId>incubator-activemq</groupId>
<artifactId>activemq-core</artifactId>
<version>4.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>incubator-activemq</groupId>
<artifactId>activemq-console</artifactId>
<version>4.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>incubator-activemq</groupId>
<artifactId>activeio-core</artifactId>
<version>3.0-beta2</version>
</dependency>
<dependency>
<groupId>org.apache.derby</groupId>
<artifactId>derby</artifactId>
<version>10.1.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.derby</groupId>
<artifactId>derbynet</artifactId>
<version>10.1.1.0</version>
</dependency>
<dependency>
<groupId>backport-util-concurrent</groupId>
<artifactId>backport-util-concurrent</artifactId>
<version>2.1</version>
</dependency>
<dependency>
<groupId>org.apache.geronimo.specs</groupId>
<artifactId>geronimo-jms_1.1_spec</artifactId>
<version>1.0</version>
</dependency>
<dependency>
<groupId>org.apache.geronimo.specs</groupId>
<artifactId>geronimo-j2ee-management_1.0_spec</artifactId>
<version>1.0</version>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,99 @@
####################################################################################################
# Running Maven 2 Memory usage Test
####################################################################################################
Goal | Description
--------------------------|----------------------------------------------------------
activemq-memtest:memtest | Starts the broker, producer, consumer and the memory monitoring thread all in the same VM and
| generate the heap and non-heap memory usage of the jvm.
| The plugin is included by default in the \activemq-perf module.
|
| Parameters :
|
| 1. -DmessageCount - specifies number of messages to send/receive
| - default value : 100000
|
| 2. -Dtopic - specifies domain type. Valid value is true or false
| - default value : true
|
| 3. -Ddurable - specifies delivery mode: Valid value is true or false
| - default value : false
|
| 4. -DconnectionCheckpointSize - specifies size of messages sent in KB before we close and
| start the producer/consumer to see if there is a memory
| leak using different connections.
| - a value of -1 indicates that no checkpoint is set and will
| send/consume messages using one producer/consumer conneciton
| - default value : -1
|
| 5. -DmessageSize - specifies the message size in bytes
| - default value : 10240 (10KB)
|
| 6. -DcheckpointInterval - specifies the interval in seconds on which the monitoring tool
| will get the memory usage of test run.
| - default value : 2 (seconds)
|
| 7. -DprefetchSize - specifies the prefetch size to be used
| - a value of -1 will indicates that test will use the default prefetch
| size (32000)
| - default value : -1
|
| 8. -Durl - species the broker url to use if not going to be using the embedded broker
| - default value : null
|
| 9. -DreportName - specifies the name of the output xml file.
| - default value : activemq-memory-usage-report
|
| 10. -DreportDirectory - specifies the directory of the output file
| - default value : ${project.build.directory}/test-memtest
|
| 11. -DproducerCount - specifies the number of producers
| - default value : 1
|
| 12. -DconsumerCount - specifies the number of consumers
| - default value : 1
-----------------------------------------------------------------------------------------------
|Memory Usage Test sample output
|-----------------------------------------------------------------------------------------------
|<test-report>
| <test-information>
| <os-name>Windows XP</os-name>
| <java-version>1.5.0_05</java-version>
| <jvm_memory_settings>
| <heap_memory>
| <committed>9502720</committed>
| <max>66650112</max>
| </heap_memory>
| <non_heap_memory>
| <committed>30736384</committed>
| <max>121634816</max>
| </non_heap_memory>
| </jvm_memory_settings>
| <test-settings>
| <durable>non-durable</durable>
| <message_size>10240</message_size>
| <destination_name>FOO.BAR</destination_name>
| <connection_checkpoint_size>-1</connection_checkpoint_size>
| <consumer_count>1</consumer_count>
| <report_name>activemq-memory-usage-report</report_name>
| <prefetchSize>-1</prefetchSize>
| <domain>topic</domain>
| <producer_count>1</producer_count>
| <connection_checkpoint_size_kb>-1</connection_checkpoint_size_kb>
| <message_count>100000</message_count>
| <report_directory>C:\Projects\logicblaze\activemq\activemq-perftest\target/test-memtest</report_directory>
| </test-settings>
| </test-information>
| <test-result checkpoint_interval_in_sec=5 >
| <memory_usage index=0 non_heap_mb=21 non_heap_bytes=22963904 heap_mb=6 heap_bytes=7275808/>
| <memory_usage index=1 non_heap_mb=23 non_heap_bytes=24598560 heap_mb=11 heap_bytes=12474400/>
| ....
| ....
| </test-result>
|</test-report>
|
-------------------------------------------------------------------------------------------------

View File

@ -0,0 +1,145 @@
package org.apache.activemq.maven;
/*
* Copyright 2001-2005 The Apache Software Foundation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import org.apache.maven.plugin.AbstractMojo;
import org.apache.maven.plugin.MojoExecutionException;
import org.apache.activemq.tool.JMSMemtest;
import javax.jms.JMSException;
/**
* Goal which does a memory usage test to check for any memory leak
*
* @goal memtest
* @phase process-sources
*/
public class MemtestMojo
extends AbstractMojo {
/**
* @parameter expression="${url}
*
*/
private String url;
/**
* @parameter expression="${topic}" default-value="true"
* @required
*/
private String topic;
/**
* @parameter expression="${connectionCheckpointSize}" default-value="-1"
* @required
*/
private String connectionCheckpointSize;
/**
* @parameter expression="${durable}" default-value="false"
* @required
*/
private String durable;
/**
* @parameter expression="${producerCount}" default-value="1"
* @required
*/
private String producerCount;
/**
* @parameter expression="${prefetchSize}" default-value="-1"
* @required
*/
private String prefetchSize;
/**
* @parameter expression="${consumerCount}" default-value="1"
* @required
*/
private String consumerCount;
/**
* @parameter expression="${messageCount}" default-value="100000"
* @required
*/
private String messageCount;
/**
* @parameter expression="${messageSize}" default-value="10240"
* @required
*/
private String messageSize;
/**
* @parameter expression="${checkpointInterval}" default-value="2"
* @required
*/
private String checkpointInterval;
/**
* @parameter expression="${destinationName}" default-value="FOO.BAR"
* @required
*/
private String destinationName;
/**
* @parameter expression="${reportName}" default-value="activemq-memory-usage-report"
* @required
*/
private String reportName;
/**
* @parameter expression="${reportDirectory}" default-value="${project.build.directory}/test-memtest"
* @required
*/
private String reportDirectory;
public void execute()
throws MojoExecutionException {
JMSMemtest.main(createArgument());
}
public String[] createArgument() {
String[] options = {
"url=" + url,
"topic=" + topic,
"durable=" + durable,
"connectionCheckpointSize=" + connectionCheckpointSize,
"producerCount=" + producerCount,
"consumerCount=" + consumerCount,
"messageCount=" + messageCount,
"messageSize=" + messageSize,
"checkpointInterval=" + checkpointInterval,
"destinationName=" + destinationName,
"reportName=" + reportName,
"prefetchSize=" + prefetchSize,
"reportDirectory=" + reportDirectory,
};
return options;
}
}

View File

@ -0,0 +1,307 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.tool;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.tool.MemProducer;
import org.apache.activemq.tool.MemConsumer;
import org.apache.activemq.tool.MemoryMonitoringTool;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import javax.jms.*;
import java.util.Properties;
public class JMSMemtest {
private static final Log log = LogFactory.getLog(JMSMemtest.class);
private static final int DEFAULT_MESSAGECOUNT = 5000;
protected BrokerService broker;
protected boolean topic = true;
protected boolean durable = false;
protected long messageCount = 0;
// how large the message in kb before we close/start the producer/consumer with a new connection. -1 means no connectionCheckpointSize
protected int connectionCheckpointSize;
protected long connectionInterval;
protected int consumerCount;
protected int producerCount;
protected int checkpointInterval;
protected int prefetchSize;
//set 10 kb of payload as default
protected int messageSize;
protected String reportDirectory;
protected String reportName;
protected String url = "";
protected MemProducer[] producers;
protected MemConsumer[] consumers;
protected String destinationName;
protected boolean allMessagesConsumed = true;
protected MemConsumer allMessagesList = new MemConsumer();
protected Message payload;
protected ActiveMQConnectionFactory connectionFactory;
protected Connection connection;
protected Destination destination;
protected boolean createConnectionPerClient = true;
protected boolean transacted = false;
protected boolean useEmbeddedBroker = true;
protected MemoryMonitoringTool memoryMonitoringTool;
public static void main(String[] args) {
Properties sysSettings = new Properties();
for (int i = 0; i < args.length; i++) {
int index = args[i].indexOf("=");
String key = args[i].substring(0, index);
String val = args[i].substring(index + 1);
sysSettings.setProperty(key, val);
}
JMSMemtest memtest = new JMSMemtest(sysSettings);
try {
memtest.start();
} catch (Exception e) {
e.printStackTrace();
}
}
public JMSMemtest(Properties settings) {
url = settings.getProperty("url");
topic = new Boolean(settings.getProperty("topic")).booleanValue();
durable = new Boolean(settings.getProperty("durable")).booleanValue();
connectionCheckpointSize = new Integer(settings.getProperty("connectionCheckpointSize")).intValue();
producerCount = new Integer(settings.getProperty("producerCount")).intValue();
consumerCount = new Integer(settings.getProperty("consumerCount")).intValue();
messageCount = new Integer(settings.getProperty("messageCount")).intValue();
messageSize = new Integer(settings.getProperty("messageSize")).intValue();
prefetchSize = new Integer(settings.getProperty("prefetchSize")).intValue();
checkpointInterval = new Integer(settings.getProperty("checkpointInterval")).intValue() * 1000;
producerCount = new Integer(settings.getProperty("producerCount")).intValue();
reportName = settings.getProperty("reportName");
destinationName = settings.getProperty("destinationName");
reportDirectory = settings.getProperty("reportDirectory");
connectionInterval = connectionCheckpointSize * 1024;
}
protected void start() throws Exception {
log.info("Starting Monitor");
memoryMonitoringTool = new MemoryMonitoringTool();
memoryMonitoringTool.setTestSettings(getSysTestSettings());
Thread monitorThread = memoryMonitoringTool.startMonitor();
if (messageCount == 0) {
messageCount = DEFAULT_MESSAGECOUNT;
}
if (useEmbeddedBroker) {
if (broker == null) {
broker = createBroker();
}
}
connectionFactory = (ActiveMQConnectionFactory) createConnectionFactory();
if (prefetchSize > 0) {
connectionFactory.getPrefetchPolicy().setTopicPrefetch(prefetchSize);
connectionFactory.getPrefetchPolicy().setQueuePrefetch(prefetchSize);
}
connection = connectionFactory.createConnection();
Session session = connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE);
if (topic) {
destination = session.createTopic(destinationName);
} else {
destination = session.createQueue(destinationName);
}
createPayload(session);
publishAndConsume();
log.info("Closing resources");
this.close();
monitorThread.join();
}
protected boolean resetConnection(int counter) {
if (connectionInterval > 0) {
long totalMsgSizeConsumed = counter * 1024;
if (connectionInterval < totalMsgSizeConsumed) {
return true;
}
}
return false;
}
protected void publishAndConsume() throws Exception {
createConsumers();
createProducers();
int counter = 0;
boolean resetCon = false;
log.info("Start sending messages ");
for (int i = 0; i < messageCount; i++) {
if (resetCon == true) {
closeConsumers();
closeProducers();
createConsumers();
createProducers();
resetCon = false;
}
for (int k = 0; k < producers.length; k++) {
producers[k].sendMessage(payload, "counter", counter);
counter++;
if (resetConnection(counter)) {
resetCon = true;
break;
}
}
}
}
protected void close() throws Exception {
connection.close();
broker.stop();
memoryMonitoringTool.stopMonitor();
}
protected void createPayload(Session session) throws JMSException {
byte[] array = new byte[messageSize];
for (int i = 0; i < array.length; i++) {
array[i] = (byte) i;
}
BytesMessage bystePayload = session.createBytesMessage();
bystePayload.writeBytes(array);
payload = (Message) bystePayload;
}
protected void createProducers() throws JMSException {
producers = new MemProducer[producerCount];
for (int i = 0; i < producerCount; i++) {
producers[i] = new MemProducer(connectionFactory, destination);
if (durable) {
producers[i].setDeliveryMode(DeliveryMode.PERSISTENT);
} else {
producers[i].setDeliveryMode(DeliveryMode.NON_PERSISTENT);
}
producers[i].start();
}
}
protected void createConsumers() throws JMSException {
consumers = new MemConsumer[consumerCount];
for (int i = 0; i < consumerCount; i++) {
consumers[i] = new MemConsumer(connectionFactory, destination);
consumers[i].setParent(allMessagesList);
consumers[i].start();
}
}
protected void closeProducers() throws JMSException {
for (int i = 0; i < producerCount; i++) {
producers[i].shutDown();
}
}
protected void closeConsumers() throws JMSException {
for (int i = 0; i < consumerCount; i++) {
consumers[i].shutDown();
}
}
protected ConnectionFactory createConnectionFactory() throws JMSException {
if (url == null || url.trim().equals("") || url.trim().equals("null")) {
return new ActiveMQConnectionFactory("vm://localhost");
} else {
return new ActiveMQConnectionFactory(url);
}
}
protected BrokerService createBroker() throws Exception {
BrokerService broker = new BrokerService();
configureBroker(broker);
broker.start();
return broker;
}
protected void configureBroker(BrokerService broker) throws Exception {
broker.addConnector("vm://localhost");
broker.setDeleteAllMessagesOnStartup(true);
}
protected Properties getSysTestSettings() {
Properties settings = new Properties();
settings.setProperty("domain", topic == true ? "topic" : "queue");
settings.setProperty("durable", durable == true ? "durable" : "non-durable");
settings.setProperty("connection_checkpoint_size_kb", new Integer(connectionCheckpointSize).toString());
settings.setProperty("producer_count", new Integer(producerCount).toString());
settings.setProperty("consumer_count", new Integer(consumerCount).toString());
settings.setProperty("message_count", new Long(messageCount).toString());
settings.setProperty("message_size", new Integer(messageSize).toString());
settings.setProperty("prefetchSize", new Integer(prefetchSize).toString());
settings.setProperty("checkpoint_interval", new Integer(checkpointInterval).toString());
settings.setProperty("destination_name", destinationName);
settings.setProperty("report_name", reportName);
settings.setProperty("report_directory", reportDirectory);
settings.setProperty("connection_checkpoint_size", new Integer(connectionCheckpointSize).toString());
return settings;
}
}

View File

@ -0,0 +1,108 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.tool;
import org.apache.activemq.tool.MemMessageIdList;
import javax.jms.*;
/**
* @version $Revision: 1.3 $
*/
public class MemConsumer extends MemMessageIdList implements MessageListener {
protected Connection connection;
protected MessageConsumer consumer;
protected long counter = 0;
protected boolean isParent = false;
protected boolean inOrder = true;
public MemConsumer() {
super();
}
public MemConsumer(ConnectionFactory fac, Destination dest, String consumerName) throws JMSException {
connection = fac.createConnection();
Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
if (dest instanceof Topic && consumerName != null && consumerName.length() > 0) {
consumer = s.createDurableSubscriber((Topic) dest, consumerName);
} else {
consumer = s.createConsumer(dest);
}
consumer.setMessageListener(this);
}
public MemConsumer(ConnectionFactory fac, Destination dest) throws JMSException {
this(fac, dest, null);
}
public void start() throws JMSException {
connection.start();
}
public void stop() throws JMSException {
connection.stop();
}
public void shutDown() throws JMSException {
connection.close();
}
public Message receive() throws JMSException {
return consumer.receive();
}
public Message receive(long wait) throws JMSException {
return consumer.receive(wait);
}
static long ctr = 0;
public void onMessage(Message msg) {
super.onMessage(msg);
if (isParent) {
try {
long ctr = msg.getLongProperty("counter");
if (counter != ctr) {
inOrder = false;
}
counter++;
} catch (Exception e) {
e.printStackTrace();
}
}
}
public boolean isInOrder() {
return inOrder;
}
public void setAsParent(boolean isParent) {
this.isParent = isParent;
}
public boolean isParent() {
return this.isParent;
}
}

View File

@ -0,0 +1,177 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.tool;
import java.util.ArrayList;
import java.util.List;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* A simple container of messages for performing testing and rendezvous style
* code. You can use this class a {@link MessageListener} and then make
* assertions about how many messages it has received allowing a certain maximum
* amount of time to ensure that the test does not hang forever.
* <p/>
* Also you can chain these instances together with the
* {@link #setParent(MessageListener)} method so that you can aggregate the
* total number of messages consumed across a number of consumers.
*
* @version $Revision: 1.6 $
*/
public class MemMessageIdList implements MessageListener {
protected static final Log log = LogFactory.getLog(MemMessageIdList.class);
private List messageIds = new ArrayList();
private Object semaphore;
private boolean verbose;
private MessageListener parent;
private long maximumDuration = 15000L;
public MemMessageIdList() {
this(new Object());
}
public MemMessageIdList(Object semaphore) {
this.semaphore = semaphore;
}
public boolean equals(Object that) {
if (that instanceof MemMessageIdList) {
MemMessageIdList thatListMem = (MemMessageIdList) that;
return getMessageIds().equals(thatListMem.getMessageIds());
}
return false;
}
public int hashCode() {
synchronized (semaphore) {
return messageIds.hashCode() + 1;
}
}
public String toString() {
synchronized (semaphore) {
return messageIds.toString();
}
}
/**
* @return all the messages on the list so far, clearing the buffer
*/
public List flushMessages() {
synchronized (semaphore) {
List answer = new ArrayList(messageIds);
messageIds.clear();
return answer;
}
}
public synchronized List getMessageIds() {
synchronized (semaphore) {
return new ArrayList(messageIds);
}
}
public void onMessage(Message message) {
String id = null;
try {
id = message.getJMSMessageID();
synchronized (semaphore) {
messageIds.add(id);
semaphore.notifyAll();
}
if (verbose) {
log.info("Received message: " + message);
}
} catch (JMSException e) {
e.printStackTrace();
}
if (parent != null) {
parent.onMessage(message);
}
}
public int getMessageCount() {
synchronized (semaphore) {
return messageIds.size();
}
}
public void waitForMessagesToArrive(int messageCount) {
log.info("Waiting for " + messageCount + " message(s) to arrive");
long start = System.currentTimeMillis();
for (int i = 0; i < messageCount; i++) {
try {
if (hasReceivedMessages(messageCount)) {
break;
}
long duration = System.currentTimeMillis() - start;
if (duration >= maximumDuration) {
break;
}
synchronized (semaphore) {
semaphore.wait(maximumDuration - duration);
}
} catch (InterruptedException e) {
log.info("Caught: " + e);
}
}
long end = System.currentTimeMillis() - start;
log.info("End of wait for " + end + " millis and received: " + getMessageCount() + " messages");
}
public boolean hasReceivedMessage() {
return getMessageCount() == 0;
}
public boolean hasReceivedMessages(int messageCount) {
return getMessageCount() >= messageCount;
}
public boolean isVerbose() {
return verbose;
}
public void setVerbose(boolean verbose) {
this.verbose = verbose;
}
public MessageListener getParent() {
return parent;
}
/**
* Allows a parent listener to be specified such as to aggregate messages
* consumed across consumers
*/
public void setParent(MessageListener parent) {
this.parent = parent;
}
}

View File

@ -0,0 +1,74 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.tool;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
/**
* @version $Revision: 1.3 $
*/
public class MemProducer {
protected Connection connection;
protected MessageProducer producer;
public MemProducer(ConnectionFactory fac, Destination dest) throws JMSException {
connection = fac.createConnection();
Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
producer = s.createProducer(dest);
}
public void setDeliveryMode(int mode) throws JMSException {
producer.setDeliveryMode(mode);
}
public void start() throws JMSException {
connection.start();
}
public void stop() throws JMSException {
connection.stop();
}
public void shutDown() throws JMSException {
connection.close();
}
public void sendMessage(Message msg) throws JMSException {
sendMessage(msg, null, 0);
}
/*
* allow producer to attach message counter on its header. This will be used to verify message order
*
*/
public void sendMessage(Message msg, String headerName, long headerValue) throws JMSException {
if (headerName != null) {
msg.setLongProperty(headerName, headerValue);
}
producer.send(msg);
}
}

View File

@ -0,0 +1,148 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.tool;
import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
import java.io.DataOutputStream;
import java.util.Properties;
import java.lang.management.MemoryMXBean;
import java.lang.management.ManagementFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
public class MemoryMonitoringTool implements Runnable {
private long checkpointInterval = 5000; // 5 sec sample checkpointInterval
private long resultIndex = 0;
private AtomicBoolean isRunning = new AtomicBoolean(false);
private DataOutputStream dataDoutputStream = null;
protected Properties testSettings = new Properties();
protected ReportGenerator reportGenerator = new ReportGenerator();
private MemoryMXBean memoryBean;
public Properties getTestSettings() {
return testSettings;
}
public void setTestSettings(Properties sysTestSettings) {
this.testSettings = sysTestSettings;
}
public DataOutputStream getDataOutputStream() {
return dataDoutputStream;
}
public void setDataOutputStream(DataOutputStream dataDoutputStream) {
this.dataDoutputStream = dataDoutputStream;
}
public void stopMonitor() {
isRunning.set(false);
}
public long getCheckpointInterval() {
return checkpointInterval;
}
public void setCheckpointInterval(long checkpointInterval) {
this.checkpointInterval = checkpointInterval;
}
public Thread startMonitor() {
String intervalStr = this.getTestSettings().getProperty("checkpoint_interval");
checkpointInterval = new Integer(intervalStr).intValue();
this.getTestSettings().remove("checkpoint_interval");
memoryBean = ManagementFactory.getMemoryMXBean();
reportGenerator.setTestSettings(getTestSettings());
addTestInformation();
Thread t = new Thread(this);
t.setName("Memory monitoring tool");
isRunning.set(true);
t.start();
return t;
}
public void addTestInformation() {
reportGenerator.setReportName(this.getTestSettings().getProperty("report_name"));
reportGenerator.setReportDirectory(this.getTestSettings().getProperty("report_directory"));
reportGenerator.startGenerateReport();
reportGenerator.addTestInformation();
reportGenerator.writeWithIndent(4, "<jvm_memory_settings>");
reportGenerator.writeWithIndent(6, "<heap_memory>");
reportGenerator.writeWithIndent(8, "<committed>" + memoryBean.getHeapMemoryUsage().getCommitted() + "</committed>");
reportGenerator.writeWithIndent(8, "<max>" + memoryBean.getHeapMemoryUsage().getMax() + "</max>");
reportGenerator.writeWithIndent(6, "</heap_memory>");
reportGenerator.writeWithIndent(6, "<non_heap_memory>");
reportGenerator.writeWithIndent(8, "<committed>" + memoryBean.getNonHeapMemoryUsage().getCommitted() + "</committed>");
reportGenerator.writeWithIndent(8, "<max>" + memoryBean.getNonHeapMemoryUsage().getMax() + "</max>");
reportGenerator.writeWithIndent(6, "</non_heap_memory>");
reportGenerator.writeWithIndent(4, "</jvm_memory_settings>");
reportGenerator.addClientSettings();
reportGenerator.endTestInformation();
}
public void run() {
long nonHeapMB = 0;
long heapMB = 0;
long oneMB = 1024 * 1024;
reportGenerator.startTestResult(getCheckpointInterval());
while (isRunning.get()) {
try {
//wait every check point before getting the next memory usage
Thread.sleep(checkpointInterval);
nonHeapMB = memoryBean.getNonHeapMemoryUsage().getUsed() / oneMB;
heapMB = memoryBean.getHeapMemoryUsage().getUsed() / oneMB;
reportGenerator.writeWithIndent(6, "<memory_usage index=" + resultIndex + " non_heap_mb=" + nonHeapMB + " non_heap_bytes=" + memoryBean.getNonHeapMemoryUsage().getUsed() + " heap_mb=" + heapMB + " heap_bytes=" + memoryBean.getHeapMemoryUsage().getUsed() + "/>");
resultIndex++;
} catch (Exception e) {
e.printStackTrace();
}
}
reportGenerator.endTestResult();
reportGenerator.stopGenerateReport();
}
}

View File

@ -0,0 +1,159 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.tool;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.Enumeration;
import java.util.Properties;
public class ReportGenerator {
private static final Log log = LogFactory.getLog(ReportGenerator.class);
private String reportDirectory = null;
private String reportName = null;
private PrintWriter writer = null;
private File reportFile = null;
private Properties testSettings;
public ReportGenerator() {
}
public ReportGenerator(String reportDirectory, String reportName) {
this.setReportDirectory(reportDirectory);
this.setReportName(reportName);
}
public void startGenerateReport() {
File reportDir = new File(getReportDirectory());
// Create output directory if it doesn't exist.
if (!reportDir.exists()) {
reportDir.mkdirs();
}
if (reportDir != null) {
reportFile = new File(this.getReportDirectory() + File.separator + this.getReportName() + ".xml");
}
try {
this.writer = new PrintWriter(new FileOutputStream(reportFile));
} catch (IOException e1) {
e1.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
}
}
public void stopGenerateReport() {
writeWithIndent(0, "</test-report>");
this.getWriter().flush();
this.getWriter().close();
log.info(" TEST REPORT OUTPUT : " + reportFile.getAbsolutePath());
}
protected void addTestInformation() {
writeWithIndent(0, "<test-report>");
writeWithIndent(2, "<test-information>");
writeWithIndent(4, "<os-name>" + System.getProperty("os.name") + "</os-name>");
writeWithIndent(4, "<java-version>" + System.getProperty("java.version") + "</java-version>");
}
protected void addClientSettings() {
if (this.getTestSettings() != null) {
Enumeration keys = getTestSettings().propertyNames();
writeWithIndent(4, "<test-settings>");
String key;
while (keys.hasMoreElements()) {
key = (String) keys.nextElement();
writeWithIndent(6, "<" + key + ">" + getTestSettings().get(key) + "</" + key + ">");
}
writeWithIndent(4, "</test-settings>");
}
}
protected void endTestInformation() {
writeWithIndent(2, "</test-information>");
}
protected void startTestResult(long checkpointInterval) {
long intervalInSec = checkpointInterval / 1000;
writeWithIndent(2, "<test-result checkpoint_interval_in_sec=" + intervalInSec + " >");
}
protected void endTestResult() {
writeWithIndent(2, "</test-result>");
}
protected void writeWithIndent(int indent, String result) {
StringBuffer buffer = new StringBuffer();
for (int i = 0; i < indent; ++i) {
buffer.append(" ");
}
buffer.append(result);
writer.println(buffer.toString());
}
public PrintWriter getWriter() {
return this.writer;
}
public String getReportDirectory() {
return reportDirectory;
}
public void setReportDirectory(String reportDirectory) {
this.reportDirectory = reportDirectory;
}
public String getReportName() {
return reportName;
}
public void setReportName(String reportName) {
this.reportName = reportName;
}
public Properties getTestSettings() {
return testSettings;
}
public void setTestSettings(Properties testSettings) {
this.testSettings = testSettings;
}
}