mirror of https://github.com/apache/activemq.git
Added basic load test for queues
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@608902 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
9899dff93d
commit
0640cb703a
|
@ -339,6 +339,9 @@
|
|||
<!-- These are performance tests so take too long to run -->
|
||||
<exclude>**/perf/*</exclude>
|
||||
|
||||
<!-- These are load tests so take too long to run -->
|
||||
<exclude>**/load/*</exclude>
|
||||
|
||||
<!-- http://jira.activemq.org/jira/browse/AMQ-594 -->
|
||||
<exclude>**/SimpleNetworkTest.*</exclude>
|
||||
|
||||
|
|
|
@ -0,0 +1,208 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.load;
|
||||
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.ConnectionFactory;
|
||||
import javax.jms.DeliveryMode;
|
||||
import javax.jms.Destination;
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.MessageConsumer;
|
||||
import javax.jms.MessageProducer;
|
||||
import javax.jms.Session;
|
||||
import javax.jms.TextMessage;
|
||||
|
||||
import org.apache.activemq.perf.PerfRate;
|
||||
|
||||
/**
|
||||
* @version $Revision: 1.3 $
|
||||
*/
|
||||
public class LoadClient implements Runnable{
|
||||
protected String name;
|
||||
protected ConnectionFactory factory;
|
||||
protected Connection connection;
|
||||
protected Destination startDestination;
|
||||
protected Destination nextDestination;
|
||||
protected Session session;
|
||||
protected MessageConsumer consumer;
|
||||
protected MessageProducer producer;
|
||||
protected PerfRate rate = new PerfRate();
|
||||
protected int deliveryMode = DeliveryMode.PERSISTENT;
|
||||
private boolean connectionPerMessage = false;
|
||||
private boolean running;
|
||||
private int timeout = 10000;
|
||||
|
||||
|
||||
public LoadClient(String name,ConnectionFactory factory) {
|
||||
this.name=name;
|
||||
this.factory = factory;
|
||||
}
|
||||
|
||||
|
||||
|
||||
public synchronized void start() throws JMSException {
|
||||
if (!running) {
|
||||
rate.reset();
|
||||
running = true;
|
||||
if (!connectionPerMessage) {
|
||||
connection = factory.createConnection();
|
||||
connection.start();
|
||||
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
consumer = session.createConsumer(this.startDestination);
|
||||
producer = session.createProducer(this.nextDestination);
|
||||
producer.setDeliveryMode(this.deliveryMode);
|
||||
|
||||
}
|
||||
|
||||
Thread t = new Thread(this);
|
||||
t.setName(name);
|
||||
t.start();
|
||||
}
|
||||
}
|
||||
|
||||
public void stop() throws JMSException, InterruptedException {
|
||||
running = false;
|
||||
connection.stop();
|
||||
}
|
||||
|
||||
|
||||
public void run() {
|
||||
try {
|
||||
while (running) {
|
||||
String result = consume();
|
||||
if (result == null && running) {
|
||||
throw new Exception(name + "Failed to consume ");
|
||||
}
|
||||
send(result);
|
||||
rate.increment();
|
||||
}
|
||||
} catch (Throwable e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
protected String consume() throws JMSException {
|
||||
Connection con = null;
|
||||
MessageConsumer c = consumer;
|
||||
if (connectionPerMessage){
|
||||
con = factory.createConnection();
|
||||
con.start();
|
||||
Session s = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
c = s.createConsumer(startDestination);
|
||||
}
|
||||
TextMessage result = (TextMessage) c.receive(timeout);
|
||||
if (connectionPerMessage) {
|
||||
con.close();
|
||||
}
|
||||
return result != null ? result.getText() : null;
|
||||
}
|
||||
|
||||
protected void send(String text) throws JMSException {
|
||||
Connection con = connection;
|
||||
MessageProducer p = producer;
|
||||
Session s = session;
|
||||
if (connectionPerMessage){
|
||||
con = factory.createConnection();
|
||||
con.start();
|
||||
s = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
p = s.createProducer(nextDestination);
|
||||
p.setDeliveryMode(deliveryMode);
|
||||
}
|
||||
TextMessage message = s.createTextMessage(text);
|
||||
p.send(message);
|
||||
//System.out.println(name + " SENT " + text + " TO " + nextDestination);
|
||||
if (connectionPerMessage) {
|
||||
con.close();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
public String getName() {
|
||||
return name;
|
||||
}
|
||||
|
||||
|
||||
|
||||
public void setName(String name) {
|
||||
this.name = name;
|
||||
}
|
||||
|
||||
|
||||
|
||||
public Destination getStartDestination() {
|
||||
return startDestination;
|
||||
}
|
||||
|
||||
|
||||
|
||||
public void setStartDestination(Destination startDestination) {
|
||||
this.startDestination = startDestination;
|
||||
}
|
||||
|
||||
|
||||
|
||||
public Destination getNextDestination() {
|
||||
return nextDestination;
|
||||
}
|
||||
|
||||
|
||||
|
||||
public void setNextDestination(Destination nextDestination) {
|
||||
this.nextDestination = nextDestination;
|
||||
}
|
||||
|
||||
|
||||
|
||||
public int getDeliveryMode() {
|
||||
return deliveryMode;
|
||||
}
|
||||
|
||||
|
||||
|
||||
public void setDeliveryMode(int deliveryMode) {
|
||||
this.deliveryMode = deliveryMode;
|
||||
}
|
||||
|
||||
|
||||
|
||||
public boolean isConnectionPerMessage() {
|
||||
return connectionPerMessage;
|
||||
}
|
||||
|
||||
|
||||
|
||||
public void setConnectionPerMessage(boolean connectionPerMessage) {
|
||||
this.connectionPerMessage = connectionPerMessage;
|
||||
}
|
||||
|
||||
|
||||
|
||||
public int getTimeout() {
|
||||
return timeout;
|
||||
}
|
||||
|
||||
|
||||
|
||||
public void setTimeout(int timeout) {
|
||||
this.timeout = timeout;
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,233 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.load;
|
||||
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import javax.jms.BytesMessage;
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.ConnectionFactory;
|
||||
import javax.jms.DeliveryMode;
|
||||
import javax.jms.Destination;
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.Message;
|
||||
import javax.jms.MessageConsumer;
|
||||
import javax.jms.MessageProducer;
|
||||
import javax.jms.Session;
|
||||
import javax.jms.TextMessage;
|
||||
|
||||
import org.apache.activemq.perf.PerfRate;
|
||||
|
||||
/**
|
||||
* @version $Revision: 1.3 $
|
||||
*/
|
||||
public class LoadController implements Runnable{
|
||||
protected ConnectionFactory factory;
|
||||
protected Connection connection;
|
||||
protected Destination startDestination;
|
||||
protected Destination controlDestination;
|
||||
protected Session session;
|
||||
protected MessageConsumer consumer;
|
||||
protected MessageProducer producer;
|
||||
protected PerfRate rate = new PerfRate();
|
||||
protected int numberOfBatches = 1;
|
||||
protected int batchSize = 1000;
|
||||
protected int deliveryMode = DeliveryMode.PERSISTENT;
|
||||
private boolean connectionPerMessage = false;
|
||||
private int timeout = 5000;
|
||||
private boolean running = false;
|
||||
private final CountDownLatch stopped = new CountDownLatch(1);
|
||||
|
||||
|
||||
public LoadController(ConnectionFactory factory) {
|
||||
this.factory = factory;
|
||||
}
|
||||
|
||||
|
||||
|
||||
public synchronized void start() throws JMSException {
|
||||
if (!running) {
|
||||
rate.reset();
|
||||
running = true;
|
||||
if (!connectionPerMessage) {
|
||||
connection = factory.createConnection();
|
||||
connection.start();
|
||||
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
consumer = session.createConsumer(this.controlDestination);
|
||||
producer = session.createProducer(this.startDestination);
|
||||
producer.setDeliveryMode(this.deliveryMode);
|
||||
|
||||
}
|
||||
|
||||
Thread t = new Thread(this);
|
||||
t.setName("LoadController");
|
||||
t.start();
|
||||
}
|
||||
}
|
||||
|
||||
public void stop() throws JMSException, InterruptedException {
|
||||
running = false;
|
||||
stopped.await();
|
||||
//stopped.await(1,TimeUnit.SECONDS);
|
||||
connection.stop();
|
||||
}
|
||||
|
||||
|
||||
public void run() {
|
||||
try {
|
||||
|
||||
for (int i = 0; i < numberOfBatches; i++) {
|
||||
for (int j = 0; j < batchSize; j++) {
|
||||
String payLoad = "batch[" + i + "]no:" + j;
|
||||
send(payLoad);
|
||||
String result = consume();
|
||||
if (result == null || !result.equals(payLoad)) {
|
||||
throw new Exception("Failed to consume " + payLoad
|
||||
+ " GOT " + result);
|
||||
}
|
||||
System.out.println("Control got " + result);
|
||||
rate.increment();
|
||||
}
|
||||
}
|
||||
|
||||
} catch (Throwable e) {
|
||||
e.printStackTrace();
|
||||
} finally {
|
||||
stopped.countDown();
|
||||
}
|
||||
}
|
||||
|
||||
protected String consume() throws JMSException {
|
||||
Connection con = null;
|
||||
MessageConsumer c = consumer;
|
||||
if (connectionPerMessage){
|
||||
con = factory.createConnection();
|
||||
con.start();
|
||||
Session s = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
c = s.createConsumer(controlDestination);
|
||||
}
|
||||
TextMessage result = (TextMessage) c.receive(timeout);
|
||||
if (connectionPerMessage) {
|
||||
con.close();
|
||||
}
|
||||
return result != null ? result.getText() : null;
|
||||
}
|
||||
|
||||
protected void send(String text) throws JMSException {
|
||||
Connection con = null;
|
||||
MessageProducer p = producer;
|
||||
Session s = session;
|
||||
if (connectionPerMessage){
|
||||
con = factory.createConnection();
|
||||
con.start();
|
||||
s = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
p = s.createProducer(startDestination);
|
||||
p.setDeliveryMode(deliveryMode);
|
||||
}
|
||||
TextMessage message = s.createTextMessage(text);
|
||||
p.send(message);
|
||||
if (connectionPerMessage) {
|
||||
con.close();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
public Destination getStartDestination() {
|
||||
return startDestination;
|
||||
}
|
||||
|
||||
|
||||
|
||||
public void setStartDestination(Destination startDestination) {
|
||||
this.startDestination = startDestination;
|
||||
}
|
||||
|
||||
|
||||
|
||||
public Destination getControlDestination() {
|
||||
return controlDestination;
|
||||
}
|
||||
|
||||
|
||||
|
||||
public void setControlDestination(Destination controlDestination) {
|
||||
this.controlDestination = controlDestination;
|
||||
}
|
||||
|
||||
|
||||
|
||||
public int getNumberOfBatches() {
|
||||
return numberOfBatches;
|
||||
}
|
||||
|
||||
|
||||
|
||||
public void setNumberOfBatches(int numberOfBatches) {
|
||||
this.numberOfBatches = numberOfBatches;
|
||||
}
|
||||
|
||||
|
||||
|
||||
public int getBatchSize() {
|
||||
return batchSize;
|
||||
}
|
||||
|
||||
|
||||
|
||||
public void setBatchSize(int batchSize) {
|
||||
this.batchSize = batchSize;
|
||||
}
|
||||
|
||||
|
||||
|
||||
public int getDeliveryMode() {
|
||||
return deliveryMode;
|
||||
}
|
||||
|
||||
|
||||
|
||||
public void setDeliveryMode(int deliveryMode) {
|
||||
this.deliveryMode = deliveryMode;
|
||||
}
|
||||
|
||||
|
||||
|
||||
public boolean isConnectionPerMessage() {
|
||||
return connectionPerMessage;
|
||||
}
|
||||
|
||||
|
||||
|
||||
public void setConnectionPerMessage(boolean connectionPerMessage) {
|
||||
this.connectionPerMessage = connectionPerMessage;
|
||||
}
|
||||
|
||||
|
||||
|
||||
public int getTimeout() {
|
||||
return timeout;
|
||||
}
|
||||
|
||||
|
||||
|
||||
public void setTimeout(int timeout) {
|
||||
this.timeout = timeout;
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,154 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.load;
|
||||
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.ConnectionFactory;
|
||||
import javax.jms.DeliveryMode;
|
||||
import javax.jms.Destination;
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.Session;
|
||||
import junit.framework.TestCase;
|
||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
/**
|
||||
* @version $Revision: 1.3 $
|
||||
*/
|
||||
public class LoadTest extends TestCase {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(LoadTest.class);
|
||||
|
||||
protected BrokerService broker;
|
||||
protected String bindAddress="tcp://localhost:61616";
|
||||
|
||||
protected LoadController controller;
|
||||
protected LoadClient[] clients;
|
||||
protected ConnectionFactory factory;
|
||||
protected Destination destination;
|
||||
protected int numberOfClients = 10;
|
||||
protected int deliveryMode = DeliveryMode.PERSISTENT;
|
||||
protected int batchSize = 1000;
|
||||
protected int numberOfBatches = 4;
|
||||
protected int timeout = Integer.MAX_VALUE;
|
||||
protected boolean connectionPerMessage = true;
|
||||
protected Connection managementConnection;
|
||||
protected Session managementSession;
|
||||
|
||||
/**
|
||||
* Sets up a test where the producer and consumer have their own connection.
|
||||
*
|
||||
* @see junit.framework.TestCase#setUp()
|
||||
*/
|
||||
protected void setUp() throws Exception {
|
||||
if (broker == null) {
|
||||
broker = createBroker(bindAddress);
|
||||
}
|
||||
factory = createConnectionFactory(bindAddress);
|
||||
managementConnection = factory.createConnection();
|
||||
managementSession = managementConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
|
||||
Destination startDestination = createDestination(managementSession, getClass()+".start");
|
||||
Destination endDestination = createDestination(managementSession, getClass()+".end");
|
||||
LOG.info("Running with " + numberOfClients + " clients");
|
||||
controller = new LoadController(factory);
|
||||
controller.setBatchSize(batchSize);
|
||||
controller.setNumberOfBatches(numberOfBatches);
|
||||
controller.setDeliveryMode(deliveryMode);
|
||||
controller.setConnectionPerMessage(connectionPerMessage);
|
||||
controller.setStartDestination(startDestination);
|
||||
controller.setControlDestination(endDestination);
|
||||
controller.setTimeout(timeout);
|
||||
clients = new LoadClient[numberOfClients];
|
||||
for (int i = 0; i < numberOfClients; i++) {
|
||||
Destination inDestination = null;
|
||||
if (i==0) {
|
||||
inDestination = startDestination;
|
||||
}else {
|
||||
inDestination = createDestination(managementSession, getClass() + ".client."+(i));
|
||||
}
|
||||
Destination outDestination = null;
|
||||
if (i==(numberOfClients-1)) {
|
||||
outDestination = endDestination;
|
||||
}else {
|
||||
outDestination = createDestination(managementSession, getClass() + ".client."+(i+1));
|
||||
}
|
||||
LoadClient client = new LoadClient("client("+i+")",factory);
|
||||
client.setTimeout(timeout);
|
||||
client.setDeliveryMode(deliveryMode);
|
||||
client.setConnectionPerMessage(connectionPerMessage);
|
||||
client.setStartDestination(inDestination);
|
||||
client.setNextDestination(outDestination);
|
||||
clients[i] = client;
|
||||
}
|
||||
|
||||
super.setUp();
|
||||
}
|
||||
|
||||
protected void tearDown() throws Exception {
|
||||
super.tearDown();
|
||||
managementConnection.close();
|
||||
for (int i = 0; i < numberOfClients; i++) {
|
||||
clients[i].stop();
|
||||
}
|
||||
controller.stop();
|
||||
if (broker != null) {
|
||||
broker.stop();
|
||||
broker = null;
|
||||
}
|
||||
}
|
||||
|
||||
protected Destination createDestination(Session s, String destinationName) throws JMSException {
|
||||
return s.createQueue(destinationName);
|
||||
}
|
||||
|
||||
/**
|
||||
* Factory method to create a new broker
|
||||
*
|
||||
* @throws Exception
|
||||
*/
|
||||
protected BrokerService createBroker(String uri) throws Exception {
|
||||
BrokerService answer = new BrokerService();
|
||||
configureBroker(answer,uri);
|
||||
answer.start();
|
||||
return answer;
|
||||
}
|
||||
|
||||
|
||||
|
||||
protected void configureBroker(BrokerService answer,String uri) throws Exception {
|
||||
answer.setDeleteAllMessagesOnStartup(true);
|
||||
answer.addConnector(uri);
|
||||
answer.setUseShutdownHook(false);
|
||||
}
|
||||
|
||||
protected ActiveMQConnectionFactory createConnectionFactory(String uri) throws Exception {
|
||||
return new ActiveMQConnectionFactory(uri);
|
||||
}
|
||||
|
||||
public void testLoad() throws JMSException, InterruptedException {
|
||||
for (int i = 0; i < numberOfClients; i++) {
|
||||
clients[i].start();
|
||||
}
|
||||
controller.start();
|
||||
controller.stop();
|
||||
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue