mirror of https://github.com/apache/activemq.git
Adding a test does a load test across multiple network connections.
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@631658 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
00893abe27
commit
a9120b1b12
|
@ -0,0 +1,305 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.network;
|
||||
|
||||
import java.net.URI;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.Message;
|
||||
import javax.jms.MessageConsumer;
|
||||
import javax.jms.MessageListener;
|
||||
import javax.jms.MessageProducer;
|
||||
import javax.jms.Session;
|
||||
import javax.jms.TextMessage;
|
||||
|
||||
import junit.framework.TestCase;
|
||||
|
||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.ActiveMQPrefetchPolicy;
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.activemq.broker.TransportConnector;
|
||||
import org.apache.activemq.broker.region.policy.NoSubscriptionRecoveryPolicy;
|
||||
import org.apache.activemq.broker.region.policy.PolicyEntry;
|
||||
import org.apache.activemq.broker.region.policy.PolicyMap;
|
||||
import org.apache.activemq.broker.region.policy.VMPendingQueueMessageStoragePolicy;
|
||||
import org.apache.activemq.broker.region.policy.VMPendingSubscriberMessageStoragePolicy;
|
||||
import org.apache.activemq.command.ActiveMQQueue;
|
||||
import org.apache.activemq.command.ActiveMQTextMessage;
|
||||
import org.apache.activemq.usage.SystemUsage;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
/**
|
||||
* This test case is used to load test store and forwarding between brokers. It sets up
|
||||
* 10 brokers to which have a chain of queues which this test consumes and produces to.
|
||||
*
|
||||
* If the network bridges gets stuck at any point subsequent queues will not get messages. This test
|
||||
* samples the production and consumption stats every second and if the flow of messages
|
||||
* get stuck then this tast fails. The test monitors the flow of messages for 1 min.
|
||||
*
|
||||
* @author chirino
|
||||
*/
|
||||
public class NetworkLoadTest extends TestCase {
|
||||
|
||||
private static final transient Log LOG = LogFactory.getLog(NetworkLoadTest.class);
|
||||
|
||||
// How many times do we sample?
|
||||
private static final long SAMPLES = Integer.parseInt(System.getProperty("SAMPLES", ""+60*1)); // 1 min since each sample is 1 sec long.
|
||||
// Slower machines might need to make this bigger.
|
||||
private static final long SAMPLE_DURATION = Integer.parseInt(System.getProperty("SAMPLES_DURATION", "" + 1000 * 1));
|
||||
protected static final int BROKER_COUNT = 10;
|
||||
|
||||
class ForwardingClient {
|
||||
|
||||
private final AtomicLong forwardCounter = new AtomicLong();
|
||||
private final Connection toConnection;
|
||||
private final Connection fromConnection;
|
||||
|
||||
public ForwardingClient(int from, int to) throws JMSException {
|
||||
toConnection = createConnection(to);
|
||||
Session toSession = toConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
final MessageProducer producer = toSession.createProducer(new ActiveMQQueue("Q"+to));
|
||||
|
||||
fromConnection = createConnection(from);
|
||||
Session fromSession = fromConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
MessageConsumer consumer = fromSession.createConsumer(new ActiveMQQueue("Q"+from));
|
||||
|
||||
|
||||
consumer.setMessageListener(new MessageListener() {
|
||||
public void onMessage(Message msg) {
|
||||
try {
|
||||
producer.send(msg);
|
||||
forwardCounter.incrementAndGet();
|
||||
} catch (JMSException e) {
|
||||
// this is caused by the connection getting closed.
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public void start() throws JMSException {
|
||||
toConnection.start();
|
||||
fromConnection.start();
|
||||
}
|
||||
|
||||
public void stop() throws JMSException {
|
||||
fromConnection.stop();
|
||||
toConnection.stop();
|
||||
}
|
||||
|
||||
public void close() throws JMSException {
|
||||
toConnection.close();
|
||||
fromConnection.close();
|
||||
}
|
||||
}
|
||||
|
||||
private BrokerService[] brokers;
|
||||
private ForwardingClient[] forwardingClients;
|
||||
|
||||
protected void setUp() throws Exception {
|
||||
brokers = new BrokerService[BROKER_COUNT];
|
||||
for (int i = 0; i < brokers.length; i++) {
|
||||
LOG.info("Starting broker: "+i);
|
||||
brokers[i] = createBroker(i);
|
||||
brokers[i].start();
|
||||
}
|
||||
// Wait for the brokers to finish starting up and establish thier network connections.
|
||||
Thread.sleep(BROKER_COUNT*400);
|
||||
forwardingClients = new ForwardingClient[BROKER_COUNT-1];
|
||||
for (int i = 0; i < forwardingClients.length; i++) {
|
||||
LOG.info("Starting fowarding client "+i);
|
||||
forwardingClients[i] = new ForwardingClient(i, i+1);
|
||||
forwardingClients[i].start();
|
||||
}
|
||||
}
|
||||
|
||||
protected void tearDown() throws Exception {
|
||||
for (int i = 0; i < forwardingClients.length; i++) {
|
||||
LOG.info("Stoping fowarding client "+i);
|
||||
forwardingClients[i].close();
|
||||
}
|
||||
for (int i = 0; i < brokers.length; i++) {
|
||||
LOG.info("Stoping broker "+i);
|
||||
brokers[i].stop();
|
||||
}
|
||||
}
|
||||
|
||||
protected Connection createConnection(int brokerId) throws JMSException {
|
||||
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:"+(60000+brokerId));
|
||||
connectionFactory.setOptimizedMessageDispatch(true);
|
||||
connectionFactory.setCopyMessageOnSend(false);
|
||||
connectionFactory.setUseCompression(false);
|
||||
connectionFactory.setDispatchAsync(true);
|
||||
connectionFactory.setUseAsyncSend(false);
|
||||
connectionFactory.setOptimizeAcknowledge(false);
|
||||
connectionFactory.setWatchTopicAdvisories(false);
|
||||
ActiveMQPrefetchPolicy qPrefetchPolicy= new ActiveMQPrefetchPolicy();
|
||||
qPrefetchPolicy.setQueuePrefetch(100);
|
||||
qPrefetchPolicy.setTopicPrefetch(1000);
|
||||
connectionFactory.setPrefetchPolicy(qPrefetchPolicy);
|
||||
connectionFactory.setAlwaysSyncSend(true);
|
||||
return connectionFactory.createConnection();
|
||||
}
|
||||
|
||||
protected BrokerService createBroker(int brokerId) throws Exception {
|
||||
BrokerService broker = new BrokerService();
|
||||
broker.setBrokerName("broker-" + brokerId);
|
||||
broker.setPersistent(false);
|
||||
broker.setUseJmx(true);
|
||||
broker.getManagementContext().setCreateConnector(false);
|
||||
|
||||
final SystemUsage memoryManager = new SystemUsage();
|
||||
memoryManager.getMemoryUsage().setLimit(1024 * 1024 * 50); // 50 MB
|
||||
broker.setSystemUsage(memoryManager);
|
||||
|
||||
final List<PolicyEntry> policyEntries = new ArrayList<PolicyEntry>();
|
||||
final PolicyEntry entry = new PolicyEntry();
|
||||
entry.setQueue(">");
|
||||
entry.setMemoryLimit(1024 * 1024 * 1); // Set to 1 MB
|
||||
entry.setPendingSubscriberPolicy(new VMPendingSubscriberMessageStoragePolicy());
|
||||
entry.setPendingQueuePolicy(new VMPendingQueueMessageStoragePolicy());
|
||||
policyEntries.add(entry);
|
||||
|
||||
// This is to turn of the default behavior of storing topic messages for retroactive consumption
|
||||
final PolicyEntry topicPolicyEntry = new PolicyEntry();
|
||||
topicPolicyEntry.setTopic(">");
|
||||
final NoSubscriptionRecoveryPolicy noSubscriptionRecoveryPolicy = new NoSubscriptionRecoveryPolicy();
|
||||
topicPolicyEntry.setSubscriptionRecoveryPolicy(noSubscriptionRecoveryPolicy);
|
||||
|
||||
final PolicyMap policyMap = new PolicyMap();
|
||||
policyMap.setPolicyEntries(policyEntries);
|
||||
broker.setDestinationPolicy(policyMap);
|
||||
|
||||
TransportConnector transportConnector = new TransportConnector();
|
||||
transportConnector.setUri(new URI("tcp://localhost:"+(60000+brokerId)));
|
||||
transportConnector.setDiscoveryUri(new URI("multicast://network-load-test"));
|
||||
broker.addConnector(transportConnector);
|
||||
|
||||
DiscoveryNetworkConnector networkConnector = new DiscoveryNetworkConnector();
|
||||
networkConnector.setUri(new URI("multicast://network-load-test"));
|
||||
networkConnector.setBridgeTempDestinations(true);
|
||||
networkConnector.setPrefetchSize(1);
|
||||
broker.addNetworkConnector(networkConnector);
|
||||
|
||||
return broker;
|
||||
}
|
||||
|
||||
public void testRequestReply() throws Exception {
|
||||
|
||||
final int to = 0; // Send to the first broker
|
||||
int from = brokers.length-1; // consume from the last broker..
|
||||
|
||||
LOG.info("Staring Final Consumer");
|
||||
|
||||
Connection fromConnection = createConnection(from);
|
||||
fromConnection.start();
|
||||
Session fromSession = fromConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
MessageConsumer consumer = fromSession.createConsumer(new ActiveMQQueue("Q"+from));
|
||||
|
||||
final AtomicReference<ActiveMQTextMessage> lastMessageReceived = new AtomicReference<ActiveMQTextMessage>();
|
||||
final AtomicLong producedMessages = new AtomicLong();
|
||||
final AtomicLong receivedMessages = new AtomicLong();
|
||||
final AtomicBoolean done = new AtomicBoolean();
|
||||
|
||||
// Setup the consumer..
|
||||
consumer.setMessageListener(new MessageListener() {
|
||||
public void onMessage(Message msg) {
|
||||
ActiveMQTextMessage m = (ActiveMQTextMessage) msg;
|
||||
ActiveMQTextMessage last = lastMessageReceived.get();
|
||||
if( last!=null ) {
|
||||
// Some order checking...
|
||||
if( last.getMessageId().getProducerSequenceId() > m.getMessageId().getProducerSequenceId() ) {
|
||||
System.out.println("Received an out of order message. Got "+m.getMessageId()+", expected something after "+last.getMessageId());
|
||||
}
|
||||
}
|
||||
lastMessageReceived.set(m);
|
||||
receivedMessages.incrementAndGet();
|
||||
}
|
||||
});
|
||||
|
||||
LOG.info("Staring Initial Producer");
|
||||
final Connection toConnection = createConnection(to);
|
||||
Thread producer = new Thread("Producer") {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
toConnection.start();
|
||||
Session toSession = toConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
final MessageProducer producer = toSession.createProducer(new ActiveMQQueue("Q"+to));
|
||||
|
||||
for (int i = 0; !done.get(); i++) {
|
||||
TextMessage msg = toSession.createTextMessage("test msg: " + i);
|
||||
producer.send(msg);
|
||||
producedMessages.incrementAndGet();
|
||||
}
|
||||
} catch (JMSException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
};
|
||||
producer.start();
|
||||
|
||||
for (int i = 0; i < SAMPLES; i++) {
|
||||
|
||||
long start = System.currentTimeMillis();
|
||||
producedMessages.set(0);
|
||||
receivedMessages.set(0);
|
||||
for (int j = 0; j < forwardingClients.length; j++) {
|
||||
forwardingClients[j].forwardCounter.set(0);
|
||||
}
|
||||
|
||||
Thread.sleep(SAMPLE_DURATION);
|
||||
|
||||
long end = System.currentTimeMillis();
|
||||
long r = receivedMessages.get();
|
||||
long p = producedMessages.get();
|
||||
|
||||
LOG.info("published: " + p + " msgs at " + (p * 1000f / (end - start)) + " msgs/sec, " + "consumed: " + r + " msgs at " + (r * 1000f / (end - start)) + " msgs/sec");
|
||||
|
||||
StringBuffer fwdingmsg = new StringBuffer(500);
|
||||
fwdingmsg.append(" forwarding counters: ");
|
||||
for (int j = 0; j < forwardingClients.length; j++) {
|
||||
if( j!= 0 ) {
|
||||
fwdingmsg.append(", ");
|
||||
}
|
||||
fwdingmsg.append(forwardingClients[j].forwardCounter.get());
|
||||
}
|
||||
LOG.info(fwdingmsg);
|
||||
|
||||
// The test is just checking to make sure thaat the producer and consumer does not hang
|
||||
// due to the network hops take to route the message form the producer to the consumer.
|
||||
assertTrue("Recieved some messages since last sample", r>0);
|
||||
assertTrue("Produced some messages since last sample", p>0);
|
||||
|
||||
}
|
||||
LOG.info("Sample done.");
|
||||
done.set(true);
|
||||
// Wait for the producer to finish.
|
||||
producer.join(1000*5);
|
||||
toConnection.close();
|
||||
fromConnection.close();
|
||||
|
||||
}
|
||||
|
||||
|
||||
}
|
Loading…
Reference in New Issue