fixes for load testing

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@609606 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2008-01-07 13:51:06 +00:00
parent 43d58b57b7
commit 3d10accb46
6 changed files with 157 additions and 197 deletions

View File

@ -176,6 +176,7 @@ class QueueStorePrefetch extends AbstractPendingMessageCursor implements Message
public synchronized void gc() {
for (Message msg : batchList) {
rollback(msg.getMessageId());
msg.decrementReferenceCount();
}
cacheEnabled=false;

View File

@ -231,6 +231,7 @@ class TopicStorePrefetch extends AbstractPendingMessageCursor implements Message
public synchronized void gc() {
for (Message msg : batchList.values()) {
rollback(msg.getMessageId());
msg.decrementReferenceCount();
}
batchList.clear();

View File

@ -27,9 +27,13 @@ import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import javax.net.ServerSocketFactory;
import org.apache.activemq.ThreadPriorities;
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.openwire.OpenWireFormatFactory;
import org.apache.activemq.transport.Transport;
@ -57,6 +61,7 @@ public class TcpTransportServer extends TransportServerThreadSupport {
protected final TcpTransportFactory transportFactory;
protected long maxInactivityDuration = 30000;
protected int minmumWireFormatVersion;
/**
* trace=true -> the Transport stack where this TcpTransport
* object will be, will have a TransportLogger layer
@ -83,11 +88,14 @@ public class TcpTransportServer extends TransportServerThreadSupport {
protected boolean startLogging = true;
protected Map<String, Object> transportOptions;
protected final ServerSocketFactory serverSocketFactory;
protected BlockingQueue<Socket> socketQueue = new LinkedBlockingQueue<Socket>();
protected Thread socketHandlerThread;
public TcpTransportServer(TcpTransportFactory transportFactory, URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException {
super(location);
this.transportFactory = transportFactory;
this.serverSocketFactory = serverSocketFactory;
}
public void bind() throws IOException {
@ -199,18 +207,7 @@ public class TcpTransportServer extends TransportServerThreadSupport {
if (isStopped() || getAcceptListener() == null) {
socket.close();
} else {
HashMap<String, Object> options = new HashMap<String, Object>();
options.put("maxInactivityDuration", Long.valueOf(maxInactivityDuration));
options.put("minmumWireFormatVersion", Integer.valueOf(minmumWireFormatVersion));
options.put("trace", Boolean.valueOf(trace));
options.put("dynamicManagement", Boolean.valueOf(dynamicManagement));
options.put("startLogging", Boolean.valueOf(startLogging));
options.putAll(transportOptions);
WireFormat format = wireFormatFactory.createWireFormat();
Transport transport = createTransport(socket, format);
Transport configuredTransport = transportFactory.serverConfigure(transport, format, options);
getAcceptListener().onAccept(configuredTransport);
socketQueue.put(socket);
}
}
} catch (SocketTimeoutException ste) {
@ -259,6 +256,36 @@ public class TcpTransportServer extends TransportServerThreadSupport {
}
return result;
}
protected void doStart() throws Exception {
Runnable run = new Runnable() {
public void run() {
try {
while (!isStopped() && !isStopping()) {
Socket sock = socketQueue.poll(1, TimeUnit.SECONDS);
if (sock != null) {
handleSocket(sock);
}
}
} catch (InterruptedException e) {
LOG.info("socketQueue interuppted - stopping");
if (!isStopping()) {
onAcceptError(e);
}
}
}
};
socketHandlerThread = new Thread(null, run,
"ActiveMQ Transport Server Thread Handler: " + toString(),
getStackSize());
socketHandlerThread.setDaemon(true);
socketHandlerThread.setPriority(ThreadPriorities.BROKER_MANAGEMENT-1);
super.doStart();
socketHandlerThread.start();
}
protected void doStop(ServiceStopper stopper) throws Exception {
super.doStop(stopper);
@ -274,4 +301,37 @@ public class TcpTransportServer extends TransportServerThreadSupport {
public void setTransportOption(Map<String, Object> transportOptions) {
this.transportOptions = transportOptions;
}
}
protected void handleSocket(Socket socket) {
try {
HashMap<String, Object> options = new HashMap<String, Object>();
options.put("maxInactivityDuration", Long
.valueOf(maxInactivityDuration));
options.put("minmumWireFormatVersion", Integer
.valueOf(minmumWireFormatVersion));
options.put("trace", Boolean.valueOf(trace));
options
.put("dynamicManagement", Boolean
.valueOf(dynamicManagement));
options.put("startLogging", Boolean.valueOf(startLogging));
options.putAll(transportOptions);
WireFormat format = wireFormatFactory.createWireFormat();
Transport transport = createTransport(socket, format);
Transport configuredTransport = transportFactory.serverConfigure(
transport, format, options);
getAcceptListener().onAccept(configuredTransport);
} catch (SocketTimeoutException ste) {
// expect this to happen
} catch (Exception e) {
if (!isStopping()) {
onAcceptError(e);
} else if (!isStopped()) {
LOG.warn("run()", e);
onAcceptError(e);
}
}
}
}

View File

@ -16,9 +16,6 @@
*/
package org.apache.activemq.load;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
@ -29,12 +26,17 @@ import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQMessageAudit;
import org.apache.activemq.perf.PerfRate;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* @version $Revision: 1.3 $
*/
public class LoadClient implements Runnable{
private static final Log LOG = LogFactory.getLog(LoadClient.class);
protected static int SLEEP_TIME = 2;
protected String name;
protected ConnectionFactory factory;
protected Connection connection;
@ -45,9 +47,10 @@ public class LoadClient implements Runnable{
protected MessageProducer producer;
protected PerfRate rate = new PerfRate();
protected int deliveryMode = DeliveryMode.PERSISTENT;
private boolean connectionPerMessage = false;
private boolean running;
private int timeout = 10000;
protected ActiveMQMessageAudit audit = new ActiveMQMessageAudit();
protected boolean connectionPerMessage = false;
protected boolean running;
protected int timeout = 10000;
public LoadClient(String name,ConnectionFactory factory) {
@ -65,8 +68,8 @@ public class LoadClient implements Runnable{
connection = factory.createConnection();
connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
consumer = session.createConsumer(this.startDestination);
producer = session.createProducer(this.nextDestination);
consumer = session.createConsumer(getConsumeDestination());
producer = session.createProducer(getSendDestination());
producer.setDeliveryMode(this.deliveryMode);
}
@ -79,7 +82,9 @@ public class LoadClient implements Runnable{
public void stop() throws JMSException, InterruptedException {
running = false;
connection.stop();
if(connection != null) {
connection.stop();
}
}
@ -87,34 +92,46 @@ public class LoadClient implements Runnable{
try {
while (running) {
String result = consume();
if (result == null && running) {
throw new Exception(name + "Failed to consume ");
if(result != null) {
send(result);
rate.increment();
}
else if (running) {
LOG.error(name + " Failed to consume!");
}
send(result);
rate.increment();
}
} catch (Throwable e) {
e.printStackTrace();
}
}
protected String consume() throws JMSException {
protected String consume() throws Exception {
Connection con = null;
MessageConsumer c = consumer;
if (connectionPerMessage){
con = factory.createConnection();
con.start();
Session s = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
c = s.createConsumer(startDestination);
c = s.createConsumer(getConsumeDestination());
}
TextMessage result = (TextMessage) c.receive(timeout);
if (connectionPerMessage) {
con.close();
if (result != null) {
if (audit.isDuplicate(result.getJMSMessageID())) {
throw new JMSException("Received duplicate " + result.getText());
}
if (!audit.isInOrder(result.getJMSMessageID())) {
throw new JMSException("Out of order " + result.getText());
}
if (connectionPerMessage) {
Thread.sleep(SLEEP_TIME);//give the broker a chance
con.close();
}
}
return result != null ? result.getText() : null;
}
protected void send(String text) throws JMSException {
protected void send(String text) throws Exception {
Connection con = connection;
MessageProducer p = producer;
Session s = session;
@ -122,13 +139,13 @@ public class LoadClient implements Runnable{
con = factory.createConnection();
con.start();
s = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
p = s.createProducer(nextDestination);
p = s.createProducer(getSendDestination());
p.setDeliveryMode(deliveryMode);
}
TextMessage message = s.createTextMessage(text);
p.send(message);
//System.out.println(name + " SENT " + text + " TO " + nextDestination);
if (connectionPerMessage) {
Thread.sleep(SLEEP_TIME);//give the broker a chance
con.close();
}
}
@ -204,5 +221,13 @@ public class LoadClient implements Runnable{
public void setTimeout(int timeout) {
this.timeout = timeout;
}
protected Destination getSendDestination() {
return nextDestination;
}
protected Destination getConsumeDestination() {
return startDestination;
}
}

View File

@ -19,157 +19,59 @@ package org.apache.activemq.load;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.perf.PerfRate;
/**
* @version $Revision: 1.3 $
*/
public class LoadController implements Runnable{
protected ConnectionFactory factory;
protected Connection connection;
protected Destination startDestination;
protected Destination controlDestination;
protected Session session;
protected MessageConsumer consumer;
protected MessageProducer producer;
protected PerfRate rate = new PerfRate();
protected int numberOfBatches = 1;
protected int batchSize = 1000;
protected int deliveryMode = DeliveryMode.PERSISTENT;
private boolean connectionPerMessage = false;
private int timeout = 5000;
private boolean running = false;
public class LoadController extends LoadClient{
private int numberOfBatches=1;
private int batchSize =1000;
private int count;
private final CountDownLatch stopped = new CountDownLatch(1);
public LoadController(ConnectionFactory factory) {
this.factory = factory;
public LoadController(String name,ConnectionFactory factory) {
super(name,factory);
}
public synchronized void start() throws JMSException {
if (!running) {
rate.reset();
running = true;
if (!connectionPerMessage) {
connection = factory.createConnection();
connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
consumer = session.createConsumer(this.controlDestination);
producer = session.createProducer(this.startDestination);
producer.setDeliveryMode(this.deliveryMode);
}
Thread t = new Thread(this);
t.setName("LoadController");
t.start();
}
public int awaitTestComplete() throws InterruptedException {
boolean complete = stopped.await(60*5,TimeUnit.SECONDS);
return count;
}
public void stop() throws JMSException, InterruptedException {
running = false;
stopped.await();
//stopped.await(1,TimeUnit.SECONDS);
connection.stop();
stopped.countDown();
if (connection != null) {
this.connection.stop();
}
}
public void run() {
try {
for (int i = 0; i < numberOfBatches; i++) {
for (int j = 0; j < batchSize; j++) {
String payLoad = "batch[" + i + "]no:" + j;
send(payLoad);
}
for (int j = 0; j < batchSize; j++) {
String result = consume();
if (result == null || !result.equals(payLoad)) {
throw new Exception("Failed to consume " + payLoad
+ " GOT " + result);
}
System.out.println("Control got " + result);
if (result != null) {
count++;
rate.increment();
}
}
}
} catch (Throwable e) {
e.printStackTrace();
} finally {
stopped.countDown();
}
}
protected String consume() throws JMSException {
Connection con = null;
MessageConsumer c = consumer;
if (connectionPerMessage){
con = factory.createConnection();
con.start();
Session s = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
c = s.createConsumer(controlDestination);
}
TextMessage result = (TextMessage) c.receive(timeout);
if (connectionPerMessage) {
con.close();
}
return result != null ? result.getText() : null;
}
protected void send(String text) throws JMSException {
Connection con = null;
MessageProducer p = producer;
Session s = session;
if (connectionPerMessage){
con = factory.createConnection();
con.start();
s = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
p = s.createProducer(startDestination);
p.setDeliveryMode(deliveryMode);
}
TextMessage message = s.createTextMessage(text);
p.send(message);
if (connectionPerMessage) {
con.close();
}
}
public Destination getStartDestination() {
return startDestination;
}
public void setStartDestination(Destination startDestination) {
this.startDestination = startDestination;
}
public Destination getControlDestination() {
return controlDestination;
}
public void setControlDestination(Destination controlDestination) {
this.controlDestination = controlDestination;
}
public int getNumberOfBatches() {
@ -177,57 +79,27 @@ public class LoadController implements Runnable{
}
public void setNumberOfBatches(int numberOfBatches) {
this.numberOfBatches = numberOfBatches;
}
public int getBatchSize() {
return batchSize;
}
public void setBatchSize(int batchSize) {
this.batchSize = batchSize;
}
public int getDeliveryMode() {
return deliveryMode;
protected Destination getSendDestination() {
return startDestination;
}
public void setDeliveryMode(int deliveryMode) {
this.deliveryMode = deliveryMode;
protected Destination getConsumeDestination() {
return nextDestination;
}
public boolean isConnectionPerMessage() {
return connectionPerMessage;
}
public void setConnectionPerMessage(boolean connectionPerMessage) {
this.connectionPerMessage = connectionPerMessage;
}
public int getTimeout() {
return timeout;
}
public void setTimeout(int timeout) {
this.timeout = timeout;
}
}

View File

@ -42,12 +42,12 @@ public class LoadTest extends TestCase {
protected LoadClient[] clients;
protected ConnectionFactory factory;
protected Destination destination;
protected int numberOfClients = 10;
protected int numberOfClients = 50;
protected int deliveryMode = DeliveryMode.PERSISTENT;
protected int batchSize = 1000;
protected int numberOfBatches = 4;
protected int numberOfBatches = 10;
protected int timeout = Integer.MAX_VALUE;
protected boolean connectionPerMessage = true;
protected boolean connectionPerMessage = false;
protected Connection managementConnection;
protected Session managementSession;
@ -66,14 +66,15 @@ public class LoadTest extends TestCase {
Destination startDestination = createDestination(managementSession, getClass()+".start");
Destination endDestination = createDestination(managementSession, getClass()+".end");
LOG.info("Running with " + numberOfClients + " clients");
controller = new LoadController(factory);
LOG.info("Running with " + numberOfClients + " clients - sending "
+ numberOfBatches + " batches of " + batchSize + " messages");
controller = new LoadController("Controller",factory);
controller.setBatchSize(batchSize);
controller.setNumberOfBatches(numberOfBatches);
controller.setDeliveryMode(deliveryMode);
controller.setConnectionPerMessage(connectionPerMessage);
controller.setStartDestination(startDestination);
controller.setControlDestination(endDestination);
controller.setNextDestination(endDestination);
controller.setTimeout(timeout);
clients = new LoadClient[numberOfClients];
for (int i = 0; i < numberOfClients; i++) {
@ -147,7 +148,7 @@ public class LoadTest extends TestCase {
clients[i].start();
}
controller.start();
controller.stop();
assertEquals((batchSize* numberOfBatches),controller.awaitTestComplete());
}