further tests for AMQ-2149

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@756469 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2009-03-20 14:06:45 +00:00
parent 08aecbee6e
commit 86ec0553c5
1 changed files with 163 additions and 26 deletions

View File

@ -17,6 +17,8 @@
package org.apache.activemq.bugs; package org.apache.activemq.bugs;
import java.util.Timer;
import java.util.TimerTask;
import java.util.Vector; import java.util.Vector;
import junit.framework.TestCase; import junit.framework.TestCase;
@ -33,45 +35,54 @@ import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.store.amq.AMQPersistenceAdapterFactory;
import org.apache.activemq.usage.MemoryUsage; import org.apache.activemq.usage.MemoryUsage;
import org.apache.activemq.usage.SystemUsage; import org.apache.activemq.usage.SystemUsage;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
interface Configurer {
public void configure(BrokerService broker) throws Exception;
}
public class AMQ2149Test extends TestCase { public class AMQ2149Test extends TestCase {
private static final Log log = LogFactory.getLog(AMQ2149Test.class); private static final Log LOG = LogFactory.getLog(AMQ2149Test.class);
private String BROKER_URL; private static final long BROKER_STOP_PERIOD = 15 * 1000;
private static final String BROKER_CONNECTOR = "tcp://localhost:61617";
private static final String BROKER_URL = "failover:("+ BROKER_CONNECTOR
+")?maxReconnectDelay=1000&useExponentialBackOff=false";
private final String SEQ_NUM_PROPERTY = "seqNum"; private final String SEQ_NUM_PROPERTY = "seqNum";
final int MESSAGE_LENGTH_BYTES = 75000; final int MESSAGE_LENGTH_BYTES = 75000;
final int MAX_TO_SEND = 2000; final int MAX_TO_SEND = 2000;
final long SLEEP_BETWEEN_SEND_MS = 5; final long SLEEP_BETWEEN_SEND_MS = 5;
final int NUM_SENDERS_AND_RECEIVERS = 10; final int NUM_SENDERS_AND_RECEIVERS = 10;
final Object brokerLock = new Object();
BrokerService broker; BrokerService broker;
Vector<Throwable> exceptions = new Vector<Throwable>(); Vector<Throwable> exceptions = new Vector<Throwable>();
public void setUp() throws Exception { public void createBroker(Configurer configurer) throws Exception {
broker = new BrokerService(); broker = new BrokerService();
broker.addConnector("tcp://localhost:0"); broker.setDataDirectory("target/amq-data/" + getName());
broker.deleteAllMessages(); broker.addConnector(BROKER_CONNECTOR);
if (configurer != null) {
SystemUsage usage = new SystemUsage(); configurer.configure(broker);
MemoryUsage memoryUsage = new MemoryUsage(); }
memoryUsage.setLimit(2048 * 7 * NUM_SENDERS_AND_RECEIVERS); broker.setBrokerName(getName());
usage.setMemoryUsage(memoryUsage);
broker.setSystemUsage(usage);
broker.start(); broker.start();
BROKER_URL = "failover:("
+ broker.getTransportConnectors().get(0).getUri()
+")?maxReconnectDelay=1000&useExponentialBackOff=false";
} }
public void tearDown() throws Exception { public void tearDown() throws Exception {
broker.stop(); synchronized(brokerLock) {
broker.stop();
broker.waitUntilStopped();
}
exceptions.clear();
} }
private String buildLongString() { private String buildLongString() {
@ -94,6 +105,8 @@ public class AMQ2149Test extends TestCase {
private final MessageConsumer messageConsumer; private final MessageConsumer messageConsumer;
private volatile long nextExpectedSeqNum = 0; private volatile long nextExpectedSeqNum = 0;
private String lastId = null;
public Receiver(String queueName) throws JMSException { public Receiver(String queueName) throws JMSException {
this.queueName = queueName; this.queueName = queueName;
@ -106,21 +119,33 @@ public class AMQ2149Test extends TestCase {
connection.start(); connection.start();
} }
public void close() throws JMSException {
connection.close();
}
public long getNextExpectedSeqNo() {
return nextExpectedSeqNum;
}
public void onMessage(Message message) { public void onMessage(Message message) {
try { try {
final long seqNum = message.getLongProperty(SEQ_NUM_PROPERTY); final long seqNum = message.getLongProperty(SEQ_NUM_PROPERTY);
if ((seqNum % 100) == 0) { if ((seqNum % 100) == 0) {
log.info(queueName + " received " + seqNum); LOG.info(queueName + " received " + seqNum);
} }
if (seqNum != nextExpectedSeqNum) { if (seqNum != nextExpectedSeqNum) {
log.warn(queueName + " received " + seqNum + " expected " LOG.warn(queueName + " received " + seqNum
+ nextExpectedSeqNum); + " in msg: " + message.getJMSMessageID()
+ " expected "
+ nextExpectedSeqNum
+ ", lastId: " + lastId);
fail(queueName + " received " + seqNum + " expected " fail(queueName + " received " + seqNum + " expected "
+ nextExpectedSeqNum); + nextExpectedSeqNum);
} }
++nextExpectedSeqNum; ++nextExpectedSeqNum;
lastId = message.getJMSMessageID();
} catch (Throwable e) { } catch (Throwable e) {
log.error(queueName + " onMessage error", e); LOG.error(queueName + " onMessage error", e);
exceptions.add(e); exceptions.add(e);
} }
} }
@ -161,38 +186,150 @@ public class AMQ2149Test extends TestCase {
++nextSequenceNumber; ++nextSequenceNumber;
messageProducer.send(message); messageProducer.send(message);
} catch (Exception e) { } catch (Exception e) {
log.error(queueName + " send error", e); LOG.error(queueName + " send error", e);
exceptions.add(e); exceptions.add(e);
} }
try { try {
Thread.sleep(SLEEP_BETWEEN_SEND_MS); Thread.sleep(SLEEP_BETWEEN_SEND_MS);
} catch (InterruptedException e) { } catch (InterruptedException e) {
log.warn(queueName + " sleep interrupted", e); LOG.warn(queueName + " sleep interrupted", e);
} }
} }
try {
connection.close();
} catch (JMSException ignored) {
}
} }
} }
public void testOutOfOrderWithMemeUsageLimit() throws Exception { public void testOrderWithMemeUsageLimit() throws Exception {
createBroker(new Configurer() {
public void configure(BrokerService broker) throws Exception {
SystemUsage usage = new SystemUsage();
MemoryUsage memoryUsage = new MemoryUsage();
memoryUsage.setLimit(2048 * 7 * NUM_SENDERS_AND_RECEIVERS);
usage.setMemoryUsage(memoryUsage);
broker.setSystemUsage(usage);
broker.deleteAllMessages();
}
});
verifyOrderedMessageReceipt();
}
public void testOrderWithRestartVMIndex() throws Exception {
createBroker(new Configurer() {
public void configure(BrokerService broker) throws Exception {
AMQPersistenceAdapterFactory persistenceFactory = new AMQPersistenceAdapterFactory();
persistenceFactory.setPersistentIndex(false);
broker.setPersistenceFactory(persistenceFactory);
broker.deleteAllMessages();
}
});
final Timer timer = new Timer();
schedualRestartTask(timer, new Configurer() {
public void configure(BrokerService broker) throws Exception {
AMQPersistenceAdapterFactory persistenceFactory = new AMQPersistenceAdapterFactory();
persistenceFactory.setPersistentIndex(false);
broker.setPersistenceFactory(persistenceFactory);
}
});
try {
verifyOrderedMessageReceipt();
} finally {
timer.cancel();
}
}
public void x_testOrderWithRestartWithForceRecover() throws Exception {
createBroker(new Configurer() {
public void configure(BrokerService broker) throws Exception {
AMQPersistenceAdapterFactory persistenceFactory = new AMQPersistenceAdapterFactory();
persistenceFactory.setForceRecoverReferenceStore(true);
broker.setPersistenceFactory(persistenceFactory);
broker.deleteAllMessages();
}
});
final Timer timer = new Timer();
schedualRestartTask(timer, new Configurer() {
public void configure(BrokerService broker) throws Exception {
AMQPersistenceAdapterFactory persistenceFactory = new AMQPersistenceAdapterFactory();
persistenceFactory.setForceRecoverReferenceStore(true);
broker.setPersistenceFactory(persistenceFactory);
}
});
try {
verifyOrderedMessageReceipt();
} finally {
timer.cancel();
}
}
private void schedualRestartTask(Timer timer) {
schedualRestartTask(timer, null);
}
private void schedualRestartTask(final Timer timer, final Configurer configurer) {
timer.schedule(new TimerTask() {
public void run() {
synchronized (brokerLock) {
LOG.info("stopping broker..");
try {
broker.stop();
} catch (Exception e) {
LOG.error("ex on broker stop", e);
exceptions.add(e);
}
LOG.info("restarting broker");
try {
createBroker(configurer);
} catch (Exception e) {
LOG.error("ex on broker restart", e);
exceptions.add(e);
}
}
// do once
// timer.cancel();
}
}, BROKER_STOP_PERIOD, BROKER_STOP_PERIOD);
}
private void verifyOrderedMessageReceipt() throws Exception {
Vector<Thread> threads = new Vector<Thread>(); Vector<Thread> threads = new Vector<Thread>();
Vector<Receiver> receivers = new Vector<Receiver>();
for (int i = 0; i < NUM_SENDERS_AND_RECEIVERS; ++i) { for (int i = 0; i < NUM_SENDERS_AND_RECEIVERS; ++i) {
final String queueName = "test.queue." + i; final String queueName = "test.queue." + i;
new Receiver(queueName); receivers.add(new Receiver(queueName));
Thread thread = new Thread(new Sender(queueName)); Thread thread = new Thread(new Sender(queueName));
thread.start(); thread.start();
threads.add(thread); threads.add(thread);
} }
final long expiry = System.currentTimeMillis() + 1000 * 60 * 5; final long expiry = System.currentTimeMillis() + 1000 * 60 * 5;
while(!threads.isEmpty() && exceptions.isEmpty() && System.currentTimeMillis() < expiry) { while(!threads.isEmpty() && !receivers.isEmpty()
&& exceptions.isEmpty() && System.currentTimeMillis() < expiry) {
Thread sendThread = threads.firstElement(); Thread sendThread = threads.firstElement();
sendThread.join(1000*10); sendThread.join(1000*10);
if (!sendThread.isAlive()) { if (!sendThread.isAlive()) {
threads.remove(sendThread); threads.remove(sendThread);
} }
Receiver receiver = receivers.firstElement();
if (receiver.getNextExpectedSeqNo() >= MAX_TO_SEND) {
receiver.close();
receivers.remove(receiver);
}
} }
assertTrue("No timeout waiting for senders to complete", System.currentTimeMillis() < expiry); assertTrue("No timeout waiting for senders/receivers to complete", System.currentTimeMillis() < expiry);
assertTrue("No exceptions", exceptions.isEmpty()); assertTrue("No exceptions", exceptions.isEmpty());
} }