mirror of https://github.com/apache/activemq.git
share the test
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@758200 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
42d91a3b79
commit
fb9a292be1
|
@ -17,6 +17,7 @@
|
|||
|
||||
package org.apache.activemq.bugs;
|
||||
|
||||
import java.io.File;
|
||||
import java.util.Timer;
|
||||
import java.util.TimerTask;
|
||||
import java.util.Vector;
|
||||
|
@ -34,6 +35,9 @@ import javax.jms.Session;
|
|||
|
||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.activemq.broker.region.policy.PolicyEntry;
|
||||
import org.apache.activemq.broker.region.policy.PolicyMap;
|
||||
import org.apache.activemq.command.ActiveMQDestination;
|
||||
import org.apache.activemq.command.ActiveMQQueue;
|
||||
import org.apache.activemq.store.amq.AMQPersistenceAdapterFactory;
|
||||
import org.apache.activemq.usage.MemoryUsage;
|
||||
|
@ -59,24 +63,34 @@ public class AMQ2149Test extends TestCase {
|
|||
|
||||
final int MESSAGE_LENGTH_BYTES = 75000;
|
||||
final int MAX_TO_SEND = 2000;
|
||||
final long SLEEP_BETWEEN_SEND_MS = 5;
|
||||
final long SLEEP_BETWEEN_SEND_MS = 3;
|
||||
final int NUM_SENDERS_AND_RECEIVERS = 10;
|
||||
final Object brokerLock = new Object();
|
||||
|
||||
BrokerService broker;
|
||||
Vector<Throwable> exceptions = new Vector<Throwable>();
|
||||
|
||||
private File dataDirFile;
|
||||
|
||||
public void createBroker(Configurer configurer) throws Exception {
|
||||
broker = new BrokerService();
|
||||
broker.setDataDirectory("target/amq-data/" + getName());
|
||||
AMQPersistenceAdapterFactory persistenceFactory = new AMQPersistenceAdapterFactory();
|
||||
persistenceFactory.setDataDirectory(dataDirFile);
|
||||
broker.setPersistenceFactory(persistenceFactory);
|
||||
|
||||
broker.addConnector(BROKER_CONNECTOR);
|
||||
broker.setBrokerName(getName());
|
||||
broker.setDataDirectoryFile(dataDirFile);
|
||||
if (configurer != null) {
|
||||
configurer.configure(broker);
|
||||
}
|
||||
broker.setBrokerName(getName());
|
||||
broker.start();
|
||||
}
|
||||
|
||||
public void setUp() throws Exception {
|
||||
dataDirFile = new File("target/"+ getName());
|
||||
}
|
||||
|
||||
public void tearDown() throws Exception {
|
||||
synchronized(brokerLock) {
|
||||
broker.stop();
|
||||
|
@ -138,7 +152,8 @@ public class AMQ2149Test extends TestCase {
|
|||
+ " in msg: " + message.getJMSMessageID()
|
||||
+ " expected "
|
||||
+ nextExpectedSeqNum
|
||||
+ ", lastId: " + lastId);
|
||||
+ ", lastId: " + lastId
|
||||
+ ", message:" + message);
|
||||
fail(queueName + " received " + seqNum + " expected "
|
||||
+ nextExpectedSeqNum);
|
||||
}
|
||||
|
@ -189,10 +204,12 @@ public class AMQ2149Test extends TestCase {
|
|||
LOG.error(queueName + " send error", e);
|
||||
exceptions.add(e);
|
||||
}
|
||||
try {
|
||||
Thread.sleep(SLEEP_BETWEEN_SEND_MS);
|
||||
} catch (InterruptedException e) {
|
||||
LOG.warn(queueName + " sleep interrupted", e);
|
||||
if (SLEEP_BETWEEN_SEND_MS > 0) {
|
||||
try {
|
||||
Thread.sleep(SLEEP_BETWEEN_SEND_MS);
|
||||
} catch (InterruptedException e) {
|
||||
LOG.warn(queueName + " sleep interrupted", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
try {
|
||||
|
@ -202,13 +219,13 @@ public class AMQ2149Test extends TestCase {
|
|||
}
|
||||
}
|
||||
|
||||
public void testOrderWithMemeUsageLimit() throws Exception {
|
||||
public void x_testOrderWithMemeUsageLimit() throws Exception {
|
||||
|
||||
createBroker(new Configurer() {
|
||||
public void configure(BrokerService broker) throws Exception {
|
||||
SystemUsage usage = new SystemUsage();
|
||||
MemoryUsage memoryUsage = new MemoryUsage();
|
||||
memoryUsage.setLimit(2048 * 7 * NUM_SENDERS_AND_RECEIVERS);
|
||||
memoryUsage.setLimit(MESSAGE_LENGTH_BYTES * 5 * NUM_SENDERS_AND_RECEIVERS);
|
||||
usage.setMemoryUsage(memoryUsage);
|
||||
broker.setSystemUsage(usage);
|
||||
|
||||
|
@ -222,9 +239,9 @@ public class AMQ2149Test extends TestCase {
|
|||
public void testOrderWithRestartVMIndex() throws Exception {
|
||||
createBroker(new Configurer() {
|
||||
public void configure(BrokerService broker) throws Exception {
|
||||
AMQPersistenceAdapterFactory persistenceFactory = new AMQPersistenceAdapterFactory();
|
||||
AMQPersistenceAdapterFactory persistenceFactory =
|
||||
(AMQPersistenceAdapterFactory) broker.getPersistenceFactory();
|
||||
persistenceFactory.setPersistentIndex(false);
|
||||
broker.setPersistenceFactory(persistenceFactory);
|
||||
broker.deleteAllMessages();
|
||||
}
|
||||
});
|
||||
|
@ -232,9 +249,9 @@ public class AMQ2149Test extends TestCase {
|
|||
final Timer timer = new Timer();
|
||||
schedualRestartTask(timer, new Configurer() {
|
||||
public void configure(BrokerService broker) throws Exception {
|
||||
AMQPersistenceAdapterFactory persistenceFactory = new AMQPersistenceAdapterFactory();
|
||||
AMQPersistenceAdapterFactory persistenceFactory =
|
||||
(AMQPersistenceAdapterFactory) broker.getPersistenceFactory();
|
||||
persistenceFactory.setPersistentIndex(false);
|
||||
broker.setPersistenceFactory(persistenceFactory);
|
||||
}
|
||||
});
|
||||
|
||||
|
@ -245,13 +262,31 @@ public class AMQ2149Test extends TestCase {
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
public void x_testOrderWithRestart() throws Exception {
|
||||
createBroker(new Configurer() {
|
||||
public void configure(BrokerService broker) throws Exception {
|
||||
broker.deleteAllMessages();
|
||||
}
|
||||
});
|
||||
|
||||
final Timer timer = new Timer();
|
||||
schedualRestartTask(timer, null);
|
||||
|
||||
try {
|
||||
verifyOrderedMessageReceipt();
|
||||
} finally {
|
||||
timer.cancel();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public void x_testOrderWithRestartWithForceRecover() throws Exception {
|
||||
createBroker(new Configurer() {
|
||||
public void configure(BrokerService broker) throws Exception {
|
||||
AMQPersistenceAdapterFactory persistenceFactory = new AMQPersistenceAdapterFactory();
|
||||
AMQPersistenceAdapterFactory persistenceFactory =
|
||||
(AMQPersistenceAdapterFactory) broker.getPersistenceFactory();
|
||||
persistenceFactory.setForceRecoverReferenceStore(true);
|
||||
broker.setPersistenceFactory(persistenceFactory);
|
||||
broker.deleteAllMessages();
|
||||
}
|
||||
});
|
||||
|
@ -259,9 +294,14 @@ public class AMQ2149Test extends TestCase {
|
|||
final Timer timer = new Timer();
|
||||
schedualRestartTask(timer, new Configurer() {
|
||||
public void configure(BrokerService broker) throws Exception {
|
||||
AMQPersistenceAdapterFactory persistenceFactory = new AMQPersistenceAdapterFactory();
|
||||
AMQPersistenceAdapterFactory persistenceFactory =
|
||||
(AMQPersistenceAdapterFactory) broker.getPersistenceFactory();
|
||||
persistenceFactory.setForceRecoverReferenceStore(true);
|
||||
broker.setPersistenceFactory(persistenceFactory);
|
||||
// PolicyEntry auditDepthPolicy = new PolicyEntry();
|
||||
// auditDepthPolicy.setMaxAuditDepth(2000);
|
||||
// PolicyMap policyMap = new PolicyMap();
|
||||
// policyMap.setDefaultEntry(auditDepthPolicy);
|
||||
// broker.setDestinationPolicy(policyMap);
|
||||
}
|
||||
});
|
||||
|
||||
|
@ -272,12 +312,8 @@ public class AMQ2149Test extends TestCase {
|
|||
}
|
||||
}
|
||||
|
||||
private void schedualRestartTask(Timer timer) {
|
||||
schedualRestartTask(timer, null);
|
||||
}
|
||||
|
||||
private void schedualRestartTask(final Timer timer, final Configurer configurer) {
|
||||
timer.schedule(new TimerTask() {
|
||||
class RestartTask extends TimerTask {
|
||||
public void run() {
|
||||
synchronized (brokerLock) {
|
||||
LOG.info("stopping broker..");
|
||||
|
@ -290,15 +326,20 @@ public class AMQ2149Test extends TestCase {
|
|||
LOG.info("restarting broker");
|
||||
try {
|
||||
createBroker(configurer);
|
||||
broker.waitUntilStarted();
|
||||
} catch (Exception e) {
|
||||
LOG.error("ex on broker restart", e);
|
||||
exceptions.add(e);
|
||||
}
|
||||
}
|
||||
// do once
|
||||
// timer.cancel();
|
||||
}
|
||||
}, BROKER_STOP_PERIOD, BROKER_STOP_PERIOD);
|
||||
// do it again
|
||||
try {
|
||||
timer.schedule(new RestartTask(), BROKER_STOP_PERIOD);
|
||||
} catch (IllegalStateException ignore_alreadyCancelled) {
|
||||
}
|
||||
}
|
||||
}
|
||||
timer.schedule(new RestartTask(), BROKER_STOP_PERIOD);
|
||||
}
|
||||
|
||||
private void verifyOrderedMessageReceipt() throws Exception {
|
||||
|
@ -314,17 +355,19 @@ public class AMQ2149Test extends TestCase {
|
|||
threads.add(thread);
|
||||
}
|
||||
|
||||
final long expiry = System.currentTimeMillis() + 1000 * 60 * 5;
|
||||
while(!threads.isEmpty() && !receivers.isEmpty()
|
||||
&& exceptions.isEmpty() && System.currentTimeMillis() < expiry) {
|
||||
final long expiry = System.currentTimeMillis() + 1000 * 60 * 10;
|
||||
while(!threads.isEmpty() && exceptions.isEmpty() && System.currentTimeMillis() < expiry) {
|
||||
Thread sendThread = threads.firstElement();
|
||||
sendThread.join(1000*10);
|
||||
if (!sendThread.isAlive()) {
|
||||
threads.remove(sendThread);
|
||||
}
|
||||
|
||||
}
|
||||
LOG.info("senders done...");
|
||||
|
||||
while(!receivers.isEmpty() && System.currentTimeMillis() < expiry) {
|
||||
Receiver receiver = receivers.firstElement();
|
||||
if (receiver.getNextExpectedSeqNo() >= MAX_TO_SEND) {
|
||||
if (receiver.getNextExpectedSeqNo() >= MAX_TO_SEND || !exceptions.isEmpty()) {
|
||||
receiver.close();
|
||||
receivers.remove(receiver);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue