git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@828954 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Bosanac Dejan 2009-10-23 08:11:17 +00:00
parent 592e18ea9b
commit 026c120094
2 changed files with 111 additions and 0 deletions

View File

@ -481,6 +481,8 @@
<!-- https://issues.apache.org/activemq/browse/AMQ-2050 -->
<exclude>**/ProxyConnectorTest.*</exclude>
<!-- test used only to simulate load on Stomp connector -->
<exclude>**/StompLoadTest.*</exclude>
</excludes>
</configuration>

View File

@ -0,0 +1,109 @@
package org.apache.activemq.transport.stomp;
import java.net.Socket;
import java.net.URI;
import java.util.HashMap;
import junit.framework.TestCase;
import org.apache.activemq.transport.stomp.StompConnection;
import org.apache.activemq.transport.stomp.StompFrame;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
*
* Simulates load on the Stomp connector. All producers/consumers open/close a
* connection on every command Configurable number of producers/consumers, their
* speed and duration of test
*
* Start a broker with the desired configuration to test and then run this test
*
*/
public class StompLoadTest extends TestCase {
private static final Log LOG = LogFactory.getLog(StompLoadTest.class);
final int producerSleep = 10;
final int consumerSleep = 10;
final int msgCount = 10000;
final int producerCount = 5;
final int consumerCount = 5;
final int testTime = 10 * 60 * 1000;
final String bindAddress = "stomp://0.0.0.0:61613";
public void testLoad() throws Exception {
for (int i = 0; i < producerCount; i++) {
ProducerThread producerThread = new ProducerThread("producer" + i);
producerThread.start();
}
for (int i = 0; i < consumerCount; i++) {
Thread consumerThread = new ConsumerThread("consumer" + i);
consumerThread.start();
}
Thread.sleep(testTime);
}
public StompConnection createConnection() throws Exception {
StompConnection conn = new StompConnection();
URI connectUri = new URI(bindAddress);
conn.open(new Socket(connectUri.getHost(), connectUri.getPort()));
conn.connect("", "");
return conn;
}
class ProducerThread extends Thread {
String name;
public ProducerThread(String name) {
this.name = name;
}
public void run() {
for (int i = 0; i < msgCount; i++) {
try {
StompConnection conn = createConnection();
String msg = "test message " + i;
LOG.info(name + " sending " + msg);
conn.send("/queue/test", msg);
conn.disconnect();
Thread.sleep(producerSleep);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
class ConsumerThread extends Thread {
String name;
public ConsumerThread(String name) {
this.name = name;
}
public void run() {
for (int i = 0; i < msgCount; i++) {
try {
StompConnection conn = createConnection();
HashMap<String, String> headers = new HashMap<String, String>();
headers.put("activemq.prefetchSize", "1");
conn.subscribe("/queue/test", "client", headers);
StompFrame frame = conn.receive(1000);
conn.ack(frame);
LOG.info(name + " received " + frame.getBody());
conn.disconnect();
Thread.sleep(consumerSleep);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}